Merge remote-tracking branch 'remotes/kvm/tags/for-upstream' into staging
[qemu/cris-port.git] / block / archipelago.c
blob34f72dc5a562c31f437c55d01aa52ceb96a39c1b
1 /*
2 * QEMU Block driver for Archipelago
4 * Copyright (C) 2014 Chrysostomos Nanakos <cnanakos@grnet.gr>
6 * This work is licensed under the terms of the GNU GPL, version 2 or later.
7 * See the COPYING file in the top-level directory.
9 */
12 * VM Image on Archipelago volume is specified like this:
14 * file.driver=archipelago,file.volume=<volumename>
15 * [,file.mport=<mapperd_port>[,file.vport=<vlmcd_port>]
16 * [,file.segment=<segment_name>]]
18 * or
20 * file=archipelago:<volumename>[/mport=<mapperd_port>[:vport=<vlmcd_port>][:
21 * segment=<segment_name>]]
23 * 'archipelago' is the protocol.
25 * 'mport' is the port number on which mapperd is listening. This is optional
26 * and if not specified, QEMU will make Archipelago to use the default port.
28 * 'vport' is the port number on which vlmcd is listening. This is optional
29 * and if not specified, QEMU will make Archipelago to use the default port.
31 * 'segment' is the name of the shared memory segment Archipelago stack
32 * is using. This is optional and if not specified, QEMU will make Archipelago
33 * to use the default value, 'archipelago'.
35 * Examples:
37 * file.driver=archipelago,file.volume=my_vm_volume
38 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
39 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
40 * file.vport=1234
41 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
42 * file.vport=1234,file.segment=my_segment
44 * or
46 * file=archipelago:my_vm_volume
47 * file=archipelago:my_vm_volume/mport=123
48 * file=archipelago:my_vm_volume/mport=123:vport=1234
49 * file=archipelago:my_vm_volume/mport=123:vport=1234:segment=my_segment
53 #include "qemu-common.h"
54 #include "block/block_int.h"
55 #include "qemu/error-report.h"
56 #include "qemu/thread.h"
57 #include "qapi/qmp/qint.h"
58 #include "qapi/qmp/qstring.h"
59 #include "qapi/qmp/qjson.h"
61 #include <inttypes.h>
62 #include <xseg/xseg.h>
63 #include <xseg/protocol.h>
65 #define ARCHIP_FD_READ 0
66 #define ARCHIP_FD_WRITE 1
67 #define MAX_REQUEST_SIZE 524288
69 #define ARCHIPELAGO_OPT_VOLUME "volume"
70 #define ARCHIPELAGO_OPT_SEGMENT "segment"
71 #define ARCHIPELAGO_OPT_MPORT "mport"
72 #define ARCHIPELAGO_OPT_VPORT "vport"
73 #define ARCHIPELAGO_DFL_MPORT 1001
74 #define ARCHIPELAGO_DFL_VPORT 501
76 #define archipelagolog(fmt, ...) \
77 do { \
78 fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
79 } while (0)
81 typedef enum {
82 ARCHIP_OP_READ,
83 ARCHIP_OP_WRITE,
84 ARCHIP_OP_FLUSH,
85 ARCHIP_OP_VOLINFO,
86 } ARCHIPCmd;
88 typedef struct ArchipelagoAIOCB {
89 BlockDriverAIOCB common;
90 QEMUBH *bh;
91 struct BDRVArchipelagoState *s;
92 QEMUIOVector *qiov;
93 ARCHIPCmd cmd;
94 bool cancelled;
95 int status;
96 int64_t size;
97 int64_t ret;
98 } ArchipelagoAIOCB;
100 typedef struct BDRVArchipelagoState {
101 ArchipelagoAIOCB *event_acb;
102 char *volname;
103 char *segment_name;
104 uint64_t size;
105 /* Archipelago specific */
106 struct xseg *xseg;
107 struct xseg_port *port;
108 xport srcport;
109 xport sport;
110 xport mportno;
111 xport vportno;
112 QemuMutex archip_mutex;
113 QemuCond archip_cond;
114 bool is_signaled;
115 /* Request handler specific */
116 QemuThread request_th;
117 QemuCond request_cond;
118 QemuMutex request_mutex;
119 bool th_is_signaled;
120 bool stopping;
121 } BDRVArchipelagoState;
123 typedef struct ArchipelagoSegmentedRequest {
124 size_t count;
125 size_t total;
126 int ref;
127 int failed;
128 } ArchipelagoSegmentedRequest;
130 typedef struct AIORequestData {
131 const char *volname;
132 off_t offset;
133 size_t size;
134 uint64_t bufidx;
135 int ret;
136 int op;
137 ArchipelagoAIOCB *aio_cb;
138 ArchipelagoSegmentedRequest *segreq;
139 } AIORequestData;
141 static void qemu_archipelago_complete_aio(void *opaque);
143 static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
145 if (xseg && (sport != srcport)) {
146 xseg_init_local_signal(xseg, srcport);
147 sport = srcport;
151 static void archipelago_finish_aiocb(AIORequestData *reqdata)
153 if (reqdata->aio_cb->ret != reqdata->segreq->total) {
154 reqdata->aio_cb->ret = -EIO;
155 } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
156 reqdata->aio_cb->ret = 0;
158 reqdata->aio_cb->bh = aio_bh_new(
159 bdrv_get_aio_context(reqdata->aio_cb->common.bs),
160 qemu_archipelago_complete_aio, reqdata
162 qemu_bh_schedule(reqdata->aio_cb->bh);
165 static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
166 struct xseg_request *expected_req)
168 struct xseg_request *req;
169 xseg_prepare_wait(xseg, srcport);
170 void *psd = xseg_get_signal_desc(xseg, port);
171 while (1) {
172 req = xseg_receive(xseg, srcport, X_NONBLOCK);
173 if (req) {
174 if (req != expected_req) {
175 archipelagolog("Unknown received request\n");
176 xseg_put_request(xseg, req, srcport);
177 } else if (!(req->state & XS_SERVED)) {
178 return -1;
179 } else {
180 break;
183 xseg_wait_signal(xseg, psd, 100000UL);
185 xseg_cancel_wait(xseg, srcport);
186 return 0;
189 static void xseg_request_handler(void *state)
191 BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
192 void *psd = xseg_get_signal_desc(s->xseg, s->port);
193 qemu_mutex_lock(&s->request_mutex);
195 while (!s->stopping) {
196 struct xseg_request *req;
197 void *data;
198 xseg_prepare_wait(s->xseg, s->srcport);
199 req = xseg_receive(s->xseg, s->srcport, X_NONBLOCK);
200 if (req) {
201 AIORequestData *reqdata;
202 ArchipelagoSegmentedRequest *segreq;
203 xseg_get_req_data(s->xseg, req, (void **)&reqdata);
205 switch (reqdata->op) {
206 case ARCHIP_OP_READ:
207 data = xseg_get_data(s->xseg, req);
208 segreq = reqdata->segreq;
209 segreq->count += req->serviced;
211 qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
212 data,
213 req->serviced);
215 xseg_put_request(s->xseg, req, s->srcport);
217 if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
218 if (!segreq->failed) {
219 reqdata->aio_cb->ret = segreq->count;
220 archipelago_finish_aiocb(reqdata);
221 g_free(segreq);
222 } else {
223 g_free(segreq);
224 g_free(reqdata);
226 } else {
227 g_free(reqdata);
229 break;
230 case ARCHIP_OP_WRITE:
231 case ARCHIP_OP_FLUSH:
232 segreq = reqdata->segreq;
233 segreq->count += req->serviced;
234 xseg_put_request(s->xseg, req, s->srcport);
236 if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
237 if (!segreq->failed) {
238 reqdata->aio_cb->ret = segreq->count;
239 archipelago_finish_aiocb(reqdata);
240 g_free(segreq);
241 } else {
242 g_free(segreq);
243 g_free(reqdata);
245 } else {
246 g_free(reqdata);
248 break;
249 case ARCHIP_OP_VOLINFO:
250 s->is_signaled = true;
251 qemu_cond_signal(&s->archip_cond);
252 break;
254 } else {
255 xseg_wait_signal(s->xseg, psd, 100000UL);
257 xseg_cancel_wait(s->xseg, s->srcport);
260 s->th_is_signaled = true;
261 qemu_cond_signal(&s->request_cond);
262 qemu_mutex_unlock(&s->request_mutex);
263 qemu_thread_exit(NULL);
266 static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
268 if (xseg_initialize()) {
269 archipelagolog("Cannot initialize XSEG\n");
270 goto err_exit;
273 s->xseg = xseg_join("posix", s->segment_name,
274 "posixfd", NULL);
275 if (!s->xseg) {
276 archipelagolog("Cannot join XSEG shared memory segment\n");
277 goto err_exit;
279 s->port = xseg_bind_dynport(s->xseg);
280 s->srcport = s->port->portno;
281 init_local_signal(s->xseg, s->sport, s->srcport);
282 return 0;
284 err_exit:
285 return -1;
288 static int qemu_archipelago_init(BDRVArchipelagoState *s)
290 int ret;
292 ret = qemu_archipelago_xseg_init(s);
293 if (ret < 0) {
294 error_report("Cannot initialize XSEG. Aborting...\n");
295 goto err_exit;
298 qemu_cond_init(&s->archip_cond);
299 qemu_mutex_init(&s->archip_mutex);
300 qemu_cond_init(&s->request_cond);
301 qemu_mutex_init(&s->request_mutex);
302 s->th_is_signaled = false;
303 qemu_thread_create(&s->request_th, "xseg_io_th",
304 (void *) xseg_request_handler,
305 (void *) s, QEMU_THREAD_JOINABLE);
307 err_exit:
308 return ret;
311 static void qemu_archipelago_complete_aio(void *opaque)
313 AIORequestData *reqdata = (AIORequestData *) opaque;
314 ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
316 qemu_bh_delete(aio_cb->bh);
317 aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
318 aio_cb->status = 0;
320 if (!aio_cb->cancelled) {
321 qemu_aio_release(aio_cb);
323 g_free(reqdata);
326 static void xseg_find_port(char *pstr, const char *needle, xport *aport)
328 const char *a;
329 char *endptr = NULL;
330 unsigned long port;
331 if (strstart(pstr, needle, &a)) {
332 if (strlen(a) > 0) {
333 port = strtoul(a, &endptr, 10);
334 if (strlen(endptr)) {
335 *aport = -2;
336 return;
338 *aport = (xport) port;
343 static void xseg_find_segment(char *pstr, const char *needle,
344 char **segment_name)
346 const char *a;
347 if (strstart(pstr, needle, &a)) {
348 if (strlen(a) > 0) {
349 *segment_name = g_strdup(a);
354 static void parse_filename_opts(const char *filename, Error **errp,
355 char **volume, char **segment_name,
356 xport *mport, xport *vport)
358 const char *start;
359 char *tokens[4], *ds;
360 int idx;
361 xport lmport = NoPort, lvport = NoPort;
363 strstart(filename, "archipelago:", &start);
365 ds = g_strdup(start);
366 tokens[0] = strtok(ds, "/");
367 tokens[1] = strtok(NULL, ":");
368 tokens[2] = strtok(NULL, ":");
369 tokens[3] = strtok(NULL, "\0");
371 if (!strlen(tokens[0])) {
372 error_setg(errp, "volume name must be specified first");
373 g_free(ds);
374 return;
377 for (idx = 1; idx < 4; idx++) {
378 if (tokens[idx] != NULL) {
379 if (strstart(tokens[idx], "mport=", NULL)) {
380 xseg_find_port(tokens[idx], "mport=", &lmport);
382 if (strstart(tokens[idx], "vport=", NULL)) {
383 xseg_find_port(tokens[idx], "vport=", &lvport);
385 if (strstart(tokens[idx], "segment=", NULL)) {
386 xseg_find_segment(tokens[idx], "segment=", segment_name);
391 if ((lmport == -2) || (lvport == -2)) {
392 error_setg(errp, "mport and/or vport must be set");
393 g_free(ds);
394 return;
396 *volume = g_strdup(tokens[0]);
397 *mport = lmport;
398 *vport = lvport;
399 g_free(ds);
402 static void archipelago_parse_filename(const char *filename, QDict *options,
403 Error **errp)
405 const char *start;
406 char *volume = NULL, *segment_name = NULL;
407 xport mport = NoPort, vport = NoPort;
409 if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
410 || qdict_haskey(options, ARCHIPELAGO_OPT_SEGMENT)
411 || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
412 || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
413 error_setg(errp, "volume/mport/vport/segment and a file name may not"
414 " be specified at the same time");
415 return;
418 if (!strstart(filename, "archipelago:", &start)) {
419 error_setg(errp, "File name must start with 'archipelago:'");
420 return;
423 if (!strlen(start) || strstart(start, "/", NULL)) {
424 error_setg(errp, "volume name must be specified");
425 return;
428 parse_filename_opts(filename, errp, &volume, &segment_name, &mport, &vport);
430 if (volume) {
431 qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
432 g_free(volume);
434 if (segment_name) {
435 qdict_put(options, ARCHIPELAGO_OPT_SEGMENT,
436 qstring_from_str(segment_name));
437 g_free(segment_name);
439 if (mport != NoPort) {
440 qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
442 if (vport != NoPort) {
443 qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
447 static QemuOptsList archipelago_runtime_opts = {
448 .name = "archipelago",
449 .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
450 .desc = {
452 .name = ARCHIPELAGO_OPT_VOLUME,
453 .type = QEMU_OPT_STRING,
454 .help = "Name of the volume image",
457 .name = ARCHIPELAGO_OPT_SEGMENT,
458 .type = QEMU_OPT_STRING,
459 .help = "Name of the Archipelago shared memory segment",
462 .name = ARCHIPELAGO_OPT_MPORT,
463 .type = QEMU_OPT_NUMBER,
464 .help = "Archipelago mapperd port number"
467 .name = ARCHIPELAGO_OPT_VPORT,
468 .type = QEMU_OPT_NUMBER,
469 .help = "Archipelago vlmcd port number"
472 { /* end of list */ }
476 static int qemu_archipelago_open(BlockDriverState *bs,
477 QDict *options,
478 int bdrv_flags,
479 Error **errp)
481 int ret = 0;
482 const char *volume, *segment_name;
483 QemuOpts *opts;
484 Error *local_err = NULL;
485 BDRVArchipelagoState *s = bs->opaque;
487 opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
488 qemu_opts_absorb_qdict(opts, options, &local_err);
489 if (local_err) {
490 error_propagate(errp, local_err);
491 ret = -EINVAL;
492 goto err_exit;
495 s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT,
496 ARCHIPELAGO_DFL_MPORT);
497 s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT,
498 ARCHIPELAGO_DFL_VPORT);
500 segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
501 if (segment_name == NULL) {
502 s->segment_name = g_strdup("archipelago");
503 } else {
504 s->segment_name = g_strdup(segment_name);
507 volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
508 if (volume == NULL) {
509 error_setg(errp, "archipelago block driver requires the 'volume'"
510 " option");
511 ret = -EINVAL;
512 goto err_exit;
514 s->volname = g_strdup(volume);
516 /* Initialize XSEG, join shared memory segment */
517 ret = qemu_archipelago_init(s);
518 if (ret < 0) {
519 error_setg(errp, "cannot initialize XSEG and join shared "
520 "memory segment");
521 goto err_exit;
524 qemu_opts_del(opts);
525 return 0;
527 err_exit:
528 g_free(s->volname);
529 g_free(s->segment_name);
530 qemu_opts_del(opts);
531 return ret;
534 static void qemu_archipelago_close(BlockDriverState *bs)
536 int r, targetlen;
537 char *target;
538 struct xseg_request *req;
539 BDRVArchipelagoState *s = bs->opaque;
541 s->stopping = true;
543 qemu_mutex_lock(&s->request_mutex);
544 while (!s->th_is_signaled) {
545 qemu_cond_wait(&s->request_cond,
546 &s->request_mutex);
548 qemu_mutex_unlock(&s->request_mutex);
549 qemu_thread_join(&s->request_th);
550 qemu_cond_destroy(&s->request_cond);
551 qemu_mutex_destroy(&s->request_mutex);
553 qemu_cond_destroy(&s->archip_cond);
554 qemu_mutex_destroy(&s->archip_mutex);
556 targetlen = strlen(s->volname);
557 req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
558 if (!req) {
559 archipelagolog("Cannot get XSEG request\n");
560 goto err_exit;
562 r = xseg_prep_request(s->xseg, req, targetlen, 0);
563 if (r < 0) {
564 xseg_put_request(s->xseg, req, s->srcport);
565 archipelagolog("Cannot prepare XSEG close request\n");
566 goto err_exit;
569 target = xseg_get_target(s->xseg, req);
570 memcpy(target, s->volname, targetlen);
571 req->size = req->datalen;
572 req->offset = 0;
573 req->op = X_CLOSE;
575 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
576 if (p == NoPort) {
577 xseg_put_request(s->xseg, req, s->srcport);
578 archipelagolog("Cannot submit XSEG close request\n");
579 goto err_exit;
582 xseg_signal(s->xseg, p);
583 wait_reply(s->xseg, s->srcport, s->port, req);
585 xseg_put_request(s->xseg, req, s->srcport);
587 err_exit:
588 g_free(s->volname);
589 g_free(s->segment_name);
590 xseg_quit_local_signal(s->xseg, s->srcport);
591 xseg_leave_dynport(s->xseg, s->port);
592 xseg_leave(s->xseg);
595 static int qemu_archipelago_create_volume(Error **errp, const char *volname,
596 char *segment_name,
597 uint64_t size, xport mportno,
598 xport vportno)
600 int ret, targetlen;
601 struct xseg *xseg = NULL;
602 struct xseg_request *req;
603 struct xseg_request_clone *xclone;
604 struct xseg_port *port;
605 xport srcport = NoPort, sport = NoPort;
606 char *target;
608 /* Try default values if none has been set */
609 if (mportno == (xport) -1) {
610 mportno = ARCHIPELAGO_DFL_MPORT;
613 if (vportno == (xport) -1) {
614 vportno = ARCHIPELAGO_DFL_VPORT;
617 if (xseg_initialize()) {
618 error_setg(errp, "Cannot initialize XSEG");
619 return -1;
622 xseg = xseg_join("posix", segment_name,
623 "posixfd", NULL);
625 if (!xseg) {
626 error_setg(errp, "Cannot join XSEG shared memory segment");
627 return -1;
630 port = xseg_bind_dynport(xseg);
631 srcport = port->portno;
632 init_local_signal(xseg, sport, srcport);
634 req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
635 if (!req) {
636 error_setg(errp, "Cannot get XSEG request");
637 return -1;
640 targetlen = strlen(volname);
641 ret = xseg_prep_request(xseg, req, targetlen,
642 sizeof(struct xseg_request_clone));
643 if (ret < 0) {
644 error_setg(errp, "Cannot prepare XSEG request");
645 goto err_exit;
648 target = xseg_get_target(xseg, req);
649 if (!target) {
650 error_setg(errp, "Cannot get XSEG target.\n");
651 goto err_exit;
653 memcpy(target, volname, targetlen);
654 xclone = (struct xseg_request_clone *) xseg_get_data(xseg, req);
655 memset(xclone->target, 0 , XSEG_MAX_TARGETLEN);
656 xclone->targetlen = 0;
657 xclone->size = size;
658 req->offset = 0;
659 req->size = req->datalen;
660 req->op = X_CLONE;
662 xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
663 if (p == NoPort) {
664 error_setg(errp, "Could not submit XSEG request");
665 goto err_exit;
667 xseg_signal(xseg, p);
669 ret = wait_reply(xseg, srcport, port, req);
670 if (ret < 0) {
671 error_setg(errp, "wait_reply() error.");
674 xseg_put_request(xseg, req, srcport);
675 xseg_quit_local_signal(xseg, srcport);
676 xseg_leave_dynport(xseg, port);
677 xseg_leave(xseg);
678 return ret;
680 err_exit:
681 xseg_put_request(xseg, req, srcport);
682 xseg_quit_local_signal(xseg, srcport);
683 xseg_leave_dynport(xseg, port);
684 xseg_leave(xseg);
685 return -1;
688 static int qemu_archipelago_create(const char *filename,
689 QemuOpts *options,
690 Error **errp)
692 int ret = 0;
693 uint64_t total_size = 0;
694 char *volname = NULL, *segment_name = NULL;
695 const char *start;
696 xport mport = NoPort, vport = NoPort;
698 if (!strstart(filename, "archipelago:", &start)) {
699 error_setg(errp, "File name must start with 'archipelago:'");
700 return -1;
703 if (!strlen(start) || strstart(start, "/", NULL)) {
704 error_setg(errp, "volume name must be specified");
705 return -1;
708 parse_filename_opts(filename, errp, &volname, &segment_name, &mport,
709 &vport);
710 total_size = qemu_opt_get_size_del(options, BLOCK_OPT_SIZE, 0);
712 if (segment_name == NULL) {
713 segment_name = g_strdup("archipelago");
716 /* Create an Archipelago volume */
717 ret = qemu_archipelago_create_volume(errp, volname, segment_name,
718 total_size, mport,
719 vport);
721 g_free(volname);
722 g_free(segment_name);
723 return ret;
726 static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
728 ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
729 aio_cb->cancelled = true;
730 while (aio_cb->status == -EINPROGRESS) {
731 aio_poll(bdrv_get_aio_context(aio_cb->common.bs), true);
733 qemu_aio_release(aio_cb);
736 static const AIOCBInfo archipelago_aiocb_info = {
737 .aiocb_size = sizeof(ArchipelagoAIOCB),
738 .cancel = qemu_archipelago_aio_cancel,
741 static int archipelago_submit_request(BDRVArchipelagoState *s,
742 uint64_t bufidx,
743 size_t count,
744 off_t offset,
745 ArchipelagoAIOCB *aio_cb,
746 ArchipelagoSegmentedRequest *segreq,
747 int op)
749 int ret, targetlen;
750 char *target;
751 void *data = NULL;
752 struct xseg_request *req;
753 AIORequestData *reqdata = g_new(AIORequestData, 1);
755 targetlen = strlen(s->volname);
756 req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
757 if (!req) {
758 archipelagolog("Cannot get XSEG request\n");
759 goto err_exit2;
761 ret = xseg_prep_request(s->xseg, req, targetlen, count);
762 if (ret < 0) {
763 archipelagolog("Cannot prepare XSEG request\n");
764 goto err_exit;
766 target = xseg_get_target(s->xseg, req);
767 if (!target) {
768 archipelagolog("Cannot get XSEG target\n");
769 goto err_exit;
771 memcpy(target, s->volname, targetlen);
772 req->size = count;
773 req->offset = offset;
775 switch (op) {
776 case ARCHIP_OP_READ:
777 req->op = X_READ;
778 break;
779 case ARCHIP_OP_WRITE:
780 req->op = X_WRITE;
781 break;
782 case ARCHIP_OP_FLUSH:
783 req->op = X_FLUSH;
784 break;
786 reqdata->volname = s->volname;
787 reqdata->offset = offset;
788 reqdata->size = count;
789 reqdata->bufidx = bufidx;
790 reqdata->aio_cb = aio_cb;
791 reqdata->segreq = segreq;
792 reqdata->op = op;
794 xseg_set_req_data(s->xseg, req, reqdata);
795 if (op == ARCHIP_OP_WRITE) {
796 data = xseg_get_data(s->xseg, req);
797 if (!data) {
798 archipelagolog("Cannot get XSEG data\n");
799 goto err_exit;
801 qemu_iovec_to_buf(aio_cb->qiov, bufidx, data, count);
804 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
805 if (p == NoPort) {
806 archipelagolog("Could not submit XSEG request\n");
807 goto err_exit;
809 xseg_signal(s->xseg, p);
810 return 0;
812 err_exit:
813 g_free(reqdata);
814 xseg_put_request(s->xseg, req, s->srcport);
815 return -EIO;
816 err_exit2:
817 g_free(reqdata);
818 return -EIO;
821 static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
822 size_t count,
823 off_t offset,
824 ArchipelagoAIOCB *aio_cb,
825 int op)
827 int i, ret, segments_nr, last_segment_size;
828 ArchipelagoSegmentedRequest *segreq;
830 segreq = g_new(ArchipelagoSegmentedRequest, 1);
832 if (op == ARCHIP_OP_FLUSH) {
833 segments_nr = 1;
834 segreq->ref = segments_nr;
835 segreq->total = count;
836 segreq->count = 0;
837 segreq->failed = 0;
838 ret = archipelago_submit_request(s, 0, count, offset, aio_cb,
839 segreq, ARCHIP_OP_FLUSH);
840 if (ret < 0) {
841 goto err_exit;
843 return 0;
846 segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
847 ((count % MAX_REQUEST_SIZE) ? 1 : 0);
848 last_segment_size = (int)(count % MAX_REQUEST_SIZE);
850 segreq->ref = segments_nr;
851 segreq->total = count;
852 segreq->count = 0;
853 segreq->failed = 0;
855 for (i = 0; i < segments_nr - 1; i++) {
856 ret = archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
857 MAX_REQUEST_SIZE,
858 offset + i * MAX_REQUEST_SIZE,
859 aio_cb, segreq, op);
861 if (ret < 0) {
862 goto err_exit;
866 if ((segments_nr > 1) && last_segment_size) {
867 ret = archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
868 last_segment_size,
869 offset + i * MAX_REQUEST_SIZE,
870 aio_cb, segreq, op);
871 } else if ((segments_nr > 1) && !last_segment_size) {
872 ret = archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
873 MAX_REQUEST_SIZE,
874 offset + i * MAX_REQUEST_SIZE,
875 aio_cb, segreq, op);
876 } else if (segments_nr == 1) {
877 ret = archipelago_submit_request(s, 0, count, offset, aio_cb,
878 segreq, op);
881 if (ret < 0) {
882 goto err_exit;
885 return 0;
887 err_exit:
888 __sync_add_and_fetch(&segreq->failed, 1);
889 if (segments_nr == 1) {
890 if (__sync_add_and_fetch(&segreq->ref, -1) == 0) {
891 g_free(segreq);
893 } else {
894 if ((__sync_add_and_fetch(&segreq->ref, -segments_nr + i)) == 0) {
895 g_free(segreq);
899 return ret;
902 static BlockDriverAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
903 int64_t sector_num,
904 QEMUIOVector *qiov,
905 int nb_sectors,
906 BlockDriverCompletionFunc *cb,
907 void *opaque,
908 int op)
910 ArchipelagoAIOCB *aio_cb;
911 BDRVArchipelagoState *s = bs->opaque;
912 int64_t size, off;
913 int ret;
915 aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
916 aio_cb->cmd = op;
917 aio_cb->qiov = qiov;
919 aio_cb->ret = 0;
920 aio_cb->s = s;
921 aio_cb->cancelled = false;
922 aio_cb->status = -EINPROGRESS;
924 off = sector_num * BDRV_SECTOR_SIZE;
925 size = nb_sectors * BDRV_SECTOR_SIZE;
926 aio_cb->size = size;
928 ret = archipelago_aio_segmented_rw(s, size, off,
929 aio_cb, op);
930 if (ret < 0) {
931 goto err_exit;
933 return &aio_cb->common;
935 err_exit:
936 error_report("qemu_archipelago_aio_rw(): I/O Error\n");
937 qemu_aio_release(aio_cb);
938 return NULL;
941 static BlockDriverAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
942 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
943 BlockDriverCompletionFunc *cb, void *opaque)
945 return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
946 opaque, ARCHIP_OP_READ);
949 static BlockDriverAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
950 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
951 BlockDriverCompletionFunc *cb, void *opaque)
953 return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
954 opaque, ARCHIP_OP_WRITE);
957 static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
959 uint64_t size;
960 int ret, targetlen;
961 struct xseg_request *req;
962 struct xseg_reply_info *xinfo;
963 AIORequestData *reqdata = g_new(AIORequestData, 1);
965 const char *volname = s->volname;
966 targetlen = strlen(volname);
967 req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
968 if (!req) {
969 archipelagolog("Cannot get XSEG request\n");
970 goto err_exit2;
972 ret = xseg_prep_request(s->xseg, req, targetlen,
973 sizeof(struct xseg_reply_info));
974 if (ret < 0) {
975 archipelagolog("Cannot prepare XSEG request\n");
976 goto err_exit;
978 char *target = xseg_get_target(s->xseg, req);
979 if (!target) {
980 archipelagolog("Cannot get XSEG target\n");
981 goto err_exit;
983 memcpy(target, volname, targetlen);
984 req->size = req->datalen;
985 req->offset = 0;
986 req->op = X_INFO;
988 reqdata->op = ARCHIP_OP_VOLINFO;
989 reqdata->volname = volname;
990 xseg_set_req_data(s->xseg, req, reqdata);
992 xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
993 if (p == NoPort) {
994 archipelagolog("Cannot submit XSEG request\n");
995 goto err_exit;
997 xseg_signal(s->xseg, p);
998 qemu_mutex_lock(&s->archip_mutex);
999 while (!s->is_signaled) {
1000 qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
1002 s->is_signaled = false;
1003 qemu_mutex_unlock(&s->archip_mutex);
1005 xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
1006 size = xinfo->size;
1007 xseg_put_request(s->xseg, req, s->srcport);
1008 g_free(reqdata);
1009 s->size = size;
1010 return size;
1012 err_exit:
1013 xseg_put_request(s->xseg, req, s->srcport);
1014 err_exit2:
1015 g_free(reqdata);
1016 return -EIO;
1019 static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
1021 int64_t ret;
1022 BDRVArchipelagoState *s = bs->opaque;
1024 ret = archipelago_volume_info(s);
1025 return ret;
1028 static QemuOptsList qemu_archipelago_create_opts = {
1029 .name = "archipelago-create-opts",
1030 .head = QTAILQ_HEAD_INITIALIZER(qemu_archipelago_create_opts.head),
1031 .desc = {
1033 .name = BLOCK_OPT_SIZE,
1034 .type = QEMU_OPT_SIZE,
1035 .help = "Virtual disk size"
1037 { /* end of list */ }
1041 static BlockDriverAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
1042 BlockDriverCompletionFunc *cb, void *opaque)
1044 return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
1045 ARCHIP_OP_FLUSH);
1048 static BlockDriver bdrv_archipelago = {
1049 .format_name = "archipelago",
1050 .protocol_name = "archipelago",
1051 .instance_size = sizeof(BDRVArchipelagoState),
1052 .bdrv_parse_filename = archipelago_parse_filename,
1053 .bdrv_file_open = qemu_archipelago_open,
1054 .bdrv_close = qemu_archipelago_close,
1055 .bdrv_create = qemu_archipelago_create,
1056 .bdrv_getlength = qemu_archipelago_getlength,
1057 .bdrv_aio_readv = qemu_archipelago_aio_readv,
1058 .bdrv_aio_writev = qemu_archipelago_aio_writev,
1059 .bdrv_aio_flush = qemu_archipelago_aio_flush,
1060 .bdrv_has_zero_init = bdrv_has_zero_init_1,
1061 .create_opts = &qemu_archipelago_create_opts,
1064 static void bdrv_archipelago_init(void)
1066 bdrv_register(&bdrv_archipelago);
1069 block_init(bdrv_archipelago_init);