block/nbd-client: rename read_reply_co to connection_co
block/nbd-client.c (qemu/ar7.git)
/*
 * QEMU Block driver for NBD
 *
 * Copyright (C) 2016 Red Hat, Inc.
 * Copyright (C) 2008 Bull S.A.S.
 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
 *
 * Some parts:
 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */
#include "qemu/osdep.h"

#include "trace.h"
#include "qapi/error.h"
#include "nbd-client.h"
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ (uint64_t)(intptr_t)(bs))
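
/*
 * Illustration (added for clarity; a sketch, not part of the protocol): the
 * wire handle is the request's slot index XOR-ed with the BlockDriverState
 * pointer, so the two macros are inverses of each other:
 *
 *     uint64_t handle = INDEX_TO_HANDLE(s, 3);  // 3 ^ (uint64_t)(intptr_t)s
 *     assert(HANDLE_TO_INDEX(s, handle) == 3);  // XOR-ing twice cancels out
 *
 * A handle this client never issued will, with high probability, decode to an
 * index >= MAX_NBD_REQUESTS and be rejected in nbd_connection_entry below.
 */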
static void nbd_recv_coroutines_wake_all(NBDClientSession *s)
{
    int i;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        NBDClientRequest *req = &s->requests[i];

        if (req->coroutine && req->receiving) {
            aio_co_wake(req->coroutine);
        }
    }
}
static void nbd_teardown_connection(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);

    assert(client->ioc);

    /* finish any pending coroutines */
    qio_channel_shutdown(client->ioc,
                         QIO_CHANNEL_SHUTDOWN_BOTH,
                         NULL);
    BDRV_POLL_WHILE(bs, client->connection_co);

    nbd_client_detach_aio_context(bs);
    object_unref(OBJECT(client->sioc));
    client->sioc = NULL;
    object_unref(OBJECT(client->ioc));
    client->ioc = NULL;
}
static coroutine_fn void nbd_connection_entry(void *opaque)
{
    NBDClientSession *s = opaque;
    uint64_t i;
    int ret = 0;
    Error *local_err = NULL;

    while (!s->quit) {
        assert(s->reply.handle == 0);
        ret = nbd_receive_reply(s->ioc, &s->reply, &local_err);
        if (local_err) {
            trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
            error_free(local_err);
        }
        if (ret <= 0) {
            break;
        }

        /* There's no need for a mutex on the receive side, because the
         * handler acts as a synchronization point and ensures that only
         * one coroutine is called until the reply finishes.
         */
        i = HANDLE_TO_INDEX(s, s->reply.handle);
        if (i >= MAX_NBD_REQUESTS ||
            !s->requests[i].coroutine ||
            !s->requests[i].receiving ||
            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
        {
            break;
        }

        /* We're woken up again by the request itself.  Note that there
         * is no race between yielding and reentering connection_co.  This
         * is because:
         *
         * - if the request runs on the same AioContext, it is only
         *   entered after we yield
         *
         * - if the request runs on a different AioContext, reentering
         *   connection_co happens through a bottom half, which can only
         *   run after we yield.
         */
        aio_co_wake(s->requests[i].coroutine);
        qemu_coroutine_yield();
    }

    s->quit = true;
    nbd_recv_coroutines_wake_all(s);
    s->connection_co = NULL;
    aio_wait_kick();
}
static int nbd_co_send_request(BlockDriverState *bs,
                               NBDRequest *request,
                               QEMUIOVector *qiov)
{
    NBDClientSession *s = nbd_get_client_session(bs);
    int rc, i;

    qemu_co_mutex_lock(&s->send_mutex);
    while (s->in_flight == MAX_NBD_REQUESTS) {
        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
    }
    s->in_flight++;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->requests[i].coroutine == NULL) {
            break;
        }
    }

    g_assert(qemu_in_coroutine());
    assert(i < MAX_NBD_REQUESTS);

    s->requests[i].coroutine = qemu_coroutine_self();
    s->requests[i].offset = request->from;
    s->requests[i].receiving = false;

    request->handle = INDEX_TO_HANDLE(s, i);

    if (s->quit) {
        rc = -EIO;
        goto err;
    }
    assert(s->ioc);

    if (qiov) {
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (rc >= 0 && !s->quit) {
            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                       NULL) < 0) {
                rc = -EIO;
            }
        } else if (rc >= 0) {
            rc = -EIO;
        }
        qio_channel_set_cork(s->ioc, false);
    } else {
        rc = nbd_send_request(s->ioc, request);
    }

err:
    if (rc < 0) {
        s->quit = true;
        s->requests[i].coroutine = NULL;
        s->in_flight--;
        qemu_co_queue_next(&s->free_sema);
    }
    qemu_co_mutex_unlock(&s->send_mutex);
    return rc;
}
static inline uint16_t payload_advance16(uint8_t **payload)
{
    *payload += 2;
    return lduw_be_p(*payload - 2);
}

static inline uint32_t payload_advance32(uint8_t **payload)
{
    *payload += 4;
    return ldl_be_p(*payload - 4);
}

static inline uint64_t payload_advance64(uint8_t **payload)
{
    *payload += 8;
    return ldq_be_p(*payload - 8);
}
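
/*
 * Illustration (added for clarity): the payload_advance* helpers act as a
 * cursor over a payload buffer; each call returns the next big-endian field
 * and steps the pointer past it.  A sketch of consuming a structured error
 * payload (a 32-bit error code followed by a 16-bit message length, exactly
 * as nbd_parse_error_payload below does):
 *
 *     uint8_t *p = payload;
 *     uint32_t error = payload_advance32(&p);         // bytes 0..3
 *     uint16_t message_size = payload_advance16(&p);  // bytes 4..5
 */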
static int nbd_parse_offset_hole_payload(NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_offset,
                                         QEMUIOVector *qiov, Error **errp)
{
    uint64_t offset;
    uint32_t hole_size;

    if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_HOLE");
        return -EINVAL;
    }

    offset = payload_advance64(&payload);
    hole_size = payload_advance32(&payload);

    if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
        offset > orig_offset + qiov->size - hole_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }

    qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);

    return 0;
}
/* nbd_parse_blockstatus_payload
 * Support only one extent in the reply, and only for the
 * base:allocation metadata context.
 */
static int nbd_parse_blockstatus_payload(NBDClientSession *client,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_length,
                                         NBDExtent *extent, Error **errp)
{
    uint32_t context_id;

    if (chunk->length != sizeof(context_id) + sizeof(*extent)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS");
        return -EINVAL;
    }

    context_id = payload_advance32(&payload);
    if (client->info.context_id != context_id) {
        error_setg(errp, "Protocol error: unexpected context id %d for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
                         "id is %d", context_id,
                   client->info.context_id);
        return -EINVAL;
    }

    extent->length = payload_advance32(&payload);
    extent->flags = payload_advance32(&payload);

    if (extent->length == 0 ||
        (client->info.min_block && !QEMU_IS_ALIGNED(extent->length,
                                                    client->info.min_block))) {
        error_setg(errp, "Protocol error: server sent status chunk with "
                         "invalid length");
        return -EINVAL;
    }

    /* The server is allowed to send us extra information on the final
     * extent; just clamp it to the length we requested. */
    if (extent->length > orig_length) {
        extent->length = orig_length;
    }

    return 0;
}
/* nbd_parse_error_payload
 * On success, @errp contains a message describing the NBD error reply.
 */
static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
                                   uint8_t *payload, int *request_ret,
                                   Error **errp)
{
    uint32_t error;
    uint16_t message_size;

    assert(chunk->type & (1 << 15));

    if (chunk->length < sizeof(error) + sizeof(message_size)) {
        error_setg(errp,
                   "Protocol error: invalid payload for structured error");
        return -EINVAL;
    }

    error = nbd_errno_to_system_errno(payload_advance32(&payload));
    if (error == 0) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with error = 0");
        return -EINVAL;
    }

    *request_ret = -error;
    message_size = payload_advance16(&payload);

    if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with incorrect message size");
        return -EINVAL;
    }

    /* TODO: Add a trace point to mention the server complaint */

    /* TODO handle ERROR_OFFSET */

    return 0;
}
static int nbd_co_receive_offset_data_payload(NBDClientSession *s,
                                              uint64_t orig_offset,
                                              QEMUIOVector *qiov, Error **errp)
{
    QEMUIOVector sub_qiov;
    uint64_t offset;
    size_t data_size;
    int ret;
    NBDStructuredReplyChunk *chunk = &s->reply.structured;

    assert(nbd_reply_is_structured(&s->reply));

    /* The NBD spec requires at least one byte of payload */
    if (chunk->length <= sizeof(offset)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_DATA");
        return -EINVAL;
    }

    if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
        return -EIO;
    }

    data_size = chunk->length - sizeof(offset);
    assert(data_size);
    if (offset < orig_offset || data_size > qiov->size ||
        offset > orig_offset + qiov->size - data_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }

    qemu_iovec_init(&sub_qiov, qiov->niov);
    qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
    ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
    qemu_iovec_destroy(&sub_qiov);

    return ret < 0 ? -EIO : 0;
}
#define NBD_MAX_MALLOC_PAYLOAD 1000
/* nbd_co_receive_structured_payload
 */
static coroutine_fn int nbd_co_receive_structured_payload(
        NBDClientSession *s, void **payload, Error **errp)
{
    int ret;
    uint32_t len;

    assert(nbd_reply_is_structured(&s->reply));

    len = s->reply.structured.length;

    if (len == 0) {
        return 0;
    }

    if (payload == NULL) {
        error_setg(errp, "Unexpected structured payload");
        return -EINVAL;
    }

    if (len > NBD_MAX_MALLOC_PAYLOAD) {
        error_setg(errp, "Payload too large");
        return -EINVAL;
    }

    *payload = g_new(char, len);
    ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
    if (ret < 0) {
        g_free(*payload);
        *payload = NULL;
        return ret;
    }

    return 0;
}
/* nbd_co_do_receive_one_chunk
 * for simple reply:
 *   set request_ret to received reply error
 *   if qiov is not NULL: read payload to @qiov
 * for structured reply chunk:
 *   if error chunk: read payload, set @request_ret, do not set @payload
 *   else if offset_data chunk: read payload data to @qiov, do not set @payload
 *   else: read payload to @payload
 *
 * If the function fails, @errp contains the corresponding error message, and
 * the connection with the server is suspect.  If it returns 0, then the
 * transaction succeeded (although @request_ret may be a negative errno
 * corresponding to the server's error reply), and errp is unchanged.
 */
static coroutine_fn int nbd_co_do_receive_one_chunk(
        NBDClientSession *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
{
    int ret;
    int i = HANDLE_TO_INDEX(s, handle);
    void *local_payload = NULL;
    NBDStructuredReplyChunk *chunk;

    if (payload) {
        *payload = NULL;
    }
    *request_ret = 0;

    /* Wait until we're woken up by nbd_connection_entry.  */
    s->requests[i].receiving = true;
    qemu_coroutine_yield();
    s->requests[i].receiving = false;
    if (s->quit) {
        error_setg(errp, "Connection closed");
        return -EIO;
    }
    assert(s->ioc);

    assert(s->reply.handle == handle);

    if (nbd_reply_is_simple(&s->reply)) {
        if (only_structured) {
            error_setg(errp, "Protocol error: simple reply when structured "
                             "reply chunk was expected");
            return -EINVAL;
        }

        *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
        if (*request_ret < 0 || !qiov) {
            return 0;
        }

        return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
                                     errp) < 0 ? -EIO : 0;
    }

    /* handle structured reply chunk */
    assert(s->info.structured_reply);
    chunk = &s->reply.structured;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
                             " NBD_REPLY_FLAG_DONE flag set");
            return -EINVAL;
        }
        if (chunk->length) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
                             " nonzero length");
            return -EINVAL;
        }
        return 0;
    }

    if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
        if (!qiov) {
            error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
            return -EINVAL;
        }

        return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
                                                  qiov, errp);
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        payload = &local_payload;
    }

    ret = nbd_co_receive_structured_payload(s, payload, errp);
    if (ret < 0) {
        return ret;
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
        g_free(local_payload);
        return ret;
    }

    return 0;
}
/* nbd_co_receive_one_chunk
 * Read a reply, wake up connection_co and set s->quit if needed.
 * The return value is a fatal error code or a normal NBD reply error code.
 */
static coroutine_fn int nbd_co_receive_one_chunk(
        NBDClientSession *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
        Error **errp)
{
    int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
                                          request_ret, qiov, payload, errp);

    if (ret < 0) {
        s->quit = true;
    } else {
        /* For assert at loop start in nbd_connection_entry */
        if (reply) {
            *reply = s->reply;
        }
        s->reply.handle = 0;
    }

    if (s->connection_co) {
        aio_co_wake(s->connection_co);
    }

    return ret;
}
typedef struct NBDReplyChunkIter {
    int ret;
    int request_ret;
    Error *err;
    bool done, only_structured;
} NBDReplyChunkIter;
static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
                                   int ret, Error **local_err)
{
    assert(ret < 0);

    if (!iter->ret) {
        iter->ret = ret;
        error_propagate(&iter->err, *local_err);
    } else {
        error_free(*local_err);
    }

    *local_err = NULL;
}
static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
{
    assert(ret < 0);

    if (!iter->request_ret) {
        iter->request_ret = ret;
    }
}
/* NBD_FOREACH_REPLY_CHUNK
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
                                qiov, reply, payload) \
    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
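
/*
 * Illustration of the intended use (added for clarity; see the
 * nbd_co_receive_* functions below): the loop body runs once per chunk of one
 * structured reply, and not at all for a simple reply.  Afterwards iter.ret
 * holds the first fatal error and iter.request_ret the server's error reply,
 * if any.
 *
 *     NBDReplyChunkIter iter;
 *
 *     NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
 *         // inspect one chunk per iteration
 *     }
 *     ret = iter.ret ? iter.ret : iter.request_ret;
 */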
/* nbd_reply_chunk_iter_receive
 */
static bool nbd_reply_chunk_iter_receive(NBDClientSession *s,
                                         NBDReplyChunkIter *iter,
                                         uint64_t handle,
                                         QEMUIOVector *qiov, NBDReply *reply,
                                         void **payload)
{
    int ret, request_ret;
    NBDReply local_reply;
    NBDStructuredReplyChunk *chunk;
    Error *local_err = NULL;
    if (s->quit) {
        error_setg(&local_err, "Connection closed");
        nbd_iter_channel_error(iter, -EIO, &local_err);
        goto break_loop;
    }

    if (iter->done) {
        /* Previous iteration was last. */
        goto break_loop;
    }

    if (reply == NULL) {
        reply = &local_reply;
    }

    ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
                                   &request_ret, qiov, reply, payload,
                                   &local_err);
    if (ret < 0) {
        nbd_iter_channel_error(iter, ret, &local_err);
    } else if (request_ret < 0) {
        nbd_iter_request_error(iter, request_ret);
    }

    /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
    if (nbd_reply_is_simple(reply) || s->quit) {
        goto break_loop;
    }

    chunk = &reply->structured;
    iter->only_structured = true;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
        assert(chunk->flags & NBD_REPLY_FLAG_DONE);
        goto break_loop;
    }

    if (chunk->flags & NBD_REPLY_FLAG_DONE) {
        /* This iteration is last. */
        iter->done = true;
    }

    /* Execute the loop body */
    return true;

break_loop:
    s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;

    qemu_co_mutex_lock(&s->send_mutex);
    s->in_flight--;
    qemu_co_queue_next(&s->free_sema);
    qemu_co_mutex_unlock(&s->send_mutex);

    return false;
}
static int nbd_co_receive_return_code(NBDClientSession *s, uint64_t handle,
                                      int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
        /* nbd_reply_chunk_iter_receive does all the work */
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}
static int nbd_co_receive_cmdread_reply(NBDClientSession *s, uint64_t handle,
                                        uint64_t offset, QEMUIOVector *qiov,
                                        int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            qiov, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_OFFSET_DATA:
            /* special cased in nbd_co_receive_one_chunk, data is already
             * in qiov */
            break;
        case NBD_REPLY_TYPE_OFFSET_HOLE:
            ret = nbd_parse_offset_hole_payload(&reply.structured, payload,
                                                offset, qiov, &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                /* not allowed reply type */
                s->quit = true;
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) for CMD_READ",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}
static int nbd_co_receive_blockstatus_reply(NBDClientSession *s,
                                            uint64_t handle, uint64_t length,
                                            NBDExtent *extent,
                                            int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;
    bool received = false;

    assert(!extent->length);
    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            NULL, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_BLOCK_STATUS:
            if (received) {
                s->quit = true;
                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
            received = true;

            ret = nbd_parse_blockstatus_payload(s, &reply.structured,
                                                payload, length, extent,
                                                &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                s->quit = true;
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) "
                           "for CMD_BLOCK_STATUS",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    if (!extent->length && !iter.err) {
        error_setg(&iter.err,
                   "Server did not reply with any status extents");
        if (!iter.ret) {
            iter.ret = -EIO;
        }
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}
static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
                          QEMUIOVector *write_qiov)
{
    int ret, request_ret;
    Error *local_err = NULL;
    NBDClientSession *client = nbd_get_client_session(bs);

    assert(request->type != NBD_CMD_READ);
    if (write_qiov) {
        assert(request->type == NBD_CMD_WRITE);
        assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
    } else {
        assert(request->type != NBD_CMD_WRITE);
    }
    ret = nbd_co_send_request(bs, request, write_qiov);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_return_code(client, request->handle,
                                     &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request->from, request->len, request->handle,
                                  request->flags, request->type,
                                  nbd_cmd_lookup(request->type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    return ret ? ret : request_ret;
}
int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
                         uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    int ret, request_ret;
    Error *local_err = NULL;
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_READ,
        .from = offset,
        .len = bytes,
    };

    assert(bytes <= NBD_MAX_BUFFER_SIZE);
    assert(!flags);

    if (!bytes) {
        return 0;
    }
    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_cmdread_reply(client, request.handle, offset, qiov,
                                       &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                  request.flags, request.type,
                                  nbd_cmd_lookup(request.type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    return ret ? ret : request_ret;
}
int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
                          uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (flags & BDRV_REQ_FUA) {
        assert(client->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, qiov);
}
int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                int bytes, BdrvRequestFlags flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE_ZEROES,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (!(client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
        return -ENOTSUP;
    }

    if (flags & BDRV_REQ_FUA) {
        assert(client->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }
    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
        request.flags |= NBD_CMD_FLAG_NO_HOLE;
    }

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, NULL);
}
int nbd_client_co_flush(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = { .type = NBD_CMD_FLUSH };

    if (!(client->info.flags & NBD_FLAG_SEND_FLUSH)) {
        return 0;
    }

    request.from = 0;
    request.len = 0;

    return nbd_co_request(bs, &request, NULL);
}
int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int bytes)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_TRIM,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (!(client->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
        return 0;
    }

    return nbd_co_request(bs, &request, NULL);
}
int coroutine_fn nbd_client_co_block_status(BlockDriverState *bs,
                                            bool want_zero,
                                            int64_t offset, int64_t bytes,
                                            int64_t *pnum, int64_t *map,
                                            BlockDriverState **file)
{
    int ret, request_ret;
    NBDExtent extent = { 0 };
    NBDClientSession *client = nbd_get_client_session(bs);
    Error *local_err = NULL;

    NBDRequest request = {
        .type = NBD_CMD_BLOCK_STATUS,
        .from = offset,
        .len = MIN(MIN_NON_ZERO(QEMU_ALIGN_DOWN(INT_MAX,
                                                bs->bl.request_alignment),
                                client->info.max_block), bytes),
        .flags = NBD_CMD_FLAG_REQ_ONE,
    };

    if (!client->info.base_allocation) {
        *pnum = bytes;
        return BDRV_BLOCK_DATA;
    }

    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_blockstatus_reply(client, request.handle, bytes,
                                           &extent, &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                  request.flags, request.type,
                                  nbd_cmd_lookup(request.type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    if (ret < 0 || request_ret < 0) {
        return ret ? ret : request_ret;
    }

    assert(extent.length);
    *pnum = extent.length;
    return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
           (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0);
}
void nbd_client_detach_aio_context(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
}
void nbd_client_attach_aio_context(BlockDriverState *bs,
                                   AioContext *new_context)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), new_context);
    aio_co_schedule(new_context, client->connection_co);
}
void nbd_client_close(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = { .type = NBD_CMD_DISC };

    assert(client->ioc);

    nbd_send_request(client->ioc, &request);

    nbd_teardown_connection(bs);
}
static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
                                                  Error **errp)
{
    QIOChannelSocket *sioc;
    Error *local_err = NULL;

    sioc = qio_channel_socket_new();
    qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");

    qio_channel_socket_connect_sync(sioc, saddr, &local_err);
    if (local_err) {
        object_unref(OBJECT(sioc));
        error_propagate(errp, local_err);
        return NULL;
    }

    qio_channel_set_delay(QIO_CHANNEL(sioc), false);

    return sioc;
}
static int nbd_client_connect(BlockDriverState *bs,
                              SocketAddress *saddr,
                              const char *export,
                              QCryptoTLSCreds *tlscreds,
                              const char *hostname,
                              const char *x_dirty_bitmap,
                              Error **errp)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    int ret;

    /*
     * establish TCP connection, return error if it fails
     * TODO: Configurable retry-until-timeout behaviour.
     */
    QIOChannelSocket *sioc = nbd_establish_connection(saddr, errp);

    if (!sioc) {
        return -ECONNREFUSED;
    }

    /* NBD handshake */
    logout("session init %s\n", export);
    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);

    client->info.request_sizes = true;
    client->info.structured_reply = true;
    client->info.base_allocation = true;
    client->info.x_dirty_bitmap = g_strdup(x_dirty_bitmap);
    client->info.name = g_strdup(export ?: "");
    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), tlscreds, hostname,
                                &client->ioc, &client->info, errp);
    g_free(client->info.x_dirty_bitmap);
    g_free(client->info.name);
    if (ret < 0) {
        logout("Failed to negotiate with the NBD server\n");
        object_unref(OBJECT(sioc));
        return ret;
    }
    if (x_dirty_bitmap && !client->info.base_allocation) {
        error_setg(errp, "requested x-dirty-bitmap %s not found",
                   x_dirty_bitmap);
        ret = -EINVAL;
        goto fail;
    }
    if (client->info.flags & NBD_FLAG_READ_ONLY) {
        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
        if (ret < 0) {
            goto fail;
        }
    }
    if (client->info.flags & NBD_FLAG_SEND_FUA) {
        bs->supported_write_flags = BDRV_REQ_FUA;
        bs->supported_zero_flags |= BDRV_REQ_FUA;
    }
    if (client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
    }

    client->sioc = sioc;

    if (!client->ioc) {
        client->ioc = QIO_CHANNEL(sioc);
        object_ref(OBJECT(client->ioc));
    }

    /* Now that we're connected, set the socket to be non-blocking and
     * kick the reply mechanism.  */
    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
    client->connection_co = qemu_coroutine_create(nbd_connection_entry, client);
    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));

    logout("Established connection with NBD server\n");
    return 0;

 fail:
    /*
     * We have connected, but must fail for other reasons. The
     * connection is still blocking; send NBD_CMD_DISC as a courtesy
     * to the server.
     */
    {
        NBDRequest request = { .type = NBD_CMD_DISC };

        nbd_send_request(client->ioc ?: QIO_CHANNEL(sioc), &request);

        object_unref(OBJECT(sioc));

        return ret;
    }
}
int nbd_client_init(BlockDriverState *bs,
                    SocketAddress *saddr,
                    const char *export,
                    QCryptoTLSCreds *tlscreds,
                    const char *hostname,
                    const char *x_dirty_bitmap,
                    Error **errp)
{
    NBDClientSession *client = nbd_get_client_session(bs);

    qemu_co_mutex_init(&client->send_mutex);
    qemu_co_queue_init(&client->free_sema);

    return nbd_client_connect(bs, saddr, export, tlscreds, hostname,
                              x_dirty_bitmap, errp);
}