block/fvd-store.c
/*
 * Copyright (c) 2010-2011 IBM
 *
 * Authors:
 *         Chunqiang Tang <ctang@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.
 * See the COPYING file in the top-level directory.
 */

/*=============================================================================
 *  A short description: this FVD module implements storing data to a
 *  compact image.
 *===========================================================================*/
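
/* Layout notes (derived from the code below): in a compact image, the virtual
 * disk is divided into fixed-size chunks and s->table[] maps each virtual
 * chunk to the physical chunk that backs it in the FVD data file.  A virtual
 * sector is therefore located roughly as
 *     physical_sector = READ_TABLE (s->table[sector_num / s->chunk_size])
 *                           * s->chunk_size + sector_num % s->chunk_size;
 * which is the start_sec computation used in store_data_in_compact_image().
 * A table entry keeps a dirty bit (DIRTY_TABLE) until the corresponding
 * allocation has been flushed to the journal. */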

static uint32_t allocate_chunk (BlockDriverState * bs);
static inline FvdAIOCB *init_store_acb (int soft_write,
                                        QEMUIOVector * orig_qiov,
                                        BlockDriverState * bs,
                                        int64_t sector_num, int nb_sectors,
                                        FvdAIOCB * parent_acb,
                                        BlockDriverCompletionFunc * cb,
                                        void *opaque);
static void finish_store_data_in_compact_image (void *opaque, int ret);

static inline BlockDriverAIOCB *store_data (int soft_write,
                                            FvdAIOCB * parent_acb,
                                            BlockDriverState * bs,
                                            int64_t sector_num,
                                            QEMUIOVector * orig_qiov,
                                            int nb_sectors,
                                            BlockDriverCompletionFunc * cb,
                                            void *opaque)
{
    BDRVFvdState *s = bs->opaque;

    TRACE_STORE_IN_FVD ("store_data", sector_num, nb_sectors);

    if (!s->table) {
        /* Write directly since it is not a compact image. */
        return bdrv_aio_writev (s->fvd_data, s->data_offset + sector_num,
                                orig_qiov, nb_sectors, cb, opaque);
    } else {
        return store_data_in_compact_image (NULL, soft_write, parent_acb, bs,
                                            sector_num, orig_qiov, nb_sectors,
                                            cb, opaque);
    }
}

/* Store data in the compact image. The argument 'soft_write' means
 * the store was caused by copy-on-read or prefetching, which need not
 * update metadata immediately. */
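/* Rough flow of store_data_in_compact_image() (a summary of the code below):
 * (1) allocate physical chunks for any empty table entries; (2) if the write
 * spans more than one chunk, count how many physically contiguous regions it
 * maps to; (3) if everything is one contiguous region, issue a single
 * bdrv_aio_writev(); otherwise split orig_qiov into one child request per
 * contiguous region and complete the parent acb only after all children
 * finish in finish_store_data_in_compact_image(). */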
static BlockDriverAIOCB *store_data_in_compact_image (FvdAIOCB * acb,
                                                       int soft_write,
                                                       FvdAIOCB * parent_acb,
                                                       BlockDriverState * bs,
                                                       int64_t sector_num,
                                                       QEMUIOVector * orig_qiov,
                                                       const int nb_sectors,
                                                       BlockDriverCompletionFunc
                                                       * cb, void *opaque)
{
    BDRVFvdState *s = bs->opaque;

    const uint32_t first_chunk = sector_num / s->chunk_size;
    const uint32_t last_chunk = (sector_num + nb_sectors - 1) / s->chunk_size;
    int table_dirty = FALSE;
    uint32_t chunk;
    int64_t start_sec;

    /* Check if storage space is allocated. */
    for (chunk = first_chunk; chunk <= last_chunk; chunk++) {
        if (IS_EMPTY (s->table[chunk])) {
            uint32_t id = allocate_chunk (bs);
            if (IS_EMPTY (id)) {
                return NULL;
            }
            id |= DIRTY_TABLE;
            WRITE_TABLE (s->table[chunk], id);

            table_dirty = TRUE;
        } else if (IS_DIRTY (s->table[chunk])) {
            /* This is possible if a previous soft-write allocated the storage
             * space but did not flush the table entry change to the journal
             * and hence did not clean the dirty bit. This is also possible
             * with two concurrent hard-writes. The first hard-write allocated
             * the storage space but has not flushed the table entry change to
             * the journal yet and hence the table entry remains dirty. In
             * this case, the second hard-write will also try to flush this
             * dirty table entry to the journal. The outcome is correct since
             * they store the same metadata change in the journal (although
             * twice). For this race condition, we prefer to have two writes
             * to the journal rather than introducing a locking mechanism,
             * because this happens rarely and those two writes to the journal
             * are likely to be merged by the kernel into a single write since
             * they are likely to update back-to-back sectors in the journal.
             * A locking mechanism would be less efficient, because the large
             * size of chunks would cause unnecessary locking due to ``false
             * sharing'' of a chunk by two writes. */
            table_dirty = TRUE;
        }
    }

    const int update_table = (!soft_write && table_dirty);
    size_t iov_left;
    uint8_t *iov_buf;
    int nb, iov_index, nqiov, niov;
    uint32_t prev;

    if (first_chunk == last_chunk) {
        goto handle_one_continuous_region;
    }

    /* Count the number of qiov and iov needed to cover the continuous regions
     * of the compact image. */
    iov_left = orig_qiov->iov[0].iov_len;
    iov_buf = orig_qiov->iov[0].iov_base;
    iov_index = 0;
    nqiov = 0;
    niov = 0;
    prev = READ_TABLE (s->table[first_chunk]);

    /* Data in the first chunk. */
    nb = s->chunk_size - (sector_num % s->chunk_size);
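
    /* In the loop below, 'current == prev + 1' means this chunk's physical
     * location immediately follows the previous chunk's, so both can be
     * covered by the same write; otherwise a new region (and hence a new
     * child request) is needed. */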

    for (chunk = first_chunk + 1; chunk <= last_chunk; chunk++) {
        uint32_t current = READ_TABLE (s->table[chunk]);
        int64_t data_size;
        if (chunk < last_chunk) {
            data_size = s->chunk_size;
        } else {
            data_size = (sector_num + nb_sectors) % s->chunk_size;
            if (data_size == 0) {
                data_size = s->chunk_size;
            }
        }

        if (current == prev + 1) {
            nb += data_size;        /* Continue the previous region. */
        } else {
            /* Terminate the previous region. */
            niov +=
                count_iov (orig_qiov->iov, &iov_index, &iov_buf, &iov_left,
                           nb * 512);
            nqiov++;
            nb = data_size;        /* Data in the new region. */
        }
        prev = current;
    }

    if (nqiov == 0) {
      handle_one_continuous_region:
        /* A simple case. All data can be written out in one qiov and no new
         * chunks are allocated. */
        start_sec = READ_TABLE (s->table[first_chunk]) * s->chunk_size +
                                        (sector_num % s->chunk_size);
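
        /* start_sec is the physical sector in the FVD data file: the chunk's
         * physical index times the chunk size, plus the offset within the
         * chunk. */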

        if (!update_table && !acb) {
            if (parent_acb) {
                QDEBUG ("STORE: acb%llu-%p "
                        "store_directly_without_table_update\n",
                        parent_acb->uuid, parent_acb);
            }
            return bdrv_aio_writev (s->fvd_data, s->data_offset + start_sec,
                                    orig_qiov, nb_sectors, cb, opaque);
        }

        if (!acb && !(acb = init_store_acb (soft_write, orig_qiov, bs,
                        sector_num, nb_sectors, parent_acb, cb, opaque))) {
            return NULL;
        }

        QDEBUG ("STORE: acb%llu-%p store_directly sector_num=%" PRId64
                " nb_sectors=%d\n", acb->uuid, acb, acb->sector_num,
                acb->nb_sectors);

        acb->store.update_table = update_table;
        acb->store.num_children = 1;
        acb->store.one_child.hd_acb =
            bdrv_aio_writev (s->fvd_data, s->data_offset + start_sec, orig_qiov,
                             nb_sectors, finish_store_data_in_compact_image,
                             &acb->store.one_child);
        if (acb->store.one_child.hd_acb) {
            acb->store.one_child.acb = acb;
            return &acb->common;
        } else {
            my_qemu_aio_unref (acb);
            return NULL;
        }
    }

    /* qiov for the last continuous region. */
    niov += count_iov (orig_qiov->iov, &iov_index, &iov_buf,
                       &iov_left, nb * 512);
    nqiov++;
    ASSERT (iov_index == orig_qiov->niov - 1 && iov_left == 0);

    /* Need to submit multiple requests to the lower layer. */
    if (!acb && !(acb = init_store_acb (soft_write, orig_qiov, bs, sector_num,
                            nb_sectors, parent_acb, cb, opaque))) {
        return NULL;
    }
    acb->store.update_table = update_table;
    acb->store.num_children = nqiov;

    if (!parent_acb) {
        QDEBUG ("STORE: acb%llu-%p start sector_num=%" PRId64
                " nb_sectors=%d\n", acb->uuid, acb, acb->sector_num,
                acb->nb_sectors);
    }

    /* Allocate memory and create multiple requests. */
    const size_t metadata_size = nqiov * (sizeof (CompactChildCB) +
                                          sizeof (QEMUIOVector))
                                        + niov * sizeof (struct iovec);
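    /* A single allocation holds all per-child metadata; it is carved up below
     * into nqiov CompactChildCB entries, then nqiov QEMUIOVector headers,
     * then niov struct iovec entries. */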
    acb->store.children = (CompactChildCB *) my_qemu_malloc (metadata_size);
    QEMUIOVector *q = (QEMUIOVector *) (acb->store.children + nqiov);
    struct iovec *v = (struct iovec *) (q + nqiov);

    start_sec = READ_TABLE (s->table[first_chunk]) * s->chunk_size +
                                        (sector_num % s->chunk_size);
    nqiov = 0;
    iov_index = 0;
    iov_left = orig_qiov->iov[0].iov_len;
    iov_buf = orig_qiov->iov[0].iov_base;
    prev = READ_TABLE (s->table[first_chunk]);

    /* Data in the first chunk. */
    if (first_chunk == last_chunk) {
        nb = nb_sectors;
    } else {
        nb = s->chunk_size - (sector_num % s->chunk_size);
    }

    for (chunk = first_chunk + 1; chunk <= last_chunk; chunk++) {
        uint32_t current = READ_TABLE (s->table[chunk]);
        int64_t data_size;
        if (chunk < last_chunk) {
            data_size = s->chunk_size;
        } else {
            data_size = (sector_num + nb_sectors) % s->chunk_size;
            if (data_size == 0) {
                data_size = s->chunk_size;
            }
        }

        if (current == prev + 1) {
            nb += data_size;        /* Continue the previous region. */
        } else {
            /* Terminate the previous continuous region. */
            niov = setup_iov (orig_qiov->iov, v, &iov_index,
                              &iov_buf, &iov_left, nb * 512);
            qemu_iovec_init_external (q, v, niov);
            QDEBUG ("STORE: acb%llu-%p create_child %d sector_num=%" PRId64
                    " nb_sectors=%d niov=%d\n", acb->uuid, acb, nqiov,
                    start_sec, q->size / 512, q->niov);
            acb->store.children[nqiov].hd_acb =
                bdrv_aio_writev (s->fvd_data, s->data_offset + start_sec, q,
                                 q->size / 512,
                                 finish_store_data_in_compact_image,
                                 &acb->store.children[nqiov]);
            if (!acb->store.children[nqiov].hd_acb) {
                goto fail;
            }
            acb->store.children[nqiov].acb = acb;
            v += niov;
            q++;
            nqiov++;
            start_sec = current * s->chunk_size; /* Beginning of new region. */
            nb = data_size;        /* Data in the new region. */
        }
        prev = current;
    }

    /* Request for the last chunk. */
    niov = setup_iov (orig_qiov->iov, v, &iov_index, &iov_buf,
                      &iov_left, nb * 512);
    ASSERT (iov_index == orig_qiov->niov - 1 && iov_left == 0);
    qemu_iovec_init_external (q, v, niov);

    QDEBUG ("STORE: acb%llu-%p create_child_last %d sector_num=%" PRId64
            " nb_sectors=%d niov=%d\n", acb->uuid, acb, nqiov, start_sec,
            q->size / 512, q->niov);
    acb->store.children[nqiov].hd_acb =
        bdrv_aio_writev (s->fvd_data, s->data_offset + start_sec, q,
                         q->size / 512, finish_store_data_in_compact_image,
                         &acb->store.children[nqiov]);
    if (acb->store.children[nqiov].hd_acb) {
        acb->store.children[nqiov].acb = acb;
        return &acb->common;
    }

    int i;
  fail:
    QDEBUG ("STORE: acb%llu-%p failed\n", acb->uuid, acb);
    for (i = 0; i < nqiov; i++) {
        bdrv_aio_cancel (acb->store.children[i].hd_acb);
    }
    my_qemu_free (acb->store.children);
    my_qemu_aio_unref (acb);
    return NULL;
}

static uint32_t allocate_chunk (BlockDriverState * bs)
{
    BDRVFvdState *s = bs->opaque;

    /* Check if there is sufficient storage space. */
    if (s->used_storage + s->chunk_size > s->data_storage) {
        if (s->add_storage_cmd) {
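            /* add_storage_cmd is an external command configured by the user
             * to grow the underlying storage (presumably useful when the
             * image sits on a device such as a logical volume rather than on
             * a file system). */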
            if (system (s->add_storage_cmd)) {
                fprintf (stderr, "Error in executing %s\n", s->add_storage_cmd);
            }
        } else {
            /* If the image is stored on a file system, the image file size
             * can be increased by bdrv_truncate. */
            int64_t new_size = (s->data_offset + s->used_storage +
                                s->storage_grow_unit) * 512;
            bdrv_truncate (s->fvd_data, new_size);
        }

        /* Check how much storage is available now. */
        int64_t size = bdrv_getlength (s->fvd_data);
        if (size < 0) {
            fprintf (stderr, "Error in bdrv_getlength(%s)\n", bs->filename);
            return EMPTY_TABLE;
        }
        s->data_storage = size / 512 - s->data_offset;
        if (s->used_storage + s->chunk_size > s->data_storage) {
            fprintf (stderr, "Could not allocate more storage space.\n");
            return EMPTY_TABLE;
        }

        QDEBUG ("Increased storage to %" PRId64 " bytes.\n", size);
    }
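
    /* Chunks are handed out sequentially from the data region, so the next
     * free chunk id is simply used_storage / chunk_size. */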
    uint32_t allocated_chunk_id = s->used_storage / s->chunk_size;
    s->used_storage += s->chunk_size;
    return allocated_chunk_id;
}

static void finish_store_data_in_compact_image (void *opaque, int ret)
{
    CompactChildCB *child = opaque;
    FvdAIOCB *acb = child->acb;
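
    /* This callback runs once for every child write request; the parent acb
     * is completed only after all children have finished. */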

    /* Now fvd_store_compact_cancel(), if invoked, won't cancel this child
     * request. */
    child->hd_acb = NULL;

    if (acb->store.ret == 0) {
        acb->store.ret = ret;
    } else {
        QDEBUG ("STORE: acb%llu-%p store_child=%d total_children=%d error "
                "ret=%d\n", acb->uuid, acb, acb->store.finished_children,
                acb->store.num_children, ret);
    }

    acb->store.finished_children++;
    if (acb->store.finished_children < acb->store.num_children) {
        QDEBUG ("STORE: acb%llu-%p store_finished_children=%d "
                "total_children=%d\n", acb->uuid, acb,
                acb->store.finished_children, acb->store.num_children);
        return;
    }

    /* All child requests finished. Free buffers. */
    if (acb->store.children) {
        my_qemu_free (acb->store.children);
        acb->store.children = NULL;
    }

    if (acb->store.ret) {        /* error */
        QDEBUG ("STORE: acb%llu-%p "
                "store_last_child_finished_with_error ret=%d\n",
                acb->uuid, acb, acb->store.ret);
        acb->common.cb (acb->common.opaque, acb->store.ret);
        my_qemu_aio_unref (acb);
        return;
    }

    if (!acb->store.update_table) {
        QDEBUG ("STORE: acb%llu-%p "
                "store_last_child_finished_without_table_update\n",
                acb->uuid, acb);
        acb->common.cb (acb->common.opaque, acb->store.ret);
        my_qemu_aio_unref (acb);
        return;
    }

    /* Check whether the table entries are still dirty. Note that while saving
     * this write to disk, other writes might have already flushed the dirty
     * table entries to the journal. If those table entries are no longer
     * dirty, depending on the behavior of parent_acb, it might be able to
     * skip a journal update. */
    BlockDriverState *bs = acb->common.bs;
    BDRVFvdState *s = bs->opaque;
    uint32_t first_chunk = acb->sector_num / s->chunk_size;
    const uint32_t last_chunk =
        (acb->sector_num + acb->nb_sectors - 1) / s->chunk_size;
    int update_table = FALSE;
    uint32_t chunk;
    for (chunk = first_chunk; chunk <= last_chunk; chunk++) {
        if (IS_DIRTY (s->table[chunk])) {
            update_table = TRUE;
            break;
        }
    }

    if (acb->store.parent_acb) {
        /* Metadata update will be handled by the parent write. */
        ASSERT (acb->store.parent_acb->type == OP_WRITE);
        QDEBUG ("STORE: acb%llu-%p "
                "store_last_child_finished_with_parent_do_table_update\n",
                acb->uuid, acb);
        acb->store.parent_acb->write.update_table = update_table;
        acb->common.cb (acb->common.opaque, acb->store.ret);
        my_qemu_aio_unref (acb);
        return;
    }

    if (update_table) {
        QDEBUG ("STORE: acb%llu-%p "
                "store_last_child_finished_and_start_table_update\n",
                acb->uuid, acb);
        write_metadata_to_journal (acb);
    } else {
        QDEBUG ("STORE: acb%llu-%p "
                "store_last_child_finished_without_table_update\n",
                acb->uuid, acb);
        acb->common.cb (acb->common.opaque, acb->store.ret);
        my_qemu_aio_unref (acb);
    }
}

static inline FvdAIOCB *init_store_acb (int soft_write,
                                        QEMUIOVector * orig_qiov,
                                        BlockDriverState * bs,
                                        int64_t sector_num, int nb_sectors,
                                        FvdAIOCB * parent_acb,
                                        BlockDriverCompletionFunc * cb,
                                        void *opaque)
{
    FvdAIOCB *acb = my_qemu_aio_get (&fvd_aio_pool, bs, cb, opaque);
    if (!acb) {
        return NULL;
    }
    acb->type = OP_STORE_COMPACT;
    acb->sector_num = sector_num;
    acb->nb_sectors = nb_sectors;
    acb->store.soft_write = soft_write;
    acb->store.orig_qiov = orig_qiov;
    acb->store.parent_acb = parent_acb;
    acb->store.finished_children = 0;
    acb->store.num_children = 0;
    acb->store.one_child.hd_acb = NULL;
    acb->store.children = NULL;
    acb->store.ret = 0;
    acb->jcb.iov.iov_base = NULL;
    acb->jcb.hd_acb = NULL;
    acb->jcb.next_wait_for_journal.le_prev = NULL;
    COPY_UUID (acb, parent_acb);

    return acb;
}

#if 0
static void fvd_store_compact_cancel (FvdAIOCB * acb)
{
    if (acb->store.children) {
        int i;
        for (i = 0; i < acb->store.num_children; i++) {
            if (acb->store.children[i].hd_acb) {
                bdrv_aio_cancel (acb->store.children[i].hd_acb);
            }
        }
        my_qemu_free (acb->store.children);
    }
    if (acb->store.one_child.hd_acb) {
        bdrv_aio_cancel (acb->store.one_child.hd_acb);
    }
    if (acb->jcb.hd_acb) {
        bdrv_aio_cancel (acb->jcb.hd_acb);
        free_journal_sectors (acb->common.bs->opaque);
    }
    if (acb->jcb.iov.iov_base != NULL) {
        my_qemu_vfree (acb->jcb.iov.iov_base);
    }
    if (acb->jcb.next_wait_for_journal.le_prev) {
        QLIST_REMOVE (acb, jcb.next_wait_for_journal);
    }

    my_qemu_aio_unref (acb);
}
#endif