drbd: Convert timers to use timer_setup()
[linux-2.6/btrfs-unstable.git] / drivers / block / drbd / drbd_worker.c
blob1476cb3439f46e53a8f42a9397fb6b19afd8ff95
1 /*
2 drbd_worker.c
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched/signal.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
/* Forward declarations: both helpers are defined further down in this file
 * and are dispatched from w_resync_timer(). */
static int make_ov_request(struct drbd_device *device, int cancel);
static int make_resync_request(struct drbd_device *device, int cancel);
45 /* endio handlers:
46 * drbd_md_endio (defined here)
47 * drbd_request_endio (defined here)
48 * drbd_peer_request_endio (defined here)
49 * drbd_bm_endio (defined in drbd_bitmap.c)
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
58 /* used for synchronous meta data and bitmap IO
59 * submitted by drbd_md_sync_page_io()
61 void drbd_md_endio(struct bio *bio)
63 struct drbd_device *device;
65 device = bio->bi_private;
66 device->md_io.error = blk_status_to_errno(bio->bi_status);
68 /* special case: drbd_md_read() during drbd_adm_attach() */
69 if (device->ldev)
70 put_ldev(device);
71 bio_put(bio);
73 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
74 * to timeout on the lower level device, and eventually detach from it.
75 * If this io completion runs after that timeout expired, this
76 * drbd_md_put_buffer() may allow us to finally try and re-attach.
77 * During normal operation, this only puts that extra reference
78 * down to 1 again.
79 * Make sure we first drop the reference, and only then signal
80 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
81 * next drbd_md_sync_page_io(), that we trigger the
82 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
84 drbd_md_put_buffer(device);
85 device->md_io.done = 1;
86 wake_up(&device->misc_wait);
89 /* reads on behalf of the partner,
90 * "submitted" by the receiver
92 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
94 unsigned long flags = 0;
95 struct drbd_peer_device *peer_device = peer_req->peer_device;
96 struct drbd_device *device = peer_device->device;
98 spin_lock_irqsave(&device->resource->req_lock, flags);
99 device->read_cnt += peer_req->i.size >> 9;
100 list_del(&peer_req->w.list);
101 if (list_empty(&device->read_ee))
102 wake_up(&device->ee_wait);
103 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
104 __drbd_chk_io_error(device, DRBD_READ_ERROR);
105 spin_unlock_irqrestore(&device->resource->req_lock, flags);
107 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
108 put_ldev(device);
111 /* writes on behalf of the partner, or resync writes,
112 * "submitted" by the receiver, final stage. */
113 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
115 unsigned long flags = 0;
116 struct drbd_peer_device *peer_device = peer_req->peer_device;
117 struct drbd_device *device = peer_device->device;
118 struct drbd_connection *connection = peer_device->connection;
119 struct drbd_interval i;
120 int do_wake;
121 u64 block_id;
122 int do_al_complete_io;
124 /* after we moved peer_req to done_ee,
125 * we may no longer access it,
126 * it may be freed/reused already!
127 * (as soon as we release the req_lock) */
128 i = peer_req->i;
129 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
130 block_id = peer_req->block_id;
131 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
133 if (peer_req->flags & EE_WAS_ERROR) {
134 /* In protocol != C, we usually do not send write acks.
135 * In case of a write error, send the neg ack anyways. */
136 if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
137 inc_unacked(device);
138 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
141 spin_lock_irqsave(&device->resource->req_lock, flags);
142 device->writ_cnt += peer_req->i.size >> 9;
143 list_move_tail(&peer_req->w.list, &device->done_ee);
146 * Do not remove from the write_requests tree here: we did not send the
147 * Ack yet and did not wake possibly waiting conflicting requests.
148 * Removed from the tree from "drbd_process_done_ee" within the
149 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
150 * _drbd_clear_done_ee.
153 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
155 /* FIXME do we want to detach for failed REQ_DISCARD?
156 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
157 if (peer_req->flags & EE_WAS_ERROR)
158 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
160 if (connection->cstate >= C_WF_REPORT_PARAMS) {
161 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
162 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
163 kref_put(&device->kref, drbd_destroy_device);
165 spin_unlock_irqrestore(&device->resource->req_lock, flags);
167 if (block_id == ID_SYNCER)
168 drbd_rs_complete_io(device, i.sector);
170 if (do_wake)
171 wake_up(&device->ee_wait);
173 if (do_al_complete_io)
174 drbd_al_complete_io(device, &i);
176 put_ldev(device);
179 /* writes on behalf of the partner, or resync writes,
180 * "submitted" by the receiver.
182 void drbd_peer_request_endio(struct bio *bio)
184 struct drbd_peer_request *peer_req = bio->bi_private;
185 struct drbd_device *device = peer_req->peer_device->device;
186 bool is_write = bio_data_dir(bio) == WRITE;
187 bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
188 bio_op(bio) == REQ_OP_DISCARD;
190 if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
191 drbd_warn(device, "%s: error=%d s=%llus\n",
192 is_write ? (is_discard ? "discard" : "write")
193 : "read", bio->bi_status,
194 (unsigned long long)peer_req->i.sector);
196 if (bio->bi_status)
197 set_bit(__EE_WAS_ERROR, &peer_req->flags);
199 bio_put(bio); /* no need for the bio anymore */
200 if (atomic_dec_and_test(&peer_req->pending_bios)) {
201 if (is_write)
202 drbd_endio_write_sec_final(peer_req);
203 else
204 drbd_endio_read_sec_final(peer_req);
208 static void
209 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
211 panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
212 device->minor, device->resource->name, device->vnr);
215 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
217 void drbd_request_endio(struct bio *bio)
219 unsigned long flags;
220 struct drbd_request *req = bio->bi_private;
221 struct drbd_device *device = req->device;
222 struct bio_and_error m;
223 enum drbd_req_event what;
225 /* If this request was aborted locally before,
226 * but now was completed "successfully",
227 * chances are that this caused arbitrary data corruption.
229 * "aborting" requests, or force-detaching the disk, is intended for
230 * completely blocked/hung local backing devices which do no longer
231 * complete requests at all, not even do error completions. In this
232 * situation, usually a hard-reset and failover is the only way out.
234 * By "aborting", basically faking a local error-completion,
235 * we allow for a more graceful swichover by cleanly migrating services.
236 * Still the affected node has to be rebooted "soon".
238 * By completing these requests, we allow the upper layers to re-use
239 * the associated data pages.
241 * If later the local backing device "recovers", and now DMAs some data
242 * from disk into the original request pages, in the best case it will
243 * just put random data into unused pages; but typically it will corrupt
244 * meanwhile completely unrelated data, causing all sorts of damage.
246 * Which means delayed successful completion,
247 * especially for READ requests,
248 * is a reason to panic().
250 * We assume that a delayed *error* completion is OK,
251 * though we still will complain noisily about it.
253 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
254 if (__ratelimit(&drbd_ratelimit_state))
255 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
257 if (!bio->bi_status)
258 drbd_panic_after_delayed_completion_of_aborted_request(device);
261 /* to avoid recursion in __req_mod */
262 if (unlikely(bio->bi_status)) {
263 switch (bio_op(bio)) {
264 case REQ_OP_WRITE_ZEROES:
265 case REQ_OP_DISCARD:
266 if (bio->bi_status == BLK_STS_NOTSUPP)
267 what = DISCARD_COMPLETED_NOTSUPP;
268 else
269 what = DISCARD_COMPLETED_WITH_ERROR;
270 break;
271 case REQ_OP_READ:
272 if (bio->bi_opf & REQ_RAHEAD)
273 what = READ_AHEAD_COMPLETED_WITH_ERROR;
274 else
275 what = READ_COMPLETED_WITH_ERROR;
276 break;
277 default:
278 what = WRITE_COMPLETED_WITH_ERROR;
279 break;
281 } else {
282 what = COMPLETED_OK;
285 bio_put(req->private_bio);
286 req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
288 /* not req_mod(), we need irqsave here! */
289 spin_lock_irqsave(&device->resource->req_lock, flags);
290 __req_mod(req, what, &m);
291 spin_unlock_irqrestore(&device->resource->req_lock, flags);
292 put_ldev(device);
294 if (m.bio)
295 complete_master_bio(device, &m);
298 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
300 AHASH_REQUEST_ON_STACK(req, tfm);
301 struct scatterlist sg;
302 struct page *page = peer_req->pages;
303 struct page *tmp;
304 unsigned len;
306 ahash_request_set_tfm(req, tfm);
307 ahash_request_set_callback(req, 0, NULL, NULL);
309 sg_init_table(&sg, 1);
310 crypto_ahash_init(req);
312 while ((tmp = page_chain_next(page))) {
313 /* all but the last page will be fully used */
314 sg_set_page(&sg, page, PAGE_SIZE, 0);
315 ahash_request_set_crypt(req, &sg, NULL, sg.length);
316 crypto_ahash_update(req);
317 page = tmp;
319 /* and now the last, possibly only partially used page */
320 len = peer_req->i.size & (PAGE_SIZE - 1);
321 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
322 ahash_request_set_crypt(req, &sg, digest, sg.length);
323 crypto_ahash_finup(req);
324 ahash_request_zero(req);
327 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
329 AHASH_REQUEST_ON_STACK(req, tfm);
330 struct scatterlist sg;
331 struct bio_vec bvec;
332 struct bvec_iter iter;
334 ahash_request_set_tfm(req, tfm);
335 ahash_request_set_callback(req, 0, NULL, NULL);
337 sg_init_table(&sg, 1);
338 crypto_ahash_init(req);
340 bio_for_each_segment(bvec, bio, iter) {
341 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
342 ahash_request_set_crypt(req, &sg, NULL, sg.length);
343 crypto_ahash_update(req);
344 /* REQ_OP_WRITE_SAME has only one segment,
345 * checksum the payload only once. */
346 if (bio_op(bio) == REQ_OP_WRITE_SAME)
347 break;
349 ahash_request_set_crypt(req, NULL, digest, 0);
350 crypto_ahash_final(req);
351 ahash_request_zero(req);
354 /* MAYBE merge common code with w_e_end_ov_req */
355 static int w_e_send_csum(struct drbd_work *w, int cancel)
357 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
358 struct drbd_peer_device *peer_device = peer_req->peer_device;
359 struct drbd_device *device = peer_device->device;
360 int digest_size;
361 void *digest;
362 int err = 0;
364 if (unlikely(cancel))
365 goto out;
367 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
368 goto out;
370 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
371 digest = kmalloc(digest_size, GFP_NOIO);
372 if (digest) {
373 sector_t sector = peer_req->i.sector;
374 unsigned int size = peer_req->i.size;
375 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
376 /* Free peer_req and pages before send.
377 * In case we block on congestion, we could otherwise run into
378 * some distributed deadlock, if the other side blocks on
379 * congestion as well, because our receiver blocks in
380 * drbd_alloc_pages due to pp_in_use > max_buffers. */
381 drbd_free_peer_req(device, peer_req);
382 peer_req = NULL;
383 inc_rs_pending(device);
384 err = drbd_send_drequest_csum(peer_device, sector, size,
385 digest, digest_size,
386 P_CSUM_RS_REQUEST);
387 kfree(digest);
388 } else {
389 drbd_err(device, "kmalloc() of digest failed.\n");
390 err = -ENOMEM;
393 out:
394 if (peer_req)
395 drbd_free_peer_req(device, peer_req);
397 if (unlikely(err))
398 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
399 return err;
402 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
404 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
406 struct drbd_device *device = peer_device->device;
407 struct drbd_peer_request *peer_req;
409 if (!get_ldev(device))
410 return -EIO;
412 /* GFP_TRY, because if there is no memory available right now, this may
413 * be rescheduled for later. It is "only" background resync, after all. */
414 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
415 size, size, GFP_TRY);
416 if (!peer_req)
417 goto defer;
419 peer_req->w.cb = w_e_send_csum;
420 spin_lock_irq(&device->resource->req_lock);
421 list_add_tail(&peer_req->w.list, &device->read_ee);
422 spin_unlock_irq(&device->resource->req_lock);
424 atomic_add(size >> 9, &device->rs_sect_ev);
425 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
426 DRBD_FAULT_RS_RD) == 0)
427 return 0;
429 /* If it failed because of ENOMEM, retry should help. If it failed
430 * because bio_add_page failed (probably broken lower level driver),
431 * retry may or may not help.
432 * If it does not, you may need to force disconnect. */
433 spin_lock_irq(&device->resource->req_lock);
434 list_del(&peer_req->w.list);
435 spin_unlock_irq(&device->resource->req_lock);
437 drbd_free_peer_req(device, peer_req);
438 defer:
439 put_ldev(device);
440 return -EAGAIN;
443 int w_resync_timer(struct drbd_work *w, int cancel)
445 struct drbd_device *device =
446 container_of(w, struct drbd_device, resync_work);
448 switch (device->state.conn) {
449 case C_VERIFY_S:
450 make_ov_request(device, cancel);
451 break;
452 case C_SYNC_TARGET:
453 make_resync_request(device, cancel);
454 break;
457 return 0;
460 void resync_timer_fn(struct timer_list *t)
462 struct drbd_device *device = from_timer(device, t, resync_timer);
464 drbd_queue_work_if_unqueued(
465 &first_peer_device(device)->connection->sender_work,
466 &device->resync_work);
469 static void fifo_set(struct fifo_buffer *fb, int value)
471 int i;
473 for (i = 0; i < fb->size; i++)
474 fb->values[i] = value;
477 static int fifo_push(struct fifo_buffer *fb, int value)
479 int ov;
481 ov = fb->values[fb->head_index];
482 fb->values[fb->head_index++] = value;
484 if (fb->head_index >= fb->size)
485 fb->head_index = 0;
487 return ov;
490 static void fifo_add_val(struct fifo_buffer *fb, int value)
492 int i;
494 for (i = 0; i < fb->size; i++)
495 fb->values[i] += value;
498 struct fifo_buffer *fifo_alloc(int fifo_size)
500 struct fifo_buffer *fb;
502 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
503 if (!fb)
504 return NULL;
506 fb->head_index = 0;
507 fb->size = fifo_size;
508 fb->total = 0;
510 return fb;
513 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
515 struct disk_conf *dc;
516 unsigned int want; /* The number of sectors we want in-flight */
517 int req_sect; /* Number of sectors to request in this turn */
518 int correction; /* Number of sectors more we need in-flight */
519 int cps; /* correction per invocation of drbd_rs_controller() */
520 int steps; /* Number of time steps to plan ahead */
521 int curr_corr;
522 int max_sect;
523 struct fifo_buffer *plan;
525 dc = rcu_dereference(device->ldev->disk_conf);
526 plan = rcu_dereference(device->rs_plan_s);
528 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
530 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
531 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
532 } else { /* normal path */
533 want = dc->c_fill_target ? dc->c_fill_target :
534 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
537 correction = want - device->rs_in_flight - plan->total;
539 /* Plan ahead */
540 cps = correction / steps;
541 fifo_add_val(plan, cps);
542 plan->total += cps * steps;
544 /* What we do in this step */
545 curr_corr = fifo_push(plan, 0);
546 plan->total -= curr_corr;
548 req_sect = sect_in + curr_corr;
549 if (req_sect < 0)
550 req_sect = 0;
552 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
553 if (req_sect > max_sect)
554 req_sect = max_sect;
557 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
558 sect_in, device->rs_in_flight, want, correction,
559 steps, cps, device->rs_planed, curr_corr, req_sect);
562 return req_sect;
565 static int drbd_rs_number_requests(struct drbd_device *device)
567 unsigned int sect_in; /* Number of sectors that came in since the last turn */
568 int number, mxb;
570 sect_in = atomic_xchg(&device->rs_sect_in, 0);
571 device->rs_in_flight -= sect_in;
573 rcu_read_lock();
574 mxb = drbd_get_max_buffers(device) / 2;
575 if (rcu_dereference(device->rs_plan_s)->size) {
576 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
577 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
578 } else {
579 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
580 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
582 rcu_read_unlock();
584 /* Don't have more than "max-buffers"/2 in-flight.
585 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
586 * potentially causing a distributed deadlock on congestion during
587 * online-verify or (checksum-based) resync, if max-buffers,
588 * socket buffer sizes and resync rate settings are mis-configured. */
590 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
591 * mxb (as used here, and in drbd_alloc_pages on the peer) is
592 * "number of pages" (typically also 4k),
593 * but "rs_in_flight" is in "sectors" (512 Byte). */
594 if (mxb - device->rs_in_flight/8 < number)
595 number = mxb - device->rs_in_flight/8;
597 return number;
600 static int make_resync_request(struct drbd_device *const device, int cancel)
602 struct drbd_peer_device *const peer_device = first_peer_device(device);
603 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
604 unsigned long bit;
605 sector_t sector;
606 const sector_t capacity = drbd_get_capacity(device->this_bdev);
607 int max_bio_size;
608 int number, rollback_i, size;
609 int align, requeue = 0;
610 int i = 0;
611 int discard_granularity = 0;
613 if (unlikely(cancel))
614 return 0;
616 if (device->rs_total == 0) {
617 /* empty resync? */
618 drbd_resync_finished(device);
619 return 0;
622 if (!get_ldev(device)) {
623 /* Since we only need to access device->rsync a
624 get_ldev_if_state(device,D_FAILED) would be sufficient, but
625 to continue resync with a broken disk makes no sense at
626 all */
627 drbd_err(device, "Disk broke down during resync!\n");
628 return 0;
631 if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
632 rcu_read_lock();
633 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
634 rcu_read_unlock();
637 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
638 number = drbd_rs_number_requests(device);
639 if (number <= 0)
640 goto requeue;
642 for (i = 0; i < number; i++) {
643 /* Stop generating RS requests when half of the send buffer is filled,
644 * but notify TCP that we'd like to have more space. */
645 mutex_lock(&connection->data.mutex);
646 if (connection->data.socket) {
647 struct sock *sk = connection->data.socket->sk;
648 int queued = sk->sk_wmem_queued;
649 int sndbuf = sk->sk_sndbuf;
650 if (queued > sndbuf / 2) {
651 requeue = 1;
652 if (sk->sk_socket)
653 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
655 } else
656 requeue = 1;
657 mutex_unlock(&connection->data.mutex);
658 if (requeue)
659 goto requeue;
661 next_sector:
662 size = BM_BLOCK_SIZE;
663 bit = drbd_bm_find_next(device, device->bm_resync_fo);
665 if (bit == DRBD_END_OF_BITMAP) {
666 device->bm_resync_fo = drbd_bm_bits(device);
667 put_ldev(device);
668 return 0;
671 sector = BM_BIT_TO_SECT(bit);
673 if (drbd_try_rs_begin_io(device, sector)) {
674 device->bm_resync_fo = bit;
675 goto requeue;
677 device->bm_resync_fo = bit + 1;
679 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
680 drbd_rs_complete_io(device, sector);
681 goto next_sector;
684 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
685 /* try to find some adjacent bits.
686 * we stop if we have already the maximum req size.
688 * Additionally always align bigger requests, in order to
689 * be prepared for all stripe sizes of software RAIDs.
691 align = 1;
692 rollback_i = i;
693 while (i < number) {
694 if (size + BM_BLOCK_SIZE > max_bio_size)
695 break;
697 /* Be always aligned */
698 if (sector & ((1<<(align+3))-1))
699 break;
701 if (discard_granularity && size == discard_granularity)
702 break;
704 /* do not cross extent boundaries */
705 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
706 break;
707 /* now, is it actually dirty, after all?
708 * caution, drbd_bm_test_bit is tri-state for some
709 * obscure reason; ( b == 0 ) would get the out-of-band
710 * only accidentally right because of the "oddly sized"
711 * adjustment below */
712 if (drbd_bm_test_bit(device, bit+1) != 1)
713 break;
714 bit++;
715 size += BM_BLOCK_SIZE;
716 if ((BM_BLOCK_SIZE << align) <= size)
717 align++;
718 i++;
720 /* if we merged some,
721 * reset the offset to start the next drbd_bm_find_next from */
722 if (size > BM_BLOCK_SIZE)
723 device->bm_resync_fo = bit + 1;
724 #endif
726 /* adjust very last sectors, in case we are oddly sized */
727 if (sector + (size>>9) > capacity)
728 size = (capacity-sector)<<9;
730 if (device->use_csums) {
731 switch (read_for_csum(peer_device, sector, size)) {
732 case -EIO: /* Disk failure */
733 put_ldev(device);
734 return -EIO;
735 case -EAGAIN: /* allocation failed, or ldev busy */
736 drbd_rs_complete_io(device, sector);
737 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
738 i = rollback_i;
739 goto requeue;
740 case 0:
741 /* everything ok */
742 break;
743 default:
744 BUG();
746 } else {
747 int err;
749 inc_rs_pending(device);
750 err = drbd_send_drequest(peer_device,
751 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
752 sector, size, ID_SYNCER);
753 if (err) {
754 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
755 dec_rs_pending(device);
756 put_ldev(device);
757 return err;
762 if (device->bm_resync_fo >= drbd_bm_bits(device)) {
763 /* last syncer _request_ was sent,
764 * but the P_RS_DATA_REPLY not yet received. sync will end (and
765 * next sync group will resume), as soon as we receive the last
766 * resync data block, and the last bit is cleared.
767 * until then resync "work" is "inactive" ...
769 put_ldev(device);
770 return 0;
773 requeue:
774 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
775 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
776 put_ldev(device);
777 return 0;
780 static int make_ov_request(struct drbd_device *device, int cancel)
782 int number, i, size;
783 sector_t sector;
784 const sector_t capacity = drbd_get_capacity(device->this_bdev);
785 bool stop_sector_reached = false;
787 if (unlikely(cancel))
788 return 1;
790 number = drbd_rs_number_requests(device);
792 sector = device->ov_position;
793 for (i = 0; i < number; i++) {
794 if (sector >= capacity)
795 return 1;
797 /* We check for "finished" only in the reply path:
798 * w_e_end_ov_reply().
799 * We need to send at least one request out. */
800 stop_sector_reached = i > 0
801 && verify_can_do_stop_sector(device)
802 && sector >= device->ov_stop_sector;
803 if (stop_sector_reached)
804 break;
806 size = BM_BLOCK_SIZE;
808 if (drbd_try_rs_begin_io(device, sector)) {
809 device->ov_position = sector;
810 goto requeue;
813 if (sector + (size>>9) > capacity)
814 size = (capacity-sector)<<9;
816 inc_rs_pending(device);
817 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
818 dec_rs_pending(device);
819 return 0;
821 sector += BM_SECT_PER_BIT;
823 device->ov_position = sector;
825 requeue:
826 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
827 if (i == 0 || !stop_sector_reached)
828 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
829 return 1;
832 int w_ov_finished(struct drbd_work *w, int cancel)
834 struct drbd_device_work *dw =
835 container_of(w, struct drbd_device_work, w);
836 struct drbd_device *device = dw->device;
837 kfree(dw);
838 ov_out_of_sync_print(device);
839 drbd_resync_finished(device);
841 return 0;
844 static int w_resync_finished(struct drbd_work *w, int cancel)
846 struct drbd_device_work *dw =
847 container_of(w, struct drbd_device_work, w);
848 struct drbd_device *device = dw->device;
849 kfree(dw);
851 drbd_resync_finished(device);
853 return 0;
856 static void ping_peer(struct drbd_device *device)
858 struct drbd_connection *connection = first_peer_device(device)->connection;
860 clear_bit(GOT_PING_ACK, &connection->flags);
861 request_ping(connection);
862 wait_event(connection->ping_wait,
863 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
866 int drbd_resync_finished(struct drbd_device *device)
868 struct drbd_connection *connection = first_peer_device(device)->connection;
869 unsigned long db, dt, dbdt;
870 unsigned long n_oos;
871 union drbd_state os, ns;
872 struct drbd_device_work *dw;
873 char *khelper_cmd = NULL;
874 int verify_done = 0;
876 /* Remove all elements from the resync LRU. Since future actions
877 * might set bits in the (main) bitmap, then the entries in the
878 * resync LRU would be wrong. */
879 if (drbd_rs_del_all(device)) {
880 /* In case this is not possible now, most probably because
881 * there are P_RS_DATA_REPLY Packets lingering on the worker's
882 * queue (or even the read operations for those packets
883 * is not finished by now). Retry in 100ms. */
885 schedule_timeout_interruptible(HZ / 10);
886 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
887 if (dw) {
888 dw->w.cb = w_resync_finished;
889 dw->device = device;
890 drbd_queue_work(&connection->sender_work, &dw->w);
891 return 1;
893 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
896 dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
897 if (dt <= 0)
898 dt = 1;
900 db = device->rs_total;
901 /* adjust for verify start and stop sectors, respective reached position */
902 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
903 db -= device->ov_left;
905 dbdt = Bit2KB(db/dt);
906 device->rs_paused /= HZ;
908 if (!get_ldev(device))
909 goto out;
911 ping_peer(device);
913 spin_lock_irq(&device->resource->req_lock);
914 os = drbd_read_state(device);
916 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
918 /* This protects us against multiple calls (that can happen in the presence
919 of application IO), and against connectivity loss just before we arrive here. */
920 if (os.conn <= C_CONNECTED)
921 goto out_unlock;
923 ns = os;
924 ns.conn = C_CONNECTED;
926 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
927 verify_done ? "Online verify" : "Resync",
928 dt + device->rs_paused, device->rs_paused, dbdt);
930 n_oos = drbd_bm_total_weight(device);
932 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
933 if (n_oos) {
934 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
935 n_oos, Bit2KB(1));
936 khelper_cmd = "out-of-sync";
938 } else {
939 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
941 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
942 khelper_cmd = "after-resync-target";
944 if (device->use_csums && device->rs_total) {
945 const unsigned long s = device->rs_same_csum;
946 const unsigned long t = device->rs_total;
947 const int ratio =
948 (t == 0) ? 0 :
949 (t < 100000) ? ((s*100)/t) : (s/(t/100));
950 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
951 "transferred %luK total %luK\n",
952 ratio,
953 Bit2KB(device->rs_same_csum),
954 Bit2KB(device->rs_total - device->rs_same_csum),
955 Bit2KB(device->rs_total));
959 if (device->rs_failed) {
960 drbd_info(device, " %lu failed blocks\n", device->rs_failed);
962 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
963 ns.disk = D_INCONSISTENT;
964 ns.pdsk = D_UP_TO_DATE;
965 } else {
966 ns.disk = D_UP_TO_DATE;
967 ns.pdsk = D_INCONSISTENT;
969 } else {
970 ns.disk = D_UP_TO_DATE;
971 ns.pdsk = D_UP_TO_DATE;
973 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
974 if (device->p_uuid) {
975 int i;
976 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
977 _drbd_uuid_set(device, i, device->p_uuid[i]);
978 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
979 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
980 } else {
981 drbd_err(device, "device->p_uuid is NULL! BUG\n");
985 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
986 /* for verify runs, we don't update uuids here,
987 * so there would be nothing to report. */
988 drbd_uuid_set_bm(device, 0UL);
989 drbd_print_uuids(device, "updated UUIDs");
990 if (device->p_uuid) {
991 /* Now the two UUID sets are equal, update what we
992 * know of the peer. */
993 int i;
994 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
995 device->p_uuid[i] = device->ldev->md.uuid[i];
1000 _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1001 out_unlock:
1002 spin_unlock_irq(&device->resource->req_lock);
1004 /* If we have been sync source, and have an effective fencing-policy,
1005 * once *all* volumes are back in sync, call "unfence". */
1006 if (os.conn == C_SYNC_SOURCE) {
1007 enum drbd_disk_state disk_state = D_MASK;
1008 enum drbd_disk_state pdsk_state = D_MASK;
1009 enum drbd_fencing_p fp = FP_DONT_CARE;
1011 rcu_read_lock();
1012 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1013 if (fp != FP_DONT_CARE) {
1014 struct drbd_peer_device *peer_device;
1015 int vnr;
1016 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1017 struct drbd_device *device = peer_device->device;
1018 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1019 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1022 rcu_read_unlock();
1023 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1024 conn_khelper(connection, "unfence-peer");
1027 put_ldev(device);
1028 out:
1029 device->rs_total = 0;
1030 device->rs_failed = 0;
1031 device->rs_paused = 0;
1033 /* reset start sector, if we reached end of device */
1034 if (verify_done && device->ov_left == 0)
1035 device->ov_start_sector = 0;
1037 drbd_md_sync(device);
1039 if (khelper_cmd)
1040 drbd_khelper(device, khelper_cmd);
1042 return 1;
1045 /* helper */
1046 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1048 if (drbd_peer_req_has_active_page(peer_req)) {
1049 /* This might happen if sendpage() has not finished */
1050 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1051 atomic_add(i, &device->pp_in_use_by_net);
1052 atomic_sub(i, &device->pp_in_use);
1053 spin_lock_irq(&device->resource->req_lock);
1054 list_add_tail(&peer_req->w.list, &device->net_ee);
1055 spin_unlock_irq(&device->resource->req_lock);
1056 wake_up(&drbd_pp_wait);
1057 } else
1058 drbd_free_peer_req(device, peer_req);
1062 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1063 * @w: work object.
1064 * @cancel: The connection will be closed anyways
1066 int w_e_end_data_req(struct drbd_work *w, int cancel)
1068 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1069 struct drbd_peer_device *peer_device = peer_req->peer_device;
1070 struct drbd_device *device = peer_device->device;
1071 int err;
1073 if (unlikely(cancel)) {
1074 drbd_free_peer_req(device, peer_req);
1075 dec_unacked(device);
1076 return 0;
1079 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1080 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1081 } else {
1082 if (__ratelimit(&drbd_ratelimit_state))
1083 drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1084 (unsigned long long)peer_req->i.sector);
1086 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1089 dec_unacked(device);
1091 move_to_net_ee_or_free(device, peer_req);
1093 if (unlikely(err))
1094 drbd_err(device, "drbd_send_block() failed\n");
1095 return err;
1098 static bool all_zero(struct drbd_peer_request *peer_req)
1100 struct page *page = peer_req->pages;
1101 unsigned int len = peer_req->i.size;
1103 page_chain_for_each(page) {
1104 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1105 unsigned int i, words = l / sizeof(long);
1106 unsigned long *d;
1108 d = kmap_atomic(page);
1109 for (i = 0; i < words; i++) {
1110 if (d[i]) {
1111 kunmap_atomic(d);
1112 return false;
1115 kunmap_atomic(d);
1116 len -= l;
1119 return true;
1123 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1124 * @w: work object.
1125 * @cancel: The connection will be closed anyways
1127 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1129 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1130 struct drbd_peer_device *peer_device = peer_req->peer_device;
1131 struct drbd_device *device = peer_device->device;
1132 int err;
1134 if (unlikely(cancel)) {
1135 drbd_free_peer_req(device, peer_req);
1136 dec_unacked(device);
1137 return 0;
1140 if (get_ldev_if_state(device, D_FAILED)) {
1141 drbd_rs_complete_io(device, peer_req->i.sector);
1142 put_ldev(device);
1145 if (device->state.conn == C_AHEAD) {
1146 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1147 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1148 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1149 inc_rs_pending(device);
1150 if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1151 err = drbd_send_rs_deallocated(peer_device, peer_req);
1152 else
1153 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1154 } else {
1155 if (__ratelimit(&drbd_ratelimit_state))
1156 drbd_err(device, "Not sending RSDataReply, "
1157 "partner DISKLESS!\n");
1158 err = 0;
1160 } else {
1161 if (__ratelimit(&drbd_ratelimit_state))
1162 drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1163 (unsigned long long)peer_req->i.sector);
1165 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1167 /* update resync data with failure */
1168 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1171 dec_unacked(device);
1173 move_to_net_ee_or_free(device, peer_req);
1175 if (unlikely(err))
1176 drbd_err(device, "drbd_send_block() failed\n");
1177 return err;
1180 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1182 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1183 struct drbd_peer_device *peer_device = peer_req->peer_device;
1184 struct drbd_device *device = peer_device->device;
1185 struct digest_info *di;
1186 int digest_size;
1187 void *digest = NULL;
1188 int err, eq = 0;
1190 if (unlikely(cancel)) {
1191 drbd_free_peer_req(device, peer_req);
1192 dec_unacked(device);
1193 return 0;
1196 if (get_ldev(device)) {
1197 drbd_rs_complete_io(device, peer_req->i.sector);
1198 put_ldev(device);
1201 di = peer_req->digest;
1203 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1204 /* quick hack to try to avoid a race against reconfiguration.
1205 * a real fix would be much more involved,
1206 * introducing more locking mechanisms */
1207 if (peer_device->connection->csums_tfm) {
1208 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1209 D_ASSERT(device, digest_size == di->digest_size);
1210 digest = kmalloc(digest_size, GFP_NOIO);
1212 if (digest) {
1213 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1214 eq = !memcmp(digest, di->digest, digest_size);
1215 kfree(digest);
1218 if (eq) {
1219 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1220 /* rs_same_csums unit is BM_BLOCK_SIZE */
1221 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1222 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1223 } else {
1224 inc_rs_pending(device);
1225 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1226 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1227 kfree(di);
1228 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1230 } else {
1231 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1232 if (__ratelimit(&drbd_ratelimit_state))
1233 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1236 dec_unacked(device);
1237 move_to_net_ee_or_free(device, peer_req);
1239 if (unlikely(err))
1240 drbd_err(device, "drbd_send_block/ack() failed\n");
1241 return err;
1244 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1246 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1247 struct drbd_peer_device *peer_device = peer_req->peer_device;
1248 struct drbd_device *device = peer_device->device;
1249 sector_t sector = peer_req->i.sector;
1250 unsigned int size = peer_req->i.size;
1251 int digest_size;
1252 void *digest;
1253 int err = 0;
1255 if (unlikely(cancel))
1256 goto out;
1258 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1259 digest = kmalloc(digest_size, GFP_NOIO);
1260 if (!digest) {
1261 err = 1; /* terminate the connection in case the allocation failed */
1262 goto out;
1265 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1266 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1267 else
1268 memset(digest, 0, digest_size);
1270 /* Free e and pages before send.
1271 * In case we block on congestion, we could otherwise run into
1272 * some distributed deadlock, if the other side blocks on
1273 * congestion as well, because our receiver blocks in
1274 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1275 drbd_free_peer_req(device, peer_req);
1276 peer_req = NULL;
1277 inc_rs_pending(device);
1278 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1279 if (err)
1280 dec_rs_pending(device);
1281 kfree(digest);
1283 out:
1284 if (peer_req)
1285 drbd_free_peer_req(device, peer_req);
1286 dec_unacked(device);
1287 return err;
1290 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1292 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1293 device->ov_last_oos_size += size>>9;
1294 } else {
1295 device->ov_last_oos_start = sector;
1296 device->ov_last_oos_size = size>>9;
1298 drbd_set_out_of_sync(device, sector, size);
1301 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1303 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1304 struct drbd_peer_device *peer_device = peer_req->peer_device;
1305 struct drbd_device *device = peer_device->device;
1306 struct digest_info *di;
1307 void *digest;
1308 sector_t sector = peer_req->i.sector;
1309 unsigned int size = peer_req->i.size;
1310 int digest_size;
1311 int err, eq = 0;
1312 bool stop_sector_reached = false;
1314 if (unlikely(cancel)) {
1315 drbd_free_peer_req(device, peer_req);
1316 dec_unacked(device);
1317 return 0;
1320 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1321 * the resync lru has been cleaned up already */
1322 if (get_ldev(device)) {
1323 drbd_rs_complete_io(device, peer_req->i.sector);
1324 put_ldev(device);
1327 di = peer_req->digest;
1329 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1330 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1331 digest = kmalloc(digest_size, GFP_NOIO);
1332 if (digest) {
1333 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1335 D_ASSERT(device, digest_size == di->digest_size);
1336 eq = !memcmp(digest, di->digest, digest_size);
1337 kfree(digest);
1341 /* Free peer_req and pages before send.
1342 * In case we block on congestion, we could otherwise run into
1343 * some distributed deadlock, if the other side blocks on
1344 * congestion as well, because our receiver blocks in
1345 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1346 drbd_free_peer_req(device, peer_req);
1347 if (!eq)
1348 drbd_ov_out_of_sync_found(device, sector, size);
1349 else
1350 ov_out_of_sync_print(device);
1352 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1353 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1355 dec_unacked(device);
1357 --device->ov_left;
1359 /* let's advance progress step marks only for every other megabyte */
1360 if ((device->ov_left & 0x200) == 0x200)
1361 drbd_advance_rs_marks(device, device->ov_left);
1363 stop_sector_reached = verify_can_do_stop_sector(device) &&
1364 (sector + (size>>9)) >= device->ov_stop_sector;
1366 if (device->ov_left == 0 || stop_sector_reached) {
1367 ov_out_of_sync_print(device);
1368 drbd_resync_finished(device);
1371 return err;
1374 /* FIXME
1375 * We need to track the number of pending barrier acks,
1376 * and to be able to wait for them.
1377 * See also comment in drbd_adm_attach before drbd_suspend_io.
1379 static int drbd_send_barrier(struct drbd_connection *connection)
1381 struct p_barrier *p;
1382 struct drbd_socket *sock;
1384 sock = &connection->data;
1385 p = conn_prepare_command(connection, sock);
1386 if (!p)
1387 return -EIO;
1388 p->barrier = connection->send.current_epoch_nr;
1389 p->pad = 0;
1390 connection->send.current_epoch_writes = 0;
1391 connection->send.last_sent_barrier_jif = jiffies;
1393 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1396 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1398 struct drbd_socket *sock = &pd->connection->data;
1399 if (!drbd_prepare_command(pd, sock))
1400 return -EIO;
1401 return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1404 int w_send_write_hint(struct drbd_work *w, int cancel)
1406 struct drbd_device *device =
1407 container_of(w, struct drbd_device, unplug_work);
1409 if (cancel)
1410 return 0;
1411 return pd_send_unplug_remote(first_peer_device(device));
1414 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1416 if (!connection->send.seen_any_write_yet) {
1417 connection->send.seen_any_write_yet = true;
1418 connection->send.current_epoch_nr = epoch;
1419 connection->send.current_epoch_writes = 0;
1420 connection->send.last_sent_barrier_jif = jiffies;
1424 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1426 /* re-init if first write on this connection */
1427 if (!connection->send.seen_any_write_yet)
1428 return;
1429 if (connection->send.current_epoch_nr != epoch) {
1430 if (connection->send.current_epoch_writes)
1431 drbd_send_barrier(connection);
1432 connection->send.current_epoch_nr = epoch;
1436 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1438 struct drbd_request *req = container_of(w, struct drbd_request, w);
1439 struct drbd_device *device = req->device;
1440 struct drbd_peer_device *const peer_device = first_peer_device(device);
1441 struct drbd_connection *const connection = peer_device->connection;
1442 int err;
1444 if (unlikely(cancel)) {
1445 req_mod(req, SEND_CANCELED);
1446 return 0;
1448 req->pre_send_jif = jiffies;
1450 /* this time, no connection->send.current_epoch_writes++;
1451 * If it was sent, it was the closing barrier for the last
1452 * replicated epoch, before we went into AHEAD mode.
1453 * No more barriers will be sent, until we leave AHEAD mode again. */
1454 maybe_send_barrier(connection, req->epoch);
1456 err = drbd_send_out_of_sync(peer_device, req);
1457 req_mod(req, OOS_HANDED_TO_NETWORK);
1459 return err;
1463 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1464 * @w: work object.
1465 * @cancel: The connection will be closed anyways
1467 int w_send_dblock(struct drbd_work *w, int cancel)
1469 struct drbd_request *req = container_of(w, struct drbd_request, w);
1470 struct drbd_device *device = req->device;
1471 struct drbd_peer_device *const peer_device = first_peer_device(device);
1472 struct drbd_connection *connection = peer_device->connection;
1473 bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1474 int err;
1476 if (unlikely(cancel)) {
1477 req_mod(req, SEND_CANCELED);
1478 return 0;
1480 req->pre_send_jif = jiffies;
1482 re_init_if_first_write(connection, req->epoch);
1483 maybe_send_barrier(connection, req->epoch);
1484 connection->send.current_epoch_writes++;
1486 err = drbd_send_dblock(peer_device, req);
1487 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1489 if (do_send_unplug && !err)
1490 pd_send_unplug_remote(peer_device);
1492 return err;
1496 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1497 * @w: work object.
1498 * @cancel: The connection will be closed anyways
1500 int w_send_read_req(struct drbd_work *w, int cancel)
1502 struct drbd_request *req = container_of(w, struct drbd_request, w);
1503 struct drbd_device *device = req->device;
1504 struct drbd_peer_device *const peer_device = first_peer_device(device);
1505 struct drbd_connection *connection = peer_device->connection;
1506 bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1507 int err;
1509 if (unlikely(cancel)) {
1510 req_mod(req, SEND_CANCELED);
1511 return 0;
1513 req->pre_send_jif = jiffies;
1515 /* Even read requests may close a write epoch,
1516 * if there was any yet. */
1517 maybe_send_barrier(connection, req->epoch);
1519 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1520 (unsigned long)req);
1522 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1524 if (do_send_unplug && !err)
1525 pd_send_unplug_remote(peer_device);
1527 return err;
1530 int w_restart_disk_io(struct drbd_work *w, int cancel)
1532 struct drbd_request *req = container_of(w, struct drbd_request, w);
1533 struct drbd_device *device = req->device;
1535 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1536 drbd_al_begin_io(device, &req->i);
1538 drbd_req_make_private_bio(req, req->master_bio);
1539 bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1540 generic_make_request(req->private_bio);
1542 return 0;
1545 static int _drbd_may_sync_now(struct drbd_device *device)
1547 struct drbd_device *odev = device;
1548 int resync_after;
1550 while (1) {
1551 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1552 return 1;
1553 rcu_read_lock();
1554 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1555 rcu_read_unlock();
1556 if (resync_after == -1)
1557 return 1;
1558 odev = minor_to_device(resync_after);
1559 if (!odev)
1560 return 1;
1561 if ((odev->state.conn >= C_SYNC_SOURCE &&
1562 odev->state.conn <= C_PAUSED_SYNC_T) ||
1563 odev->state.aftr_isp || odev->state.peer_isp ||
1564 odev->state.user_isp)
1565 return 0;
1570 * drbd_pause_after() - Pause resync on all devices that may not resync now
1571 * @device: DRBD device.
1573 * Called from process context only (admin command and after_state_ch).
1575 static bool drbd_pause_after(struct drbd_device *device)
1577 bool changed = false;
1578 struct drbd_device *odev;
1579 int i;
1581 rcu_read_lock();
1582 idr_for_each_entry(&drbd_devices, odev, i) {
1583 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1584 continue;
1585 if (!_drbd_may_sync_now(odev) &&
1586 _drbd_set_state(_NS(odev, aftr_isp, 1),
1587 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1588 changed = true;
1590 rcu_read_unlock();
1592 return changed;
1596 * drbd_resume_next() - Resume resync on all devices that may resync now
1597 * @device: DRBD device.
1599 * Called from process context only (admin command and worker).
1601 static bool drbd_resume_next(struct drbd_device *device)
1603 bool changed = false;
1604 struct drbd_device *odev;
1605 int i;
1607 rcu_read_lock();
1608 idr_for_each_entry(&drbd_devices, odev, i) {
1609 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1610 continue;
1611 if (odev->state.aftr_isp) {
1612 if (_drbd_may_sync_now(odev) &&
1613 _drbd_set_state(_NS(odev, aftr_isp, 0),
1614 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1615 changed = true;
1618 rcu_read_unlock();
1619 return changed;
/* Run drbd_resume_next() with all resources locked. */
void resume_next_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_resume_next(device);	/* "changed" result is not needed */
	unlock_all_resources();
}
/* Run drbd_pause_after() with all resources locked. */
void suspend_other_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_pause_after(device);	/* "changed" result is not needed */
	unlock_all_resources();
}
1636 /* caller must lock_all_resources() */
1637 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1639 struct drbd_device *odev;
1640 int resync_after;
1642 if (o_minor == -1)
1643 return NO_ERROR;
1644 if (o_minor < -1 || o_minor > MINORMASK)
1645 return ERR_RESYNC_AFTER;
1647 /* check for loops */
1648 odev = minor_to_device(o_minor);
1649 while (1) {
1650 if (odev == device)
1651 return ERR_RESYNC_AFTER_CYCLE;
1653 /* You are free to depend on diskless, non-existing,
1654 * or not yet/no longer existing minors.
1655 * We only reject dependency loops.
1656 * We cannot follow the dependency chain beyond a detached or
1657 * missing minor.
1659 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1660 return NO_ERROR;
1662 rcu_read_lock();
1663 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1664 rcu_read_unlock();
1665 /* dependency chain ends here, no cycles. */
1666 if (resync_after == -1)
1667 return NO_ERROR;
1669 /* follow the dependency chain */
1670 odev = minor_to_device(resync_after);
1674 /* caller must lock_all_resources() */
1675 void drbd_resync_after_changed(struct drbd_device *device)
1677 int changed;
1679 do {
1680 changed = drbd_pause_after(device);
1681 changed |= drbd_resume_next(device);
1682 } while (changed);
1685 void drbd_rs_controller_reset(struct drbd_device *device)
1687 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1688 struct fifo_buffer *plan;
1690 atomic_set(&device->rs_sect_in, 0);
1691 atomic_set(&device->rs_sect_ev, 0);
1692 device->rs_in_flight = 0;
1693 device->rs_last_events =
1694 (int)part_stat_read(&disk->part0, sectors[0]) +
1695 (int)part_stat_read(&disk->part0, sectors[1]);
1697 /* Updating the RCU protected object in place is necessary since
1698 this function gets called from atomic context.
1699 It is valid since all other updates also lead to an completely
1700 empty fifo */
1701 rcu_read_lock();
1702 plan = rcu_dereference(device->rs_plan_s);
1703 plan->total = 0;
1704 fifo_set(plan, 0);
1705 rcu_read_unlock();
1708 void start_resync_timer_fn(struct timer_list *t)
1710 struct drbd_device *device = from_timer(device, t, start_resync_timer);
1711 drbd_device_post_work(device, RS_START);
1714 static void do_start_resync(struct drbd_device *device)
1716 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1717 drbd_warn(device, "postponing start_resync ...\n");
1718 device->start_resync_timer.expires = jiffies + HZ/10;
1719 add_timer(&device->start_resync_timer);
1720 return;
1723 drbd_start_resync(device, C_SYNC_SOURCE);
1724 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1727 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1729 bool csums_after_crash_only;
1730 rcu_read_lock();
1731 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1732 rcu_read_unlock();
1733 return connection->agreed_pro_version >= 89 && /* supported? */
1734 connection->csums_tfm && /* configured? */
1735 (csums_after_crash_only == false /* use for each resync? */
1736 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1740 * drbd_start_resync() - Start the resync process
1741 * @device: DRBD device.
1742 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1744 * This function might bring you directly into one of the
1745 * C_PAUSED_SYNC_* states.
1747 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1749 struct drbd_peer_device *peer_device = first_peer_device(device);
1750 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1751 union drbd_state ns;
1752 int r;
1754 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1755 drbd_err(device, "Resync already running!\n");
1756 return;
1759 if (!connection) {
1760 drbd_err(device, "No connection to peer, aborting!\n");
1761 return;
1764 if (!test_bit(B_RS_H_DONE, &device->flags)) {
1765 if (side == C_SYNC_TARGET) {
1766 /* Since application IO was locked out during C_WF_BITMAP_T and
1767 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1768 we check that we might make the data inconsistent. */
1769 r = drbd_khelper(device, "before-resync-target");
1770 r = (r >> 8) & 0xff;
1771 if (r > 0) {
1772 drbd_info(device, "before-resync-target handler returned %d, "
1773 "dropping connection.\n", r);
1774 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1775 return;
1777 } else /* C_SYNC_SOURCE */ {
1778 r = drbd_khelper(device, "before-resync-source");
1779 r = (r >> 8) & 0xff;
1780 if (r > 0) {
1781 if (r == 3) {
1782 drbd_info(device, "before-resync-source handler returned %d, "
1783 "ignoring. Old userland tools?", r);
1784 } else {
1785 drbd_info(device, "before-resync-source handler returned %d, "
1786 "dropping connection.\n", r);
1787 conn_request_state(connection,
1788 NS(conn, C_DISCONNECTING), CS_HARD);
1789 return;
1795 if (current == connection->worker.task) {
1796 /* The worker should not sleep waiting for state_mutex,
1797 that can take long */
1798 if (!mutex_trylock(device->state_mutex)) {
1799 set_bit(B_RS_H_DONE, &device->flags);
1800 device->start_resync_timer.expires = jiffies + HZ/5;
1801 add_timer(&device->start_resync_timer);
1802 return;
1804 } else {
1805 mutex_lock(device->state_mutex);
1808 lock_all_resources();
1809 clear_bit(B_RS_H_DONE, &device->flags);
1810 /* Did some connection breakage or IO error race with us? */
1811 if (device->state.conn < C_CONNECTED
1812 || !get_ldev_if_state(device, D_NEGOTIATING)) {
1813 unlock_all_resources();
1814 goto out;
1817 ns = drbd_read_state(device);
1819 ns.aftr_isp = !_drbd_may_sync_now(device);
1821 ns.conn = side;
1823 if (side == C_SYNC_TARGET)
1824 ns.disk = D_INCONSISTENT;
1825 else /* side == C_SYNC_SOURCE */
1826 ns.pdsk = D_INCONSISTENT;
1828 r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1829 ns = drbd_read_state(device);
1831 if (ns.conn < C_CONNECTED)
1832 r = SS_UNKNOWN_ERROR;
1834 if (r == SS_SUCCESS) {
1835 unsigned long tw = drbd_bm_total_weight(device);
1836 unsigned long now = jiffies;
1837 int i;
1839 device->rs_failed = 0;
1840 device->rs_paused = 0;
1841 device->rs_same_csum = 0;
1842 device->rs_last_sect_ev = 0;
1843 device->rs_total = tw;
1844 device->rs_start = now;
1845 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1846 device->rs_mark_left[i] = tw;
1847 device->rs_mark_time[i] = now;
1849 drbd_pause_after(device);
1850 /* Forget potentially stale cached per resync extent bit-counts.
1851 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1852 * disabled, and know the disk state is ok. */
1853 spin_lock(&device->al_lock);
1854 lc_reset(device->resync);
1855 device->resync_locked = 0;
1856 device->resync_wenr = LC_FREE;
1857 spin_unlock(&device->al_lock);
1859 unlock_all_resources();
1861 if (r == SS_SUCCESS) {
1862 wake_up(&device->al_wait); /* for lc_reset() above */
1863 /* reset rs_last_bcast when a resync or verify is started,
1864 * to deal with potential jiffies wrap. */
1865 device->rs_last_bcast = jiffies - HZ;
1867 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1868 drbd_conn_str(ns.conn),
1869 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1870 (unsigned long) device->rs_total);
1871 if (side == C_SYNC_TARGET) {
1872 device->bm_resync_fo = 0;
1873 device->use_csums = use_checksum_based_resync(connection, device);
1874 } else {
1875 device->use_csums = false;
1878 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1879 * with w_send_oos, or the sync target will get confused as to
1880 * how much bits to resync. We cannot do that always, because for an
1881 * empty resync and protocol < 95, we need to do it here, as we call
1882 * drbd_resync_finished from here in that case.
1883 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1884 * and from after_state_ch otherwise. */
1885 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1886 drbd_gen_and_send_sync_uuid(peer_device);
1888 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1889 /* This still has a race (about when exactly the peers
1890 * detect connection loss) that can lead to a full sync
1891 * on next handshake. In 8.3.9 we fixed this with explicit
1892 * resync-finished notifications, but the fix
1893 * introduces a protocol change. Sleeping for some
1894 * time longer than the ping interval + timeout on the
1895 * SyncSource, to give the SyncTarget the chance to
1896 * detect connection loss, then waiting for a ping
1897 * response (implicit in drbd_resync_finished) reduces
1898 * the race considerably, but does not solve it. */
1899 if (side == C_SYNC_SOURCE) {
1900 struct net_conf *nc;
1901 int timeo;
1903 rcu_read_lock();
1904 nc = rcu_dereference(connection->net_conf);
1905 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1906 rcu_read_unlock();
1907 schedule_timeout_interruptible(timeo);
1909 drbd_resync_finished(device);
1912 drbd_rs_controller_reset(device);
1913 /* ns.conn may already be != device->state.conn,
1914 * we may have been paused in between, or become paused until
1915 * the timer triggers.
1916 * No matter, that is handled in resync_timer_fn() */
1917 if (ns.conn == C_SYNC_TARGET)
1918 mod_timer(&device->resync_timer, jiffies);
1920 drbd_md_sync(device);
1922 put_ldev(device);
1923 out:
1924 mutex_unlock(device->state_mutex);
1927 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1929 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1930 device->rs_last_bcast = jiffies;
1932 if (!get_ldev(device))
1933 return;
1935 drbd_bm_write_lazy(device, 0);
1936 if (resync_done && is_sync_state(device->state.conn))
1937 drbd_resync_finished(device);
1939 drbd_bcast_event(device, &sib);
1940 /* update timestamp, in case it took a while to write out stuff */
1941 device->rs_last_bcast = jiffies;
1942 put_ldev(device);
1945 static void drbd_ldev_destroy(struct drbd_device *device)
1947 lc_destroy(device->resync);
1948 device->resync = NULL;
1949 lc_destroy(device->act_log);
1950 device->act_log = NULL;
1952 __acquire(local);
1953 drbd_backing_dev_free(device, device->ldev);
1954 device->ldev = NULL;
1955 __release(local);
1957 clear_bit(GOING_DISKLESS, &device->flags);
1958 wake_up(&device->misc_wait);
1961 static void go_diskless(struct drbd_device *device)
1963 D_ASSERT(device, device->state.disk == D_FAILED);
1964 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1965 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1966 * the protected members anymore, though, so once put_ldev reaches zero
1967 * again, it will be safe to free them. */
1969 /* Try to write changed bitmap pages, read errors may have just
1970 * set some bits outside the area covered by the activity log.
1972 * If we have an IO error during the bitmap writeout,
1973 * we will want a full sync next time, just in case.
1974 * (Do we want a specific meta data flag for this?)
1976 * If that does not make it to stable storage either,
1977 * we cannot do anything about that anymore.
1979 * We still need to check if both bitmap and ldev are present, we may
1980 * end up here after a failed attach, before ldev was even assigned.
1982 if (device->bitmap && device->ldev) {
1983 /* An interrupted resync or similar is allowed to recounts bits
1984 * while we detach.
1985 * Any modifications would not be expected anymore, though.
1987 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1988 "detach", BM_LOCKED_TEST_ALLOWED)) {
1989 if (test_bit(WAS_READ_ERROR, &device->flags)) {
1990 drbd_md_set_flag(device, MDF_FULL_SYNC);
1991 drbd_md_sync(device);
1996 drbd_force_state(device, NS(disk, D_DISKLESS));
/*
 * Device work for MD_SYNC: log a warning and call drbd_md_sync().
 * Always returns 0.
 */
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);

	return 0;
}
2006 /* only called from drbd_worker thread, no locking */
2007 void __update_timing_details(
2008 struct drbd_thread_timing_details *tdp,
2009 unsigned int *cb_nr,
2010 void *cb,
2011 const char *fn, const unsigned int line)
2013 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2014 struct drbd_thread_timing_details *td = tdp + i;
2016 td->start_jif = jiffies;
2017 td->cb_addr = cb;
2018 td->caller_fn = fn;
2019 td->line = line;
2020 td->cb_nr = *cb_nr;
2022 i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2023 td = tdp + i;
2024 memset(td, 0, sizeof(*td));
2026 ++(*cb_nr);
2029 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2031 if (test_bit(MD_SYNC, &todo))
2032 do_md_sync(device);
2033 if (test_bit(RS_DONE, &todo) ||
2034 test_bit(RS_PROGRESS, &todo))
2035 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2036 if (test_bit(GO_DISKLESS, &todo))
2037 go_diskless(device);
2038 if (test_bit(DESTROY_DISK, &todo))
2039 drbd_ldev_destroy(device);
2040 if (test_bit(RS_START, &todo))
2041 do_start_resync(device);
2044 #define DRBD_DEVICE_WORK_MASK \
2045 ((1UL << GO_DISKLESS) \
2046 |(1UL << DESTROY_DISK) \
2047 |(1UL << MD_SYNC) \
2048 |(1UL << RS_START) \
2049 |(1UL << RS_PROGRESS) \
2050 |(1UL << RS_DONE) \
2053 static unsigned long get_work_bits(unsigned long *flags)
2055 unsigned long old, new;
2056 do {
2057 old = *flags;
2058 new = old & ~DRBD_DEVICE_WORK_MASK;
2059 } while (cmpxchg(flags, old, new) != old);
2060 return old & DRBD_DEVICE_WORK_MASK;
2063 static void do_unqueued_work(struct drbd_connection *connection)
2065 struct drbd_peer_device *peer_device;
2066 int vnr;
2068 rcu_read_lock();
2069 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2070 struct drbd_device *device = peer_device->device;
2071 unsigned long todo = get_work_bits(&device->flags);
2072 if (!todo)
2073 continue;
2075 kref_get(&device->kref);
2076 rcu_read_unlock();
2077 do_device_work(device, todo);
2078 kref_put(&device->kref, drbd_destroy_device);
2079 rcu_read_lock();
2081 rcu_read_unlock();
2084 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2086 spin_lock_irq(&queue->q_lock);
2087 list_splice_tail_init(&queue->q, work_list);
2088 spin_unlock_irq(&queue->q_lock);
2089 return !list_empty(work_list);
2092 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2094 DEFINE_WAIT(wait);
2095 struct net_conf *nc;
2096 int uncork, cork;
2098 dequeue_work_batch(&connection->sender_work, work_list);
2099 if (!list_empty(work_list))
2100 return;
2102 /* Still nothing to do?
2103 * Maybe we still need to close the current epoch,
2104 * even if no new requests are queued yet.
2106 * Also, poke TCP, just in case.
2107 * Then wait for new work (or signal). */
2108 rcu_read_lock();
2109 nc = rcu_dereference(connection->net_conf);
2110 uncork = nc ? nc->tcp_cork : 0;
2111 rcu_read_unlock();
2112 if (uncork) {
2113 mutex_lock(&connection->data.mutex);
2114 if (connection->data.socket)
2115 drbd_tcp_uncork(connection->data.socket);
2116 mutex_unlock(&connection->data.mutex);
2119 for (;;) {
2120 int send_barrier;
2121 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2122 spin_lock_irq(&connection->resource->req_lock);
2123 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2124 if (!list_empty(&connection->sender_work.q))
2125 list_splice_tail_init(&connection->sender_work.q, work_list);
2126 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2127 if (!list_empty(work_list) || signal_pending(current)) {
2128 spin_unlock_irq(&connection->resource->req_lock);
2129 break;
2132 /* We found nothing new to do, no to-be-communicated request,
2133 * no other work item. We may still need to close the last
2134 * epoch. Next incoming request epoch will be connection ->
2135 * current transfer log epoch number. If that is different
2136 * from the epoch of the last request we communicated, it is
2137 * safe to send the epoch separating barrier now.
2139 send_barrier =
2140 atomic_read(&connection->current_tle_nr) !=
2141 connection->send.current_epoch_nr;
2142 spin_unlock_irq(&connection->resource->req_lock);
2144 if (send_barrier)
2145 maybe_send_barrier(connection,
2146 connection->send.current_epoch_nr + 1);
2148 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2149 break;
2151 /* drbd_send() may have called flush_signals() */
2152 if (get_t_state(&connection->worker) != RUNNING)
2153 break;
2155 schedule();
2156 /* may be woken up for other things but new work, too,
2157 * e.g. if the current epoch got closed.
2158 * In which case we send the barrier above. */
2160 finish_wait(&connection->sender_work.q_wait, &wait);
2162 /* someone may have changed the config while we have been waiting above. */
2163 rcu_read_lock();
2164 nc = rcu_dereference(connection->net_conf);
2165 cork = nc ? nc->tcp_cork : 0;
2166 rcu_read_unlock();
2167 mutex_lock(&connection->data.mutex);
2168 if (connection->data.socket) {
2169 if (cork)
2170 drbd_tcp_cork(connection->data.socket);
2171 else if (!uncork)
2172 drbd_tcp_uncork(connection->data.socket);
2174 mutex_unlock(&connection->data.mutex);
2177 int drbd_worker(struct drbd_thread *thi)
2179 struct drbd_connection *connection = thi->connection;
2180 struct drbd_work *w = NULL;
2181 struct drbd_peer_device *peer_device;
2182 LIST_HEAD(work_list);
2183 int vnr;
2185 while (get_t_state(thi) == RUNNING) {
2186 drbd_thread_current_set_cpu(thi);
2188 if (list_empty(&work_list)) {
2189 update_worker_timing_details(connection, wait_for_work);
2190 wait_for_work(connection, &work_list);
2193 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2194 update_worker_timing_details(connection, do_unqueued_work);
2195 do_unqueued_work(connection);
2198 if (signal_pending(current)) {
2199 flush_signals(current);
2200 if (get_t_state(thi) == RUNNING) {
2201 drbd_warn(connection, "Worker got an unexpected signal\n");
2202 continue;
2204 break;
2207 if (get_t_state(thi) != RUNNING)
2208 break;
2210 if (!list_empty(&work_list)) {
2211 w = list_first_entry(&work_list, struct drbd_work, list);
2212 list_del_init(&w->list);
2213 update_worker_timing_details(connection, w->cb);
2214 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2215 continue;
2216 if (connection->cstate >= C_WF_REPORT_PARAMS)
2217 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2221 do {
2222 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2223 update_worker_timing_details(connection, do_unqueued_work);
2224 do_unqueued_work(connection);
2226 if (!list_empty(&work_list)) {
2227 w = list_first_entry(&work_list, struct drbd_work, list);
2228 list_del_init(&w->list);
2229 update_worker_timing_details(connection, w->cb);
2230 w->cb(w, 1);
2231 } else
2232 dequeue_work_batch(&connection->sender_work, &work_list);
2233 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2235 rcu_read_lock();
2236 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2237 struct drbd_device *device = peer_device->device;
2238 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2239 kref_get(&device->kref);
2240 rcu_read_unlock();
2241 drbd_device_cleanup(device);
2242 kref_put(&device->kref, drbd_destroy_device);
2243 rcu_read_lock();
2245 rcu_read_unlock();
2247 return 0;