drbd: DIV_ROUND_UP not needed here
drivers/block/drbd/drbd_worker.c
/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */
#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/smp_lock.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_req.h"

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
/* defined here:
   drbd_md_io_complete
   drbd_endio_sec
   drbd_endio_pri

 * more endio handlers:
   atodb_endio in drbd_actlog.c
   drbd_bm_async_io_complete in drbd_bitmap.c

 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 */
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;
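
/* The intended usage pattern, as described above, is roughly (an
 * illustrative sketch only; the actual read-side call sites live in
 * drbd_main.c and may differ in detail):
 *
 *	read_lock(&global_state_lock);          // single-device state change
 *	...
 *	read_unlock(&global_state_lock);
 *
 *	write_lock_irq(&global_state_lock);     // sync-after evaluation needs
 *	_drbd_pause_after(mdev);                // a stable view of *all* devices
 *	write_unlock_irq(&global_state_lock);
 */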
/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_md_io *md_io;

	md_io = (struct drbd_md_io *)bio->bi_private;
	md_io->error = error;

	complete(&md_io->event);
}
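
/* Illustrative counterpart (a sketch of how drbd_md_sync_page_io() in
 * drbd_actlog.c is expected to use this completion; details may differ):
 *
 *	bio->bi_private = &mdev->md_io;
 *	bio->bi_end_io  = drbd_md_io_complete;
 *	init_completion(&mdev->md_io.event);
 *	submit_bio(rw, bio);
 *	wait_for_completion(&mdev->md_io.event);
 *	ok = bio_flagged(bio, BIO_UPTODATE) && mdev->md_io.error == 0;
 */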
/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;

	D_ASSERT(e->block_id != ID_VACANT);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->read_cnt += e->size >> 9;
	list_del(&e->w.list);
	if (list_empty(&mdev->read_ee))
		wake_up(&mdev->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	drbd_queue_work(&mdev->data.work, &e->w);
	put_ldev(mdev);
}
static int is_failed_barrier(int ee_flags)
{
	return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
			== (EE_IS_BARRIER|EE_WAS_ERROR);
}
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.  */
static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;
	sector_t e_sector;
	int do_wake;
	int is_syncer_req;
	int do_al_complete_io;

	/* if this is a failed barrier request, disable use of barriers,
	 * and schedule for resubmission */
	if (is_failed_barrier(e->flags)) {
		drbd_bump_write_ordering(mdev, WO_bdev_flush);
		spin_lock_irqsave(&mdev->req_lock, flags);
		list_del(&e->w.list);
		e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
		e->w.cb = w_e_reissue;
		/* put_ldev actually happens below, once we come here again. */
		__release(local);
		spin_unlock_irqrestore(&mdev->req_lock, flags);
		drbd_queue_work(&mdev->data.work, &e->w);
		return;
	}

	D_ASSERT(e->block_id != ID_VACANT);

	/* after we moved e to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	e_sector = e->sector;
	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
	is_syncer_req = is_syncer_block_id(e->block_id);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->writ_cnt += e->size >> 9;
	list_del(&e->w.list); /* has been on active_ee or sync_ee */
	list_add_tail(&e->w.list, &mdev->done_ee);

	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
	 * neither did we wake possibly waiting conflicting requests.
	 * done from "drbd_process_done_ee" within the appropriate w.cb
	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */

	do_wake = is_syncer_req
		? list_empty(&mdev->sync_ee)
		: list_empty(&mdev->active_ee);

	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	if (is_syncer_req)
		drbd_rs_complete_io(mdev, e_sector);

	if (do_wake)
		wake_up(&mdev->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(mdev, e_sector);

	wake_asender(mdev);
	put_ldev(mdev);
}
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_endio_sec(struct bio *bio, int error)
{
	struct drbd_epoch_entry *e = bio->bi_private;
	struct drbd_conf *mdev = e->mdev;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;

	if (error)
		dev_warn(DEV, "%s: error=%d s=%llus\n",
				is_write ? "write" : "read", error,
				(unsigned long long)e->sector);
	if (!error && !uptodate) {
		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
				is_write ? "write" : "read",
				(unsigned long long)e->sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
		set_bit(__EE_WAS_ERROR, &e->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&e->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(e);
		else
			drbd_endio_read_sec_final(e);
	}
}
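
/* Note: one epoch entry may have been split over several bios by
 * drbd_submit_ee() (multi-bio EEs, see w_make_resync_request below);
 * e->pending_bios counts them, and only the bio that drops the count to
 * zero runs the read/write "final" handler above.  Errors are only
 * recorded in e->flags here and acted upon once, in that final handler. */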
/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_endio_pri(struct bio *bio, int error)
{
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->mdev;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
			 bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? write_completed_with_error
			: (bio_rw(bio) == READ)
			  ? read_completed_with_error
			  : read_ahead_completed_with_error;
	} else
		what = completed_ok;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	req_mod(req, what);
}
int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* We should not detach for read io-error,
	 * but try to WRITE the P_DATA_REPLY to the failed location,
	 * to give the disk the chance to relocate that block */

	spin_lock_irq(&mdev->req_lock);
	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
		_req_mod(req, read_retry_remote_canceled);
		spin_unlock_irq(&mdev->req_lock);
		return 1;
	}
	spin_unlock_irq(&mdev->req_lock);

	return w_send_read_req(mdev, w, 0);
}
int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	ERR_IF(cancel) return 1;
	dev_err(DEV, "resync inactive, but callback triggered??\n");
	return 1; /* Simply ignore this! */
}
void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = e->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = e->size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}
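
/* Example of the last-page handling above: e->size is a multiple of 512,
 * so for e->size == 2*PAGE_SIZE + 2048 the loop hashes two full pages and
 * "e->size & (PAGE_SIZE - 1)" leaves 2048 bytes for the final page; when
 * e->size is an exact multiple of PAGE_SIZE the mask yields 0 and the
 * "len ?: PAGE_SIZE" fallback hashes the final page in full. */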
void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;
	int i;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	__bio_for_each_segment(bvec, bio, i, 0) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}
static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok;

	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		return 1;
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);

			inc_rs_pending(mdev);
			ok = drbd_send_drequest_csum(mdev,
						     e->sector,
						     e->size,
						     digest,
						     digest_size,
						     P_CSUM_RS_REQUEST);
			kfree(digest);
		} else {
			dev_err(DEV, "kmalloc() of digest failed.\n");
			ok = 0;
		}
	} else
		ok = 1;

	drbd_free_ee(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
	return ok;
}
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
{
	struct drbd_epoch_entry *e;

	if (!get_ldev(mdev))
		return -EIO;

	if (drbd_rs_should_slow_down(mdev))
		goto defer;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
	if (!e)
		goto defer;

	e->w.cb = w_e_send_csum;
	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->req_lock);

	atomic_add(size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
		return 0;

	drbd_free_ee(mdev, e);
defer:
	put_ldev(mdev);
	return -EAGAIN;
}
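
/* Return value contract of read_for_csum(), as used by
 * w_make_resync_request() below:
 *   0       submitted; w_e_send_csum() will run from the read completion
 *  -EAGAIN  no memory, or we should slow down; caller rolls back and requeues
 *  -EIO     no local disk at all; caller gives up on this resync cycle */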
void resync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;
	int queue;

	queue = 1;
	switch (mdev->state.conn) {
	case C_VERIFY_S:
		mdev->resync_work.cb = w_make_ov_request;
		break;
	case C_SYNC_TARGET:
		mdev->resync_work.cb = w_make_resync_request;
		break;
	default:
		queue = 0;
		mdev->resync_work.cb = w_resync_inactive;
	}

	/* harmless race: list_empty outside data.work.q_lock */
	if (list_empty(&mdev->resync_work.list) && queue)
		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
}
static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] = value;
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}
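
/* rs_plan_s is a small ring buffer used by the resync controller below:
 * one slot per SLEEP_TIME tick of planned-but-not-yet-requested sectors.
 * fifo_push() returns the value stored in the current head slot, stores
 * the new value there and advances the head; fifo_add_val() spreads a
 * correction evenly over all slots; fifo_set() resets the whole plan
 * (e.g. to 0 when a resync starts, see drbd_start_resync()). */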
int drbd_rs_controller(struct drbd_conf *mdev)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	unsigned int want;     /* The number of sectors we want in the proxy */
	int req_sect; /* Number of sectors to request in this turn */
	int correction; /* Number of sectors more we need in the proxy*/
	int cps; /* correction per invocation of drbd_rs_controller() */
	int steps; /* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;

	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
	mdev->rs_in_flight -= sect_in;

	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */

	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - mdev->rs_in_flight - mdev->rs_planed;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(&mdev->rs_plan_s, cps);
	mdev->rs_planed += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
	mdev->rs_planed -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, mdev->rs_in_flight, want, correction,
		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}
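
/* In short (assuming the usual SLEEP_TIME of HZ/10, i.e. one controller
 * step per 100 ms): the controller aims to keep "want" sectors in flight,
 * either a fixed fill target (c_fill_target) or c_delay_target steps worth
 * of the currently observed throughput (sect_in); the difference to what
 * is already in flight or planned is spread over "steps" future slots, and
 * this step requests req_sect = sect_in + curr_corr sectors, clamped to
 * c_max_rate. */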
int w_make_resync_request(struct drbd_conf *mdev,
		struct drbd_work *w, int cancel)
{
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	int max_segment_size;
	int number, rollback_i, size, pe, mx;
	int align, queued, sndbuf;
	int i = 0;

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
		return 0;
	}

	if (mdev->state.conn != C_SYNC_TARGET)
		dev_err(DEV, "%s in w_make_resync_request\n",
			drbd_conn_str(mdev->state.conn));

	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->rsync a
		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		dev_err(DEV, "Disk broke down during resync!\n");
		mdev->resync_work.cb = w_resync_inactive;
		return 1;
	}

	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
	 * if it should be necessary */
	max_segment_size =
		mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
		mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;

	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		mdev->c_sync_rate = mdev->sync_conf.rate;
		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
	}

	/* Throttle resync on lower level disk activity, which may also be
	 * caused by application IO on Primary/SyncTarget.
	 * Keep this after the call to drbd_rs_controller, as that assumes
	 * to be called as precisely as possible every SLEEP_TIME,
	 * and would be confused otherwise. */
	if (drbd_rs_should_slow_down(mdev))
		goto requeue;

	mutex_lock(&mdev->data.mutex);
	if (mdev->data.socket)
		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
	else
		mx = 1;
	mutex_unlock(&mdev->data.mutex);

	/* For resync rates >160MB/sec, allow more pending RS requests */
	if (number > mx)
		mx = number;

	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
	pe = atomic_read(&mdev->rs_pending_cnt);
	if ((pe + number) > mx)
		number = mx - pe;

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests, when half of the send buffer is filled */
		mutex_lock(&mdev->data.mutex);
		if (mdev->data.socket) {
			queued = mdev->data.socket->sk->sk_wmem_queued;
			sndbuf = mdev->data.socket->sk->sk_sndbuf;
		} else {
			queued = 1;
			sndbuf = 0;
		}
		mutex_unlock(&mdev->data.mutex);
		if (queued > sndbuf / 2)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);

		if (bit == -1UL) {
			mdev->bm_resync_fo = drbd_bm_bits(mdev);
			mdev->resync_work.cb = w_resync_inactive;
			put_ldev(mdev);
			return 1;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->bm_resync_fo = bit;
			goto requeue;
		}
		mdev->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
			drbd_rs_complete_io(mdev, sector);
			goto next_sector;
		}

#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		for (;;) {
			if (size + BM_BLOCK_SIZE > max_segment_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(mdev, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			mdev->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
			switch (read_for_csum(mdev, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(mdev);
				return 0;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(mdev, sector);
				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			inc_rs_pending(mdev);
			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
					       sector, size, ID_SYNCER)) {
				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(mdev);
				put_ldev(mdev);
				return 0;
			}
		}
	}

	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		mdev->resync_work.cb = w_resync_inactive;
		put_ldev(mdev);
		return 1;
	}

 requeue:
	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(mdev);
	return 1;
}
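
/* Request sizing above, by example: one bitmap bit covers BM_BLOCK_SIZE
 * (4 KiB) of storage.  Starting from a dirty bit, adjacent dirty bits are
 * merged into one request as long as the result stays within
 * max_segment_size, does not cross a bitmap extent, and the start sector
 * is sufficiently aligned for the grown request (align counts 4 KiB
 * blocks, and "align+3" converts that to a sector mask, since
 * 4 KiB == 8 sectors); roughly, a request only grows to e.g. 32 KiB if it
 * starts on a 32 KiB boundary of the device. */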
static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
		return 0;
	}

	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
	if (atomic_read(&mdev->rs_pending_cnt) > number)
		goto requeue;

	number -= atomic_read(&mdev->rs_pending_cnt);

	sector = mdev->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity) {
			mdev->resync_work.cb = w_resync_inactive;
			return 1;
		}

		size = BM_BLOCK_SIZE;

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(mdev);
		if (!drbd_send_ov_request(mdev, sector, size)) {
			dec_rs_pending(mdev);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	mdev->ov_position = sector;

 requeue:
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);
	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}

static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);

	drbd_resync_finished(mdev);

	return 1;
}
int drbd_resync_finished(struct drbd_conf *mdev)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_work *w;
	char *khelper_cmd = NULL;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now). Retry in 100ms. */

		drbd_kick_lo(mdev);
		__set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
		if (w) {
			w->cb = w_resync_finished;
			drbd_queue_work(&mdev->data.work, w);
			return 1;
		}
		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
	}

	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;
	db = mdev->rs_total;
	dbdt = Bit2KB(db/dt);
	mdev->rs_paused /= HZ;

	if (!get_ldev(mdev))
		goto out;

	spin_lock_irq(&mdev->req_lock);
	os = mdev->state;

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
	     "Online verify " : "Resync",
	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(mdev);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		D_ASSERT((n_oos - mdev->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (mdev->csums_tfm && mdev->rs_total) {
			const unsigned long s = mdev->rs_same_csum;
			const unsigned long t = mdev->rs_total;
			const int ratio =
				(t == 0)     ? 0 :
				(t < 100000) ? ((s*100)/t) : (s/(t/100));
			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total));
		}
	}

	if (mdev->rs_failed) {
		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (mdev->p_uuid) {
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
			} else {
				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
			}
		}

		drbd_uuid_set_bm(mdev, 0UL);

		if (mdev->p_uuid) {
			/* Now the two UUID sets are equal, update what we
			 * know of the peer. */
			int i;
			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
		}
	}

	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&mdev->req_lock);
	put_ldev(mdev);
out:
	mdev->rs_total  = 0;
	mdev->rs_failed = 0;
	mdev->rs_paused = 0;
	mdev->ov_start_sector = 0;

	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
	}

	if (khelper_cmd)
		drbd_khelper(mdev, khelper_cmd);

	return 1;
}
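
/* About the checksum ratio above: for small totals (t < 100000 bits) the
 * exact (s*100)/t is used; for larger totals s/(t/100) is used instead,
 * presumably to keep s*100 from overflowing an unsigned long on 32 bit,
 * at the cost of one percent granularity.  E.g. s = 300, t = 1000 gives
 * 30 %, and s = 3000000, t = 10000000 gives 3000000/100000 = 30 as well. */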
/* helper */
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
	if (drbd_ee_has_active_page(e)) {
		/* This might happen if sendpage() has not finished */
		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
		atomic_add(i, &mdev->pp_in_use_by_net);
		atomic_sub(i, &mdev->pp_in_use);
		spin_lock_irq(&mdev->req_lock);
		list_add_tail(&e->w.list, &mdev->net_ee);
		spin_unlock_irq(&mdev->req_lock);
		wake_up(&drbd_pp_wait);
	} else
		drbd_free_ee(mdev, e);
}
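
/* Accounting note: pages of an epoch entry that may still be referenced by
 * the network stack (sendpage() not yet finished) are moved from pp_in_use
 * to pp_in_use_by_net, and the entry itself is parked on net_ee; it is
 * freed later, once the pages are no longer in flight, by the net_ee
 * reclaim path on the receiver side. */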
/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}
/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(mdev);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				dev_err(DEV, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");
			ok = 1;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);

		/* update resync data with failure */
		drbd_rs_failed_io(mdev, e->sector, e->size);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}
int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	di = e->digest;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (mdev->csums_tfm) {
			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
			D_ASSERT(digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(mdev, e->sector, e->size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
		} else {
			inc_rs_pending(mdev);
			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
			kfree(di);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block/ack() failed\n");
	return ok;
}
int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;

	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	/* FIXME if this allocation fails, online verify will not terminate! */
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
					     digest, digest_size, P_OV_REPLY);
		if (!ok)
			dec_rs_pending(mdev);
		kfree(digest);
	}

out:
	drbd_free_ee(mdev, e);

	dec_unacked(mdev);

	return ok;
}
void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
}
int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	di = e->digest;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	if (!eq)
		drbd_ov_oos_found(mdev, e->sector, e->size);
	else
		ov_oos_print(mdev);

	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	drbd_free_ee(mdev, e);

	if (--mdev->ov_left == 0) {
		ov_oos_print(mdev);
		drbd_resync_finished(mdev);
	}

	return ok;
}
int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
	complete(&b->done);
	return 1;
}
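
/* This is the flush primitive for the work queue: queueing a
 * drbd_wq_barrier with cb == w_prev_work_done and waiting on ->done
 * guarantees that everything queued before it has been processed.
 * Roughly, on the caller side (a sketch, see drbd_flush_workqueue()):
 *
 *	struct drbd_wq_barrier b;
 *	b.w.cb = w_prev_work_done;
 *	init_completion(&b.done);
 *	drbd_queue_work(&mdev->data.work, &b.w);
 *	wait_for_completion(&b.done);
 */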
int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct p_barrier *p = &mdev->data.sbuf.barrier;
	int ok = 1;

	/* really avoid racing with tl_clear.  w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
		cancel = 1;
	spin_unlock_irq(&mdev->req_lock);
	if (cancel)
		return 1;

	if (!drbd_get_data_sock(mdev))
		return 0;
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch.  */
	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
				(struct p_header80 *)p, sizeof(*p), 0);
	drbd_put_data_sock(mdev);

	return ok;
}
int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	if (cancel)
		return 1;
	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
}
/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_dblock(mdev, req);
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}
/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
				(unsigned long)req);

	if (!ok) {
		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
		 * so this is probably redundant */
		if (mdev->state.conn >= C_CONNECTED)
			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
	}
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}
int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_begin_io(mdev, req->sector);
	/* Calling drbd_al_begin_io() out of the worker might deadlock
	   theoretically. Practically it can not deadlock, since this is
	   only used when unfreezing IOs. All the extents of the requests
	   that made it into the TL are already active */

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
	generic_make_request(req->private_bio);

	return 1;
}
static int _drbd_may_sync_now(struct drbd_conf *mdev)
{
	struct drbd_conf *odev = mdev;

	while (1) {
		if (odev->sync_conf.after == -1)
			return 1;
		odev = minor_to_mdev(odev->sync_conf.after);
		ERR_IF(!odev) return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}
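
/* Example: with sync_conf.after configured as drbd2 -> drbd1 -> drbd0
 * (and drbd0's after == -1), drbd2 may only resync while neither drbd1 nor
 * drbd0 is between C_SYNC_SOURCE and C_PAUSED_SYNC_T and none of them is
 * suspended via aftr_isp/peer_isp/user_isp.  sync_after_error() below
 * rejects configurations that would make this chain cyclic. */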
/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}

	return rv;
}
/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO);
		}
	}
	return rv;
}
void resume_next_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
}
static int sync_after_error(struct drbd_conf *mdev, int o_minor)
{
	struct drbd_conf *odev;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
	while (1) {
		if (odev == mdev)
			return ERR_SYNC_AFTER_CYCLE;

		/* dependency chain ends here, no cycles. */
		if (odev->sync_conf.after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->sync_conf.after);
	}
}
int drbd_alter_sa(struct drbd_conf *mdev, int na)
{
	int changes;
	int retcode;

	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->sync_conf.after = na;
		do {
			changes  = _drbd_pause_after(mdev);
			changes |= _drbd_resume_next(mdev);
		} while (changes);
	}
	write_unlock_irq(&global_state_lock);
	return retcode;
}
static void ping_peer(struct drbd_conf *mdev)
{
	clear_bit(GOT_PING_ACK, &mdev->flags);
	request_ping(mdev);
	wait_event(mdev->misc_wait,
		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
}
/**
 * drbd_start_resync() - Start the resync process
 * @mdev:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
{
	union drbd_state ns;
	int r;

	if (mdev->state.conn >= C_SYNC_SOURCE) {
		dev_err(DEV, "Resync already running!\n");
		return;
	}

	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
	drbd_rs_cancel_all(mdev);

	if (side == C_SYNC_TARGET) {
		/* Since application IO was locked out during C_WF_BITMAP_T and
		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
		   we check that we might make the data inconsistent. */
		r = drbd_khelper(mdev, "before-resync-target");
		r = (r >> 8) & 0xff;
		if (r > 0) {
			dev_info(DEV, "before-resync-target handler returned %d, "
			     "dropping connection.\n", r);
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
			return;
		}
	}

	drbd_state_lock(mdev);

	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
		drbd_state_unlock(mdev);
		return;
	}

	if (side == C_SYNC_TARGET) {
		mdev->bm_resync_fo = 0;
	} else /* side == C_SYNC_SOURCE */ {
		u64 uuid;

		get_random_bytes(&uuid, sizeof(u64));
		drbd_uuid_set(mdev, UI_BITMAP, uuid);
		drbd_send_sync_uuid(mdev, uuid);

		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
	}

	write_lock_irq(&global_state_lock);
	ns = mdev->state;

	ns.aftr_isp = !_drbd_may_sync_now(mdev);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
	ns = mdev->state;

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(mdev);
		unsigned long now = jiffies;
		int i;

		mdev->rs_failed    = 0;
		mdev->rs_paused    = 0;
		mdev->rs_same_csum = 0;
		mdev->rs_last_events = 0;
		mdev->rs_last_sect_ev = 0;
		mdev->rs_total     = tw;
		mdev->rs_start     = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			mdev->rs_mark_left[i] = tw;
			mdev->rs_mark_time[i] = now;
		}
		_drbd_pause_after(mdev);
	}
	write_unlock_irq(&global_state_lock);
	put_ldev(mdev);

	if (r == SS_SUCCESS) {
		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) mdev->rs_total);

		if (mdev->rs_total == 0) {
			/* Peer still reachable? Beware of failing before-resync-target handlers! */
			ping_peer(mdev);
			drbd_resync_finished(mdev);
		}

		atomic_set(&mdev->rs_sect_in, 0);
		atomic_set(&mdev->rs_sect_ev, 0);
		mdev->rs_in_flight = 0;
		mdev->rs_planed = 0;
		spin_lock(&mdev->peer_seq_lock);
		fifo_set(&mdev->rs_plan_s, 0);
		spin_unlock(&mdev->peer_seq_lock);
		/* ns.conn may already be != mdev->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&mdev->resync_timer, jiffies);

		drbd_md_sync(mdev);
	}
	drbd_state_unlock(mdev);
}
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct drbd_work *w = NULL;
	LIST_HEAD(work_list);
	int intr = 0, i;

	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));

	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);

		if (down_trylock(&mdev->data.work.s)) {
			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_uncork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);

			intr = down_interruptible(&mdev->data.work.s);

			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket  && !mdev->net_conf->no_cork)
				drbd_tcp_cork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);
		}

		if (intr) {
			D_ASSERT(intr == -EINTR);
			flush_signals(current);
			ERR_IF (get_t_state(thi) == Running)
				continue;
			break;
		}

		if (get_t_state(thi) != Running)
			break;
		/* With this break, we have done a down() but not consumed
		   the entry from the list. The cleanup code takes care of
		   this...   */

		w = NULL;
		spin_lock_irq(&mdev->data.work.q_lock);
		ERR_IF(list_empty(&mdev->data.work.q)) {
			/* something terribly wrong in our logic.
			 * we were able to down() the semaphore,
			 * but the list is empty... doh.
			 *
			 * what is the best thing to do now?
			 * try again from scratch, restarting the receiver,
			 * asender, whatnot? could break even more ugly,
			 * e.g. when we are primary, but no good local data.
			 *
			 * I'll try to get away just starting over this loop.
			 */
			spin_unlock_irq(&mdev->data.work.q_lock);
			continue;
		}
		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
		list_del_init(&w->list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
			/* dev_warn(DEV, "worker: a callback failed! \n"); */
			if (mdev->state.conn >= C_CONNECTED)
				drbd_force_state(mdev,
						NS(conn, C_NETWORK_FAILURE));
		}
	}
	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));

	spin_lock_irq(&mdev->data.work.q_lock);
	i = 0;
	while (!list_empty(&mdev->data.work.q)) {
		list_splice_init(&mdev->data.work.q, &work_list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		while (!list_empty(&work_list)) {
			w = list_entry(work_list.next, struct drbd_work, list);
			list_del_init(&w->list);
			w->cb(mdev, w, 1);
			i++; /* dead debugging code */
		}

		spin_lock_irq(&mdev->data.work.q_lock);
	}
	sema_init(&mdev->data.work.s, 0);
	/* DANGEROUS race: if someone did queue his work within the spinlock,
	 * but up() ed outside the spinlock, we could get an up() on the
	 * semaphore without corresponding list entry.
	 * So don't do that.
	 */
	spin_unlock_irq(&mdev->data.work.q_lock);

	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
	/* _drbd_set_state only uses stop_nowait.
	 * wait here for the Exiting receiver. */
	drbd_thread_stop(&mdev->receiver);
	drbd_mdev_cleanup(mdev);

	dev_info(DEV, "worker terminated\n");

	clear_bit(DEVICE_DYING, &mdev->flags);
	clear_bit(CONFIG_PENDING, &mdev->flags);
	wake_up(&mdev->state_wait);

	return 0;
}