migration: fix id leak regression
[qemu/ar7.git] / block / archipelago.c
blob2449cfc702b70317d072cdc3f701a50a0eaaad5e
1 /*
2 * QEMU Block driver for Archipelago
4 * Copyright (C) 2014 Chrysostomos Nanakos <cnanakos@grnet.gr>
6 * This work is licensed under the terms of the GNU GPL, version 2 or later.
7 * See the COPYING file in the top-level directory.
9 */
12 * A VM image on an Archipelago volume is specified like this:
14 * file.driver=archipelago,file.volume=<volumename>
15 * [,file.mport=<mapperd_port>[,file.vport=<vlmcd_port>]
16 * [,file.segment=<segment_name>]]
18 * or
20 * file=archipelago:<volumename>[/mport=<mapperd_port>[:vport=<vlmcd_port>][:
21 * segment=<segment_name>]]
23 * 'archipelago' is the protocol.
25 * 'mport' is the port number on which mapperd is listening. This is optional;
26 * if not specified, QEMU will make Archipelago use the default port.
28 * 'vport' is the port number on which vlmcd is listening. This is optional;
29 * if not specified, QEMU will make Archipelago use the default port.
31 * 'segment' is the name of the shared memory segment the Archipelago stack
32 * is using. This is optional; if not specified, QEMU will make Archipelago
33 * use the default value, 'archipelago'.
35 * Examples:
37 * file.driver=archipelago,file.volume=my_vm_volume
38 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
39 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
40 * file.vport=1234
41 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
42 * file.vport=1234,file.segment=my_segment
44 * or
46 * file=archipelago:my_vm_volume
47 * file=archipelago:my_vm_volume/mport=123
48 * file=archipelago:my_vm_volume/mport=123:vport=1234
49 * file=archipelago:my_vm_volume/mport=123:vport=1234:segment=my_segment
53 #include "qemu/osdep.h"
54 #include "qemu/cutils.h"
55 #include "block/block_int.h"
56 #include "qemu/error-report.h"
57 #include "qemu/thread.h"
58 #include "qapi/qmp/qint.h"
59 #include "qapi/qmp/qstring.h"
60 #include "qapi/qmp/qjson.h"
61 #include "qemu/atomic.h"
63 #include <xseg/xseg.h>
64 #include <xseg/protocol.h>
/* Largest payload carried by one XSEG request; larger I/O is split (512 KiB). */
#define MAX_REQUEST_SIZE (512 * 1024)

/* Keys of the driver's runtime options (see archipelago_runtime_opts). */
#define ARCHIPELAGO_OPT_VOLUME  "volume"
#define ARCHIPELAGO_OPT_SEGMENT "segment"
#define ARCHIPELAGO_OPT_MPORT   "mport"
#define ARCHIPELAGO_OPT_VPORT   "vport"

/* Ports used when none is given: mapperd (mport) and vlmcd (vport). */
#define ARCHIPELAGO_DFL_MPORT   1001
#define ARCHIPELAGO_DFL_VPORT   501

/* Log to stderr, prefixed with "archipelago" and the calling function name. */
#define archipelagolog(fmt, ...)                                          \
    do {                                                                  \
        fprintf(stderr, "archipelago\t%-24s: " fmt,                       \
                __func__, ##__VA_ARGS__);                                 \
    } while (0)
/* Kind of operation an ArchipelagoAIOCB / AIORequestData represents. */
typedef enum {
    ARCHIP_OP_READ     = 0,   /* segmented read into the caller's qiov */
    ARCHIP_OP_WRITE    = 1,   /* segmented write from the caller's qiov */
    ARCHIP_OP_FLUSH    = 2,   /* single-segment flush */
    ARCHIP_OP_VOLINFO  = 3,   /* synchronous X_INFO (volume size) */
    ARCHIP_OP_TRUNCATE = 4,   /* synchronous X_TRUNCATE */
} ARCHIPCmd;
88 typedef struct ArchipelagoAIOCB {
89 BlockAIOCB common;
90 struct BDRVArchipelagoState *s;
91 QEMUIOVector *qiov;
92 ARCHIPCmd cmd;
93 int status;
94 int64_t size;
95 int64_t ret;
96 } ArchipelagoAIOCB;
98 typedef struct BDRVArchipelagoState {
99 ArchipelagoAIOCB *event_acb;
100 char *volname;
101 char *segment_name;
102 uint64_t size;
103 /* Archipelago specific */
104 struct xseg *xseg;
105 struct xseg_port *port;
106 xport srcport;
107 xport sport;
108 xport mportno;
109 xport vportno;
110 QemuMutex archip_mutex;
111 QemuCond archip_cond;
112 bool is_signaled;
113 /* Request handler specific */
114 QemuThread request_th;
115 QemuCond request_cond;
116 QemuMutex request_mutex;
117 bool th_is_signaled;
118 bool stopping;
119 } BDRVArchipelagoState;
/*
 * Shared bookkeeping for one block-layer request that has been split into
 * several XSEG requests of at most MAX_REQUEST_SIZE bytes each.
 */
typedef struct ArchipelagoSegmentedRequest {
    size_t count;   /* bytes serviced so far, summed over segments */
    size_t total;   /* bytes requested in total */
    int ref;        /* segments still outstanding (atomically decremented) */
    int failed;     /* non-zero if submission failed part-way */
} ArchipelagoSegmentedRequest;
128 typedef struct AIORequestData {
129 const char *volname;
130 off_t offset;
131 size_t size;
132 uint64_t bufidx;
133 int ret;
134 int op;
135 ArchipelagoAIOCB *aio_cb;
136 ArchipelagoSegmentedRequest *segreq;
137 } AIORequestData;
139 static void qemu_archipelago_complete_aio(void *opaque);
141 static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
143 if (xseg && (sport != srcport)) {
144 xseg_init_local_signal(xseg, srcport);
145 sport = srcport;
149 static void archipelago_finish_aiocb(AIORequestData *reqdata)
151 if (reqdata->aio_cb->ret != reqdata->segreq->total) {
152 reqdata->aio_cb->ret = -EIO;
153 } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
154 reqdata->aio_cb->ret = 0;
156 aio_bh_schedule_oneshot(
157 bdrv_get_aio_context(reqdata->aio_cb->common.bs),
158 qemu_archipelago_complete_aio, reqdata
162 static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
163 struct xseg_request *expected_req)
165 struct xseg_request *req;
166 xseg_prepare_wait(xseg, srcport);
167 void *psd = xseg_get_signal_desc(xseg, port);
168 while (1) {
169 req = xseg_receive(xseg, srcport, X_NONBLOCK);
170 if (req) {
171 if (req != expected_req) {
172 archipelagolog("Unknown received request\n");
173 xseg_put_request(xseg, req, srcport);
174 } else if (!(req->state & XS_SERVED)) {
175 return -1;
176 } else {
177 break;
180 xseg_wait_signal(xseg, psd, 100000UL);
182 xseg_cancel_wait(xseg, srcport);
183 return 0;
186 static void xseg_request_handler(void *state)
188 BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
189 void *psd = xseg_get_signal_desc(s->xseg, s->port);
190 qemu_mutex_lock(&s->request_mutex);
192 while (!s->stopping) {
193 struct xseg_request *req;
194 void *data;
195 xseg_prepare_wait(s->xseg, s->srcport);
196 req = xseg_receive(s->xseg, s->srcport, X_NONBLOCK);
197 if (req) {
198 AIORequestData *reqdata;
199 ArchipelagoSegmentedRequest *segreq;
200 xseg_get_req_data(s->xseg, req, (void **)&reqdata);
202 switch (reqdata->op) {
203 case ARCHIP_OP_READ:
204 data = xseg_get_data(s->xseg, req);
205 segreq = reqdata->segreq;
206 segreq->count += req->serviced;
208 qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
209 data,
210 req->serviced);
212 xseg_put_request(s->xseg, req, s->srcport);
214 if (atomic_fetch_dec(&segreq->ref) == 1) {
215 if (!segreq->failed) {
216 reqdata->aio_cb->ret = segreq->count;
217 archipelago_finish_aiocb(reqdata);
218 g_free(segreq);
219 } else {
220 g_free(segreq);
221 g_free(reqdata);
223 } else {
224 g_free(reqdata);
226 break;
227 case ARCHIP_OP_WRITE:
228 case ARCHIP_OP_FLUSH:
229 segreq = reqdata->segreq;
230 segreq->count += req->serviced;
231 xseg_put_request(s->xseg, req, s->srcport);
233 if (atomic_fetch_dec(&segreq->ref) == 1) {
234 if (!segreq->failed) {
235 reqdata->aio_cb->ret = segreq->count;
236 archipelago_finish_aiocb(reqdata);
237 g_free(segreq);
238 } else {
239 g_free(segreq);
240 g_free(reqdata);
242 } else {
243 g_free(reqdata);
245 break;
246 case ARCHIP_OP_VOLINFO:
247 case ARCHIP_OP_TRUNCATE:
248 s->is_signaled = true;
249 qemu_cond_signal(&s->archip_cond);
250 break;
252 } else {
253 xseg_wait_signal(s->xseg, psd, 100000UL);
255 xseg_cancel_wait(s->xseg, s->srcport);
258 s->th_is_signaled = true;
259 qemu_cond_signal(&s->request_cond);
260 qemu_mutex_unlock(&s->request_mutex);
261 qemu_thread_exit(NULL);
264 static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
266 if (xseg_initialize()) {
267 archipelagolog("Cannot initialize XSEG\n");
268 goto err_exit;
271 s->xseg = xseg_join("posix", s->segment_name,
272 "posixfd", NULL);
273 if (!s->xseg) {
274 archipelagolog("Cannot join XSEG shared memory segment\n");
275 goto err_exit;
277 s->port = xseg_bind_dynport(s->xseg);
278 s->srcport = s->port->portno;
279 init_local_signal(s->xseg, s->sport, s->srcport);
280 return 0;
282 err_exit:
283 return -1;
286 static int qemu_archipelago_init(BDRVArchipelagoState *s)
288 int ret;
290 ret = qemu_archipelago_xseg_init(s);
291 if (ret < 0) {
292 error_report("Cannot initialize XSEG. Aborting...");
293 goto err_exit;
296 qemu_cond_init(&s->archip_cond);
297 qemu_mutex_init(&s->archip_mutex);
298 qemu_cond_init(&s->request_cond);
299 qemu_mutex_init(&s->request_mutex);
300 s->th_is_signaled = false;
301 qemu_thread_create(&s->request_th, "xseg_io_th",
302 (void *) xseg_request_handler,
303 (void *) s, QEMU_THREAD_JOINABLE);
305 err_exit:
306 return ret;
309 static void qemu_archipelago_complete_aio(void *opaque)
311 AIORequestData *reqdata = (AIORequestData *) opaque;
312 ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
314 aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
315 aio_cb->status = 0;
317 qemu_aio_unref(aio_cb);
318 g_free(reqdata);
321 static void xseg_find_port(char *pstr, const char *needle, xport *aport)
323 const char *a;
324 char *endptr = NULL;
325 unsigned long port;
326 if (strstart(pstr, needle, &a)) {
327 if (strlen(a) > 0) {
328 port = strtoul(a, &endptr, 10);
329 if (strlen(endptr)) {
330 *aport = -2;
331 return;
333 *aport = (xport) port;
/*
 * If @pstr starts with @needle (e.g. "segment=") followed by a non-empty
 * value, store a g_strdup() copy of that value in *@segment_name.
 *
 * Any previous value in *@segment_name is freed first, so a repeated
 * "segment=" option overrides the earlier one instead of leaking it.
 * *@segment_name must be NULL or a g_malloc'ed string.
 */
static void xseg_find_segment(const char *pstr, const char *needle,
                              char **segment_name)
{
    const char *a;

    if (strstart(pstr, needle, &a) && *a != '\0') {
        g_free(*segment_name);
        *segment_name = g_strdup(a);
    }
}
349 static void parse_filename_opts(const char *filename, Error **errp,
350 char **volume, char **segment_name,
351 xport *mport, xport *vport)
353 const char *start;
354 char *tokens[4], *ds;
355 int idx;
356 xport lmport = NoPort, lvport = NoPort;
358 strstart(filename, "archipelago:", &start);
360 ds = g_strdup(start);
361 tokens[0] = strtok(ds, "/");
362 tokens[1] = strtok(NULL, ":");
363 tokens[2] = strtok(NULL, ":");
364 tokens[3] = strtok(NULL, "\0");
366 if (!strlen(tokens[0])) {
367 error_setg(errp, "volume name must be specified first");
368 g_free(ds);
369 return;
372 for (idx = 1; idx < 4; idx++) {
373 if (tokens[idx] != NULL) {
374 if (strstart(tokens[idx], "mport=", NULL)) {
375 xseg_find_port(tokens[idx], "mport=", &lmport);
377 if (strstart(tokens[idx], "vport=", NULL)) {
378 xseg_find_port(tokens[idx], "vport=", &lvport);
380 if (strstart(tokens[idx], "segment=", NULL)) {
381 xseg_find_segment(tokens[idx], "segment=", segment_name);
386 if ((lmport == -2) || (lvport == -2)) {
387 error_setg(errp, "mport and/or vport must be set");
388 g_free(ds);
389 return;
391 *volume = g_strdup(tokens[0]);
392 *mport = lmport;
393 *vport = lvport;
394 g_free(ds);
397 static void archipelago_parse_filename(const char *filename, QDict *options,
398 Error **errp)
400 const char *start;
401 char *volume = NULL, *segment_name = NULL;
402 xport mport = NoPort, vport = NoPort;
404 if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
405 || qdict_haskey(options, ARCHIPELAGO_OPT_SEGMENT)
406 || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
407 || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
408 error_setg(errp, "volume/mport/vport/segment and a file name may not"
409 " be specified at the same time");
410 return;
413 if (!strstart(filename, "archipelago:", &start)) {
414 error_setg(errp, "File name must start with 'archipelago:'");
415 return;
418 if (!strlen(start) || strstart(start, "/", NULL)) {
419 error_setg(errp, "volume name must be specified");
420 return;
423 parse_filename_opts(filename, errp, &volume, &segment_name, &mport, &vport);
425 if (volume) {
426 qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
427 g_free(volume);
429 if (segment_name) {
430 qdict_put(options, ARCHIPELAGO_OPT_SEGMENT,
431 qstring_from_str(segment_name));
432 g_free(segment_name);
434 if (mport != NoPort) {
435 qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
437 if (vport != NoPort) {
438 qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
442 static QemuOptsList archipelago_runtime_opts = {
443 .name = "archipelago",
444 .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
445 .desc = {
447 .name = ARCHIPELAGO_OPT_VOLUME,
448 .type = QEMU_OPT_STRING,
449 .help = "Name of the volume image",
452 .name = ARCHIPELAGO_OPT_SEGMENT,
453 .type = QEMU_OPT_STRING,
454 .help = "Name of the Archipelago shared memory segment",
457 .name = ARCHIPELAGO_OPT_MPORT,
458 .type = QEMU_OPT_NUMBER,
459 .help = "Archipelago mapperd port number"
462 .name = ARCHIPELAGO_OPT_VPORT,
463 .type = QEMU_OPT_NUMBER,
464 .help = "Archipelago vlmcd port number"
467 { /* end of list */ }
471 static int qemu_archipelago_open(BlockDriverState *bs,
472 QDict *options,
473 int bdrv_flags,
474 Error **errp)
476 int ret = 0;
477 const char *volume, *segment_name;
478 QemuOpts *opts;
479 Error *local_err = NULL;
480 BDRVArchipelagoState *s = bs->opaque;
482 opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
483 qemu_opts_absorb_qdict(opts, options, &local_err);
484 if (local_err) {
485 error_propagate(errp, local_err);
486 ret = -EINVAL;
487 goto err_exit;
490 s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT,
491 ARCHIPELAGO_DFL_MPORT);
492 s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT,
493 ARCHIPELAGO_DFL_VPORT);
495 segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
496 if (segment_name == NULL) {
497 s->segment_name = g_strdup("archipelago");
498 } else {
499 s->segment_name = g_strdup(segment_name);
502 volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
503 if (volume == NULL) {
504 error_setg(errp, "archipelago block driver requires the 'volume'"
505 " option");
506 ret = -EINVAL;
507 goto err_exit;
509 s->volname = g_strdup(volume);
511 /* Initialize XSEG, join shared memory segment */
512 ret = qemu_archipelago_init(s);
513 if (ret < 0) {
514 error_setg(errp, "cannot initialize XSEG and join shared "
515 "memory segment");
516 goto err_exit;
519 qemu_opts_del(opts);
520 return 0;
522 err_exit:
523 g_free(s->volname);
524 g_free(s->segment_name);
525 qemu_opts_del(opts);
526 return ret;
529 static void qemu_archipelago_close(BlockDriverState *bs)
531 int r, targetlen;
532 char *target;
533 struct xseg_request *req;
534 BDRVArchipelagoState *s = bs->opaque;
536 s->stopping = true;
538 qemu_mutex_lock(&s->request_mutex);
539 while (!s->th_is_signaled) {
540 qemu_cond_wait(&s->request_cond,
541 &s->request_mutex);
543 qemu_mutex_unlock(&s->request_mutex);
544 qemu_thread_join(&s->request_th);
545 qemu_cond_destroy(&s->request_cond);
546 qemu_mutex_destroy(&s->request_mutex);
548 qemu_cond_destroy(&s->archip_cond);
549 qemu_mutex_destroy(&s->archip_mutex);
551 targetlen = strlen(s->volname);
552 req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
553 if (!req) {
554 archipelagolog("Cannot get XSEG request\n");
555 goto err_exit;
557 r = xseg_prep_request(s->xseg, req, targetlen, 0);
558 if (r < 0) {
559 xseg_put_request(s->xseg, req, s->srcport);
560 archipelagolog("Cannot prepare XSEG close request\n");
561 goto err_exit;
564 target = xseg_get_target(s->xseg, req);
565 memcpy(target, s->volname, targetlen);
566 req->size = req->datalen;
567 req->offset = 0;
568 req->op = X_CLOSE;
570 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
571 if (p == NoPort) {
572 xseg_put_request(s->xseg, req, s->srcport);
573 archipelagolog("Cannot submit XSEG close request\n");
574 goto err_exit;
577 xseg_signal(s->xseg, p);
578 wait_reply(s->xseg, s->srcport, s->port, req);
580 xseg_put_request(s->xseg, req, s->srcport);
582 err_exit:
583 g_free(s->volname);
584 g_free(s->segment_name);
585 xseg_quit_local_signal(s->xseg, s->srcport);
586 xseg_leave_dynport(s->xseg, s->port);
587 xseg_leave(s->xseg);
590 static int qemu_archipelago_create_volume(Error **errp, const char *volname,
591 char *segment_name,
592 uint64_t size, xport mportno,
593 xport vportno)
595 int ret, targetlen;
596 struct xseg *xseg = NULL;
597 struct xseg_request *req;
598 struct xseg_request_clone *xclone;
599 struct xseg_port *port;
600 xport srcport = NoPort, sport = NoPort;
601 char *target;
603 /* Try default values if none has been set */
604 if (mportno == (xport) -1) {
605 mportno = ARCHIPELAGO_DFL_MPORT;
608 if (vportno == (xport) -1) {
609 vportno = ARCHIPELAGO_DFL_VPORT;
612 if (xseg_initialize()) {
613 error_setg(errp, "Cannot initialize XSEG");
614 return -1;
617 xseg = xseg_join("posix", segment_name,
618 "posixfd", NULL);
620 if (!xseg) {
621 error_setg(errp, "Cannot join XSEG shared memory segment");
622 return -1;
625 port = xseg_bind_dynport(xseg);
626 srcport = port->portno;
627 init_local_signal(xseg, sport, srcport);
629 req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
630 if (!req) {
631 error_setg(errp, "Cannot get XSEG request");
632 return -1;
635 targetlen = strlen(volname);
636 ret = xseg_prep_request(xseg, req, targetlen,
637 sizeof(struct xseg_request_clone));
638 if (ret < 0) {
639 error_setg(errp, "Cannot prepare XSEG request");
640 goto err_exit;
643 target = xseg_get_target(xseg, req);
644 if (!target) {
645 error_setg(errp, "Cannot get XSEG target.");
646 goto err_exit;
648 memcpy(target, volname, targetlen);
649 xclone = (struct xseg_request_clone *) xseg_get_data(xseg, req);
650 memset(xclone->target, 0 , XSEG_MAX_TARGETLEN);
651 xclone->targetlen = 0;
652 xclone->size = size;
653 req->offset = 0;
654 req->size = req->datalen;
655 req->op = X_CLONE;
657 xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
658 if (p == NoPort) {
659 error_setg(errp, "Could not submit XSEG request");
660 goto err_exit;
662 xseg_signal(xseg, p);
664 ret = wait_reply(xseg, srcport, port, req);
665 if (ret < 0) {
666 error_setg(errp, "wait_reply() error.");
669 xseg_put_request(xseg, req, srcport);
670 xseg_quit_local_signal(xseg, srcport);
671 xseg_leave_dynport(xseg, port);
672 xseg_leave(xseg);
673 return ret;
675 err_exit:
676 xseg_put_request(xseg, req, srcport);
677 xseg_quit_local_signal(xseg, srcport);
678 xseg_leave_dynport(xseg, port);
679 xseg_leave(xseg);
680 return -1;
683 static int qemu_archipelago_create(const char *filename,
684 QemuOpts *options,
685 Error **errp)
687 int ret = 0;
688 uint64_t total_size = 0;
689 char *volname = NULL, *segment_name = NULL;
690 const char *start;
691 xport mport = NoPort, vport = NoPort;
693 if (!strstart(filename, "archipelago:", &start)) {
694 error_setg(errp, "File name must start with 'archipelago:'");
695 return -1;
698 if (!strlen(start) || strstart(start, "/", NULL)) {
699 error_setg(errp, "volume name must be specified");
700 return -1;
703 parse_filename_opts(filename, errp, &volname, &segment_name, &mport,
704 &vport);
705 total_size = ROUND_UP(qemu_opt_get_size_del(options, BLOCK_OPT_SIZE, 0),
706 BDRV_SECTOR_SIZE);
708 if (segment_name == NULL) {
709 segment_name = g_strdup("archipelago");
712 /* Create an Archipelago volume */
713 ret = qemu_archipelago_create_volume(errp, volname, segment_name,
714 total_size, mport,
715 vport);
717 g_free(volname);
718 g_free(segment_name);
719 return ret;
722 static const AIOCBInfo archipelago_aiocb_info = {
723 .aiocb_size = sizeof(ArchipelagoAIOCB),
726 static int archipelago_submit_request(BDRVArchipelagoState *s,
727 uint64_t bufidx,
728 size_t count,
729 off_t offset,
730 ArchipelagoAIOCB *aio_cb,
731 ArchipelagoSegmentedRequest *segreq,
732 int op)
734 int ret, targetlen;
735 char *target;
736 void *data = NULL;
737 struct xseg_request *req;
738 AIORequestData *reqdata = g_new(AIORequestData, 1);
740 targetlen = strlen(s->volname);
741 req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
742 if (!req) {
743 archipelagolog("Cannot get XSEG request\n");
744 goto err_exit2;
746 ret = xseg_prep_request(s->xseg, req, targetlen, count);
747 if (ret < 0) {
748 archipelagolog("Cannot prepare XSEG request\n");
749 goto err_exit;
751 target = xseg_get_target(s->xseg, req);
752 if (!target) {
753 archipelagolog("Cannot get XSEG target\n");
754 goto err_exit;
756 memcpy(target, s->volname, targetlen);
757 req->size = count;
758 req->offset = offset;
760 switch (op) {
761 case ARCHIP_OP_READ:
762 req->op = X_READ;
763 break;
764 case ARCHIP_OP_WRITE:
765 req->op = X_WRITE;
766 break;
767 case ARCHIP_OP_FLUSH:
768 req->op = X_FLUSH;
769 break;
771 reqdata->volname = s->volname;
772 reqdata->offset = offset;
773 reqdata->size = count;
774 reqdata->bufidx = bufidx;
775 reqdata->aio_cb = aio_cb;
776 reqdata->segreq = segreq;
777 reqdata->op = op;
779 xseg_set_req_data(s->xseg, req, reqdata);
780 if (op == ARCHIP_OP_WRITE) {
781 data = xseg_get_data(s->xseg, req);
782 if (!data) {
783 archipelagolog("Cannot get XSEG data\n");
784 goto err_exit;
786 qemu_iovec_to_buf(aio_cb->qiov, bufidx, data, count);
789 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
790 if (p == NoPort) {
791 archipelagolog("Could not submit XSEG request\n");
792 goto err_exit;
794 xseg_signal(s->xseg, p);
795 return 0;
797 err_exit:
798 g_free(reqdata);
799 xseg_put_request(s->xseg, req, s->srcport);
800 return -EIO;
801 err_exit2:
802 g_free(reqdata);
803 return -EIO;
806 static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
807 size_t count,
808 off_t offset,
809 ArchipelagoAIOCB *aio_cb,
810 int op)
812 int ret, segments_nr;
813 size_t pos = 0;
814 ArchipelagoSegmentedRequest *segreq;
816 segreq = g_new0(ArchipelagoSegmentedRequest, 1);
818 if (op == ARCHIP_OP_FLUSH) {
819 segments_nr = 1;
820 } else {
821 segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
822 ((count % MAX_REQUEST_SIZE) ? 1 : 0);
824 segreq->total = count;
825 atomic_mb_set(&segreq->ref, segments_nr);
827 while (segments_nr > 1) {
828 ret = archipelago_submit_request(s, pos,
829 MAX_REQUEST_SIZE,
830 offset + pos,
831 aio_cb, segreq, op);
833 if (ret < 0) {
834 goto err_exit;
836 count -= MAX_REQUEST_SIZE;
837 pos += MAX_REQUEST_SIZE;
838 segments_nr--;
840 ret = archipelago_submit_request(s, pos, count, offset + pos,
841 aio_cb, segreq, op);
843 if (ret < 0) {
844 goto err_exit;
846 return 0;
848 err_exit:
849 segreq->failed = 1;
850 if (atomic_fetch_sub(&segreq->ref, segments_nr) == segments_nr) {
851 g_free(segreq);
853 return ret;
856 static BlockAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
857 int64_t sector_num,
858 QEMUIOVector *qiov,
859 int nb_sectors,
860 BlockCompletionFunc *cb,
861 void *opaque,
862 int op)
864 ArchipelagoAIOCB *aio_cb;
865 BDRVArchipelagoState *s = bs->opaque;
866 int64_t size, off;
867 int ret;
869 aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
870 aio_cb->cmd = op;
871 aio_cb->qiov = qiov;
873 aio_cb->ret = 0;
874 aio_cb->s = s;
875 aio_cb->status = -EINPROGRESS;
877 off = sector_num * BDRV_SECTOR_SIZE;
878 size = nb_sectors * BDRV_SECTOR_SIZE;
879 aio_cb->size = size;
881 ret = archipelago_aio_segmented_rw(s, size, off,
882 aio_cb, op);
883 if (ret < 0) {
884 goto err_exit;
886 return &aio_cb->common;
888 err_exit:
889 error_report("qemu_archipelago_aio_rw(): I/O Error");
890 qemu_aio_unref(aio_cb);
891 return NULL;
894 static BlockAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
895 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
896 BlockCompletionFunc *cb, void *opaque)
898 return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
899 opaque, ARCHIP_OP_READ);
902 static BlockAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
903 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
904 BlockCompletionFunc *cb, void *opaque)
906 return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
907 opaque, ARCHIP_OP_WRITE);
910 static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
912 uint64_t size;
913 int ret, targetlen;
914 struct xseg_request *req;
915 struct xseg_reply_info *xinfo;
916 AIORequestData *reqdata = g_new(AIORequestData, 1);
918 const char *volname = s->volname;
919 targetlen = strlen(volname);
920 req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
921 if (!req) {
922 archipelagolog("Cannot get XSEG request\n");
923 goto err_exit2;
925 ret = xseg_prep_request(s->xseg, req, targetlen,
926 sizeof(struct xseg_reply_info));
927 if (ret < 0) {
928 archipelagolog("Cannot prepare XSEG request\n");
929 goto err_exit;
931 char *target = xseg_get_target(s->xseg, req);
932 if (!target) {
933 archipelagolog("Cannot get XSEG target\n");
934 goto err_exit;
936 memcpy(target, volname, targetlen);
937 req->size = req->datalen;
938 req->offset = 0;
939 req->op = X_INFO;
941 reqdata->op = ARCHIP_OP_VOLINFO;
942 reqdata->volname = volname;
943 xseg_set_req_data(s->xseg, req, reqdata);
945 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
946 if (p == NoPort) {
947 archipelagolog("Cannot submit XSEG request\n");
948 goto err_exit;
950 xseg_signal(s->xseg, p);
951 qemu_mutex_lock(&s->archip_mutex);
952 while (!s->is_signaled) {
953 qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
955 s->is_signaled = false;
956 qemu_mutex_unlock(&s->archip_mutex);
958 xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
959 size = xinfo->size;
960 xseg_put_request(s->xseg, req, s->srcport);
961 g_free(reqdata);
962 s->size = size;
963 return size;
965 err_exit:
966 xseg_put_request(s->xseg, req, s->srcport);
967 err_exit2:
968 g_free(reqdata);
969 return -EIO;
972 static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
974 BDRVArchipelagoState *s = bs->opaque;
976 return archipelago_volume_info(s);
979 static int qemu_archipelago_truncate(BlockDriverState *bs, int64_t offset)
981 int ret, targetlen;
982 struct xseg_request *req;
983 BDRVArchipelagoState *s = bs->opaque;
984 AIORequestData *reqdata = g_new(AIORequestData, 1);
986 const char *volname = s->volname;
987 targetlen = strlen(volname);
988 req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
989 if (!req) {
990 archipelagolog("Cannot get XSEG request\n");
991 goto err_exit2;
994 ret = xseg_prep_request(s->xseg, req, targetlen, 0);
995 if (ret < 0) {
996 archipelagolog("Cannot prepare XSEG request\n");
997 goto err_exit;
999 char *target = xseg_get_target(s->xseg, req);
1000 if (!target) {
1001 archipelagolog("Cannot get XSEG target\n");
1002 goto err_exit;
1004 memcpy(target, volname, targetlen);
1005 req->offset = offset;
1006 req->op = X_TRUNCATE;
1008 reqdata->op = ARCHIP_OP_TRUNCATE;
1009 reqdata->volname = volname;
1011 xseg_set_req_data(s->xseg, req, reqdata);
1013 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
1014 if (p == NoPort) {
1015 archipelagolog("Cannot submit XSEG request\n");
1016 goto err_exit;
1019 xseg_signal(s->xseg, p);
1020 qemu_mutex_lock(&s->archip_mutex);
1021 while (!s->is_signaled) {
1022 qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
1024 s->is_signaled = false;
1025 qemu_mutex_unlock(&s->archip_mutex);
1026 xseg_put_request(s->xseg, req, s->srcport);
1027 g_free(reqdata);
1028 return 0;
1030 err_exit:
1031 xseg_put_request(s->xseg, req, s->srcport);
1032 err_exit2:
1033 g_free(reqdata);
1034 return -EIO;
1037 static QemuOptsList qemu_archipelago_create_opts = {
1038 .name = "archipelago-create-opts",
1039 .head = QTAILQ_HEAD_INITIALIZER(qemu_archipelago_create_opts.head),
1040 .desc = {
1042 .name = BLOCK_OPT_SIZE,
1043 .type = QEMU_OPT_SIZE,
1044 .help = "Virtual disk size"
1046 { /* end of list */ }
1050 static BlockAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
1051 BlockCompletionFunc *cb, void *opaque)
1053 return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
1054 ARCHIP_OP_FLUSH);
1057 static BlockDriver bdrv_archipelago = {
1058 .format_name = "archipelago",
1059 .protocol_name = "archipelago",
1060 .instance_size = sizeof(BDRVArchipelagoState),
1061 .bdrv_parse_filename = archipelago_parse_filename,
1062 .bdrv_file_open = qemu_archipelago_open,
1063 .bdrv_close = qemu_archipelago_close,
1064 .bdrv_create = qemu_archipelago_create,
1065 .bdrv_getlength = qemu_archipelago_getlength,
1066 .bdrv_truncate = qemu_archipelago_truncate,
1067 .bdrv_aio_readv = qemu_archipelago_aio_readv,
1068 .bdrv_aio_writev = qemu_archipelago_aio_writev,
1069 .bdrv_aio_flush = qemu_archipelago_aio_flush,
1070 .bdrv_has_zero_init = bdrv_has_zero_init_1,
1071 .create_opts = &qemu_archipelago_create_opts,
1074 static void bdrv_archipelago_init(void)
1076 bdrv_register(&bdrv_archipelago);
1079 block_init(bdrv_archipelago_init);