block/block-copy: add list of all call-states
/*
 * block_copy API
 *
 * Copyright (C) 2013 Proxmox Server Solutions
 * Copyright (c) 2019 Virtuozzo International GmbH.
 *
 * Authors:
 *  Dietmar Maurer (dietmar@proxmox.com)
 *  Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"

#include "trace.h"
#include "qapi/error.h"
#include "block/block-copy.h"
#include "sysemu/block-backend.h"
#include "qemu/units.h"
#include "qemu/coroutine.h"
#include "block/aio_task.h"

#define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB)
#define BLOCK_COPY_MAX_BUFFER (1 * MiB)
#define BLOCK_COPY_MAX_MEM (128 * MiB)
#define BLOCK_COPY_MAX_WORKERS 64

static coroutine_fn int block_copy_task_entry(AioTask *task);

typedef struct BlockCopyCallState {
    /* IN parameters. Initialized in block_copy_async() and never changed. */
    BlockCopyState *s;
    int64_t offset;
    int64_t bytes;
    int max_workers;
    int64_t max_chunk;
    BlockCopyAsyncCallbackFunc cb;
    void *cb_opaque;

    /* Coroutine where async block-copy is running */
    Coroutine *co;

    /* To reference all call states from BlockCopyState */
    QLIST_ENTRY(BlockCopyCallState) list;

    /* State */
    int ret;
    bool finished;

    /* OUT parameters */
    bool error_is_read;
} BlockCopyCallState;

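/*
 * A BlockCopyTask describes one in-flight chunk of work: a cluster-aligned
 * area that has already been cleared from copy_bitmap and accounted in
 * in_flight_bytes. Tasks are linked into BlockCopyState.tasks so that
 * intersecting block-copy requests can find and wait for each other.
 */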
typedef struct BlockCopyTask {
    AioTask task;

    BlockCopyState *s;
    BlockCopyCallState *call_state;
    int64_t offset;
    int64_t bytes;
    bool zeroes;
    QLIST_ENTRY(BlockCopyTask) list;
    CoQueue wait_queue; /* coroutines blocked on this task */
} BlockCopyTask;

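/* Exclusive end offset of the area covered by @task */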
static int64_t task_end(BlockCopyTask *task)
{
    return task->offset + task->bytes;
}

typedef struct BlockCopyState {
    /*
     * BdrvChild objects are not owned or managed by block-copy. They are
     * provided by the block-copy user, who is responsible for appropriate
     * permissions on these children.
     */
    BdrvChild *source;
    BdrvChild *target;
    BdrvDirtyBitmap *copy_bitmap;
    int64_t in_flight_bytes;
    int64_t cluster_size;
    bool use_copy_range;
    int64_t copy_size;
    uint64_t len;
    QLIST_HEAD(, BlockCopyTask) tasks; /* All tasks from all block-copy calls */
    QLIST_HEAD(, BlockCopyCallState) calls;

    BdrvRequestFlags write_flags;

    /*
     * skip_unallocated:
     *
     * Used by sync=top jobs, which first scan the source node for unallocated
     * areas and clear them in the copy_bitmap. During this process, the bitmap
     * is thus not fully initialized: it may still have bits set for areas that
     * are unallocated and should actually not be copied.
     *
     * This is indicated by skip_unallocated.
     *
     * In this case, block_copy() will query the source's allocation status,
     * skip unallocated regions, clear them in the copy_bitmap, and invoke
     * block_copy_reset_unallocated() every time it does.
     */
    bool skip_unallocated;

    ProgressMeter *progress;
    /* progress_bytes_callback: called when some copying progress is done. */
    ProgressBytesCallbackFunc progress_bytes_callback;
    void *progress_opaque;

    SharedResource *mem;
} BlockCopyState;

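/*
 * Find an existing task that intersects the given @offset/@bytes area, or
 * return NULL if there is none.
 */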
static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
                                            int64_t offset, int64_t bytes)
{
    BlockCopyTask *t;

    QLIST_FOREACH(t, &s->tasks, list) {
        if (offset + bytes > t->offset && offset < t->offset + t->bytes) {
            return t;
        }
    }

    return NULL;
}

/*
 * If there are no intersecting tasks return false. Otherwise, wait for the
 * first found intersecting task to finish and return true.
 */
static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
                                             int64_t bytes)
{
    BlockCopyTask *task = find_conflicting_task(s, offset, bytes);

    if (!task) {
        return false;
    }

    qemu_co_queue_wait(&task->wait_queue, NULL);

    return true;
}

/*
 * Search for the first dirty area in the offset/bytes range and create a task
 * at the beginning of it.
 */
static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
                                             BlockCopyCallState *call_state,
                                             int64_t offset, int64_t bytes)
{
    BlockCopyTask *task;
    int64_t max_chunk = MIN_NON_ZERO(s->copy_size, call_state->max_chunk);

    if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
                                           offset, offset + bytes,
                                           max_chunk, &offset, &bytes))
    {
        return NULL;
    }

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    bytes = QEMU_ALIGN_UP(bytes, s->cluster_size);

    /* region is dirty, so no existing tasks are possible in it */
    assert(!find_conflicting_task(s, offset, bytes));

    bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
    s->in_flight_bytes += bytes;

    task = g_new(BlockCopyTask, 1);
    *task = (BlockCopyTask) {
        .task.func = block_copy_task_entry,
        .s = s,
        .call_state = call_state,
        .offset = offset,
        .bytes = bytes,
    };
    qemu_co_queue_init(&task->wait_queue);
    QLIST_INSERT_HEAD(&s->tasks, task, list);

    return task;
}

/*
 * block_copy_task_shrink
 *
 * Drop the tail of the task, to be handled later. Set the dirty bits back and
 * wake up all tasks waiting for us (maybe some of them no longer intersect
 * with the shrunk task).
 */
static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
                                                int64_t new_bytes)
{
    if (new_bytes == task->bytes) {
        return;
    }

    assert(new_bytes > 0 && new_bytes < task->bytes);

    task->s->in_flight_bytes -= task->bytes - new_bytes;
    bdrv_set_dirty_bitmap(task->s->copy_bitmap,
                          task->offset + new_bytes, task->bytes - new_bytes);

    task->bytes = new_bytes;
    qemu_co_queue_restart_all(&task->wait_queue);
}

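/*
 * Finish @task: on failure, set the dirty bits back so the area will be
 * retried; then drop the task from the list and wake up any waiters.
 * Does not free @task itself.
 */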
static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
{
    task->s->in_flight_bytes -= task->bytes;
    if (ret < 0) {
        bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->offset, task->bytes);
    }
    QLIST_REMOVE(task, list);
    qemu_co_queue_restart_all(&task->wait_queue);
}

void block_copy_state_free(BlockCopyState *s)
{
    if (!s) {
        return;
    }

    bdrv_release_dirty_bitmap(s->copy_bitmap);
    shres_destroy(s->mem);
    g_free(s);
}

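/* Smallest non-zero max_transfer of source and target, capped at INT_MAX */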
static uint32_t block_copy_max_transfer(BdrvChild *source, BdrvChild *target)
{
    return MIN_NON_ZERO(INT_MAX,
                        MIN_NON_ZERO(source->bs->bl.max_transfer,
                                     target->bs->bl.max_transfer));
}

BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
                                     int64_t cluster_size, bool use_copy_range,
                                     BdrvRequestFlags write_flags, Error **errp)
{
    BlockCopyState *s;
    BdrvDirtyBitmap *copy_bitmap;

    copy_bitmap = bdrv_create_dirty_bitmap(source->bs, cluster_size, NULL,
                                           errp);
    if (!copy_bitmap) {
        return NULL;
    }
    bdrv_disable_dirty_bitmap(copy_bitmap);

    s = g_new(BlockCopyState, 1);
    *s = (BlockCopyState) {
        .source = source,
        .target = target,
        .copy_bitmap = copy_bitmap,
        .cluster_size = cluster_size,
        .len = bdrv_dirty_bitmap_size(copy_bitmap),
        .write_flags = write_flags,
        .mem = shres_create(BLOCK_COPY_MAX_MEM),
    };

    if (block_copy_max_transfer(source, target) < cluster_size) {
        /*
         * copy_range does not respect max_transfer. We don't want to bother
         * with requests smaller than the block-copy cluster size, so fall back
         * to buffered copying (read and write respect max_transfer on their
         * own).
         */
        s->use_copy_range = false;
        s->copy_size = cluster_size;
    } else if (write_flags & BDRV_REQ_WRITE_COMPRESSED) {
        /* Compression supports only cluster-size writes and no copy-range. */
        s->use_copy_range = false;
        s->copy_size = cluster_size;
    } else {
        /*
         * We enable copy-range, but keep a small copy_size until the first
         * successful copy_range (see block_copy_do_copy).
         */
        s->use_copy_range = use_copy_range;
        s->copy_size = MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER);
    }

    QLIST_INIT(&s->tasks);
    QLIST_INIT(&s->calls);

    return s;
}

void block_copy_set_progress_callback(
        BlockCopyState *s,
        ProgressBytesCallbackFunc progress_bytes_callback,
        void *progress_opaque)
{
    s->progress_bytes_callback = progress_bytes_callback;
    s->progress_opaque = progress_opaque;
}

void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
{
    s->progress = pm;
}

/*
 * Takes ownership of @task
 *
 * If pool is NULL, run the task directly; otherwise schedule it into the pool.
 *
 * Returns: task.func return code if pool is NULL
 *          otherwise -ECANCELED if pool status is bad
 *          otherwise 0 (successfully scheduled)
 */
static coroutine_fn int block_copy_task_run(AioTaskPool *pool,
                                            BlockCopyTask *task)
{
    if (!pool) {
        int ret = task->task.func(&task->task);

        g_free(task);
        return ret;
    }

    aio_task_pool_wait_slot(pool);
    if (aio_task_pool_status(pool) < 0) {
        co_put_to_shres(task->s->mem, task->bytes);
        block_copy_task_end(task, -ECANCELED);
        g_free(task);
        return -ECANCELED;
    }

    aio_task_pool_start_task(pool, &task->task);

    return 0;
}

/*
 * block_copy_do_copy
 *
 * Copy a cluster-aligned chunk. The requested region is allowed to exceed
 * s->len only to cover the last cluster when s->len is not aligned to clusters.
 *
 * No synchronization here: neither bitmap nor intersecting-request handling,
 * only the copy itself.
 *
 * Returns 0 on success.
 */
static int coroutine_fn block_copy_do_copy(BlockCopyState *s,
                                           int64_t offset, int64_t bytes,
                                           bool zeroes, bool *error_is_read)
{
    int ret;
    int64_t nbytes = MIN(offset + bytes, s->len) - offset;
    void *bounce_buffer = NULL;

    assert(offset >= 0 && bytes > 0 && INT64_MAX - offset >= bytes);
    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
    assert(offset < s->len);
    assert(offset + bytes <= s->len ||
           offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size));
    assert(nbytes < INT_MAX);

    if (zeroes) {
        ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags &
                                    ~BDRV_REQ_WRITE_COMPRESSED);
        if (ret < 0) {
            trace_block_copy_write_zeroes_fail(s, offset, ret);
            *error_is_read = false;
        }
        return ret;
    }

    if (s->use_copy_range) {
        ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes,
                                 0, s->write_flags);
        if (ret < 0) {
            trace_block_copy_copy_range_fail(s, offset, ret);
            s->use_copy_range = false;
            s->copy_size = MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER);
            /* Fall back to read+write with an allocated buffer */
        } else {
            if (s->use_copy_range) {
                /*
                 * Successful copy-range. Now increase copy_size. copy_range
                 * does not respect max_transfer (it's a TODO), so we factor
                 * that in here.
                 *
                 * Note: we double-check s->use_copy_range for the case when a
                 * parallel block-copy request unsets it during the previous
                 * bdrv_co_copy_range call.
                 */
                s->copy_size =
                        MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE),
                            QEMU_ALIGN_DOWN(block_copy_max_transfer(s->source,
                                                                    s->target),
                                            s->cluster_size));
            }
            goto out;
        }
    }

    /*
     * In case of a failed copy_range request above, we may proceed with a
     * buffered request larger than BLOCK_COPY_MAX_BUFFER. Still, further
     * requests will be properly limited, so don't care too much. Moreover,
     * the most likely case (copy_range is unsupported for the configuration,
     * so the very first copy_range request fails) is handled by setting a
     * large copy_size only after the first successful copy_range.
     */

    bounce_buffer = qemu_blockalign(s->source->bs, nbytes);

    ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0);
    if (ret < 0) {
        trace_block_copy_read_fail(s, offset, ret);
        *error_is_read = true;
        goto out;
    }

    ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer,
                         s->write_flags);
    if (ret < 0) {
        trace_block_copy_write_fail(s, offset, ret);
        *error_is_read = false;
        goto out;
    }

out:
    qemu_vfree(bounce_buffer);

    return ret;
}

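/*
 * AioTask entry point: copy the task's area, record the first error (and
 * whether it was a read error) in the call state, otherwise account progress,
 * then release the shared-resource reservation and finish the task.
 */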
static coroutine_fn int block_copy_task_entry(AioTask *task)
{
    BlockCopyTask *t = container_of(task, BlockCopyTask, task);
    bool error_is_read = false;
    int ret;

    ret = block_copy_do_copy(t->s, t->offset, t->bytes, t->zeroes,
                             &error_is_read);
    if (ret < 0 && !t->call_state->ret) {
        t->call_state->ret = ret;
        t->call_state->error_is_read = error_is_read;
    } else {
        progress_work_done(t->s->progress, t->bytes);
        t->s->progress_bytes_callback(t->bytes, t->s->progress_opaque);
    }
    co_put_to_shres(t->s->mem, t->bytes);
    block_copy_task_end(t, ret);

    return ret;
}

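/*
 * Wrapper around bdrv_block_status_above(): clamp the result to cluster
 * granularity and, on error or a too-small answer, report one allocated data
 * cluster so that the caller simply copies it.
 */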
static int block_copy_block_status(BlockCopyState *s, int64_t offset,
                                   int64_t bytes, int64_t *pnum)
{
    int64_t num;
    BlockDriverState *base;
    int ret;

    if (s->skip_unallocated) {
        base = bdrv_backing_chain_next(s->source->bs);
    } else {
        base = NULL;
    }

    ret = bdrv_block_status_above(s->source->bs, base, offset, bytes, &num,
                                  NULL, NULL);
    if (ret < 0 || num < s->cluster_size) {
        /*
         * On error, or if we failed to obtain a large enough chunk, just fall
         * back to copying one cluster.
         */
        num = s->cluster_size;
        ret = BDRV_BLOCK_ALLOCATED | BDRV_BLOCK_DATA;
    } else if (offset + num == s->len) {
        num = QEMU_ALIGN_UP(num, s->cluster_size);
    } else {
        num = QEMU_ALIGN_DOWN(num, s->cluster_size);
    }

    *pnum = num;
    return ret;
}

/*
 * Check if the cluster starting at offset is allocated or not.
 * Return via pnum the number of contiguous clusters sharing this allocation.
 */
static int block_copy_is_cluster_allocated(BlockCopyState *s, int64_t offset,
                                           int64_t *pnum)
{
    BlockDriverState *bs = s->source->bs;
    int64_t count, total_count = 0;
    int64_t bytes = s->len - offset;
    int ret;

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));

    while (true) {
        ret = bdrv_is_allocated(bs, offset, bytes, &count);
        if (ret < 0) {
            return ret;
        }

        total_count += count;

        if (ret || count == 0) {
            /*
             * ret: partial segment(s) are considered allocated.
             * otherwise: unallocated tail is treated as an entire segment.
             */
            *pnum = DIV_ROUND_UP(total_count, s->cluster_size);
            return ret;
        }

        /* Unallocated segment(s) with uncertain following segment(s) */
        if (total_count >= s->cluster_size) {
            *pnum = total_count / s->cluster_size;
            return 0;
        }

        offset += count;
        bytes -= count;
    }
}

/*
 * Reset bits in copy_bitmap starting at offset if they represent unallocated
 * data in the image. May reset subsequent contiguous bits.
 * @return 0 when the cluster at @offset was unallocated,
 *         1 otherwise, and a negative errno on error.
 */
int64_t block_copy_reset_unallocated(BlockCopyState *s,
                                     int64_t offset, int64_t *count)
{
    int ret;
    int64_t clusters, bytes;

    ret = block_copy_is_cluster_allocated(s, offset, &clusters);
    if (ret < 0) {
        return ret;
    }

    bytes = clusters * s->cluster_size;

    if (!ret) {
        bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
        progress_set_remaining(s->progress,
                               bdrv_get_dirty_count(s->copy_bitmap) +
                               s->in_flight_bytes);
    }

    *count = bytes;
    return ret;
}

/*
 * block_copy_dirty_clusters
 *
 * Copy dirty clusters in the @offset/@bytes range.
 * Returns 1 if dirty clusters were found and successfully copied, 0 if no
 * dirty clusters were found, and -errno on failure.
 */
static int coroutine_fn
block_copy_dirty_clusters(BlockCopyCallState *call_state)
{
    BlockCopyState *s = call_state->s;
    int64_t offset = call_state->offset;
    int64_t bytes = call_state->bytes;

    int ret = 0;
    bool found_dirty = false;
    int64_t end = offset + bytes;
    AioTaskPool *aio = NULL;

    /*
     * The block_copy() user is responsible for keeping source and target in
     * the same aio context.
     */
    assert(bdrv_get_aio_context(s->source->bs) ==
           bdrv_get_aio_context(s->target->bs));

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));

    while (bytes && aio_task_pool_status(aio) == 0) {
        BlockCopyTask *task;
        int64_t status_bytes;

        task = block_copy_task_create(s, call_state, offset, bytes);
        if (!task) {
            /* No more dirty bits in the bitmap */
            trace_block_copy_skip_range(s, offset, bytes);
            break;
        }
        if (task->offset > offset) {
            trace_block_copy_skip_range(s, offset, task->offset - offset);
        }

        found_dirty = true;

        ret = block_copy_block_status(s, task->offset, task->bytes,
                                      &status_bytes);
        assert(ret >= 0); /* never fails */
        if (status_bytes < task->bytes) {
            block_copy_task_shrink(task, status_bytes);
        }
        if (s->skip_unallocated && !(ret & BDRV_BLOCK_ALLOCATED)) {
            block_copy_task_end(task, 0);
            progress_set_remaining(s->progress,
                                   bdrv_get_dirty_count(s->copy_bitmap) +
                                   s->in_flight_bytes);
            trace_block_copy_skip_range(s, task->offset, task->bytes);
            offset = task_end(task);
            bytes = end - offset;
            g_free(task);
            continue;
        }
        task->zeroes = ret & BDRV_BLOCK_ZERO;

        trace_block_copy_process(s, task->offset);

        co_get_from_shres(s->mem, task->bytes);

        offset = task_end(task);
        bytes = end - offset;

        if (!aio && bytes) {
            aio = aio_task_pool_new(call_state->max_workers);
        }

        ret = block_copy_task_run(aio, task);
        if (ret < 0) {
            goto out;
        }
    }

out:
    if (aio) {
        aio_task_pool_wait_all(aio);

        /*
         * We are not really interested in -ECANCELED returned from
         * block_copy_task_run. If it fails, it means some task has already
         * failed for a real reason, so let's return the first failure.
         * Still, assert that we don't rewrite a failure with success.
         *
         * Note: ret may be positive here because of the block-status result.
         */
        assert(ret >= 0 || aio_task_pool_status(aio) < 0);
        ret = aio_task_pool_status(aio);

        aio_task_pool_free(aio);
    }

    return ret < 0 ? ret : found_dirty;
}

/*
 * block_copy_common
 *
 * Copy the requested region according to the dirty bitmap.
 * Collaborate with parallel block_copy requests: if they succeed, it will help
 * us. If they fail, we will retry not-yet-copied regions. So, if we return an
 * error, it means that some I/O operation failed in the context of _this_
 * block_copy call, not in some parallel operation.
 */
static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
{
    int ret;

    QLIST_INSERT_HEAD(&call_state->s->calls, call_state, list);

    do {
        ret = block_copy_dirty_clusters(call_state);

        if (ret == 0) {
            ret = block_copy_wait_one(call_state->s, call_state->offset,
                                      call_state->bytes);
        }

        /*
         * We retry in two cases:
         * 1. Some progress was done
         *    Something was copied, which means that there were yield points
         *    and some new dirty bits may have appeared (due to failed parallel
         *    block-copy requests).
         * 2. We have waited for some intersecting block-copy request
         *    It may have failed and produced new dirty bits.
         */
    } while (ret > 0);

    call_state->finished = true;

    if (call_state->cb) {
        call_state->cb(call_state->cb_opaque);
    }

    QLIST_REMOVE(call_state, list);

    return ret;
}

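/*
 * Synchronous block-copy: run one call state on the current coroutine and,
 * on failure, report via @error_is_read whether the error came from reading
 * the source.
 */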
int coroutine_fn block_copy(BlockCopyState *s, int64_t start, int64_t bytes,
                            bool *error_is_read)
{
    BlockCopyCallState call_state = {
        .s = s,
        .offset = start,
        .bytes = bytes,
        .max_workers = BLOCK_COPY_MAX_WORKERS,
    };

    int ret = block_copy_common(&call_state);

    if (error_is_read && ret < 0) {
        *error_is_read = call_state.error_is_read;
    }

    return ret;
}

static void coroutine_fn block_copy_async_co_entry(void *opaque)
{
    block_copy_common(opaque);
}

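/*
 * Start an asynchronous block-copy in a new coroutine. @cb is invoked once
 * the call finishes; query the result with block_copy_call_status() and
 * release the returned call state with block_copy_call_free().
 */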
BlockCopyCallState *block_copy_async(BlockCopyState *s,
                                     int64_t offset, int64_t bytes,
                                     int max_workers, int64_t max_chunk,
                                     BlockCopyAsyncCallbackFunc cb,
                                     void *cb_opaque)
{
    BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1);

    *call_state = (BlockCopyCallState) {
        .s = s,
        .offset = offset,
        .bytes = bytes,
        .max_workers = max_workers,
        .max_chunk = max_chunk,
        .cb = cb,
        .cb_opaque = cb_opaque,

        .co = qemu_coroutine_create(block_copy_async_co_entry, call_state),
    };

    qemu_coroutine_enter(call_state->co);

    return call_state;
}

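/* Free a finished asynchronous call state; asserts that the call has finished */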
void block_copy_call_free(BlockCopyCallState *call_state)
{
    if (!call_state) {
        return;
    }

    assert(call_state->finished);
    g_free(call_state);
}

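/* Status queries for an asynchronous block-copy call */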
bool block_copy_call_finished(BlockCopyCallState *call_state)
{
    return call_state->finished;
}

bool block_copy_call_succeeded(BlockCopyCallState *call_state)
{
    return call_state->finished && call_state->ret == 0;
}

bool block_copy_call_failed(BlockCopyCallState *call_state)
{
    return call_state->finished && call_state->ret < 0;
}

int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read)
{
    assert(call_state->finished);
    if (error_is_read) {
        *error_is_read = call_state->error_is_read;
    }
    return call_state->ret;
}

BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s)
{
    return s->copy_bitmap;
}

void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip)
{
    s->skip_unallocated = skip;
}