Merge remote-tracking branch 'remotes/rth/tags/pull-hppa-20190307' into staging
[qemu/ar7.git] / block / nbd-client.c
blobbfbaf7ebe9450cb282e0bf3a294e28d9a0808b1c
1 /*
2 * QEMU Block driver for NBD
4 * Copyright (C) 2016 Red Hat, Inc.
5 * Copyright (C) 2008 Bull S.A.S.
6 * Author: Laurent Vivier <Laurent.Vivier@bull.net>
8 * Some parts:
9 * Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
24 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 * THE SOFTWARE.
30 #include "qemu/osdep.h"
32 #include "trace.h"
33 #include "qapi/error.h"
34 #include "nbd-client.h"
/*
 * Wire handles are a request-slot index XORed with the session pointer.
 * XOR is its own inverse, so the same expression converts in both
 * directions.
 */
#define HANDLE_TO_INDEX(sess, handle) ((handle) ^ (uint64_t)(intptr_t)(sess))
#define INDEX_TO_HANDLE(sess, index) ((index) ^ (uint64_t)(intptr_t)(sess))
39 static void nbd_recv_coroutines_wake_all(NBDClientSession *s)
41 int i;
43 for (i = 0; i < MAX_NBD_REQUESTS; i++) {
44 NBDClientRequest *req = &s->requests[i];
46 if (req->coroutine && req->receiving) {
47 aio_co_wake(req->coroutine);
52 static void nbd_teardown_connection(BlockDriverState *bs)
54 NBDClientSession *client = nbd_get_client_session(bs);
56 assert(client->ioc);
58 /* finish any pending coroutines */
59 qio_channel_shutdown(client->ioc,
60 QIO_CHANNEL_SHUTDOWN_BOTH,
61 NULL);
62 BDRV_POLL_WHILE(bs, client->connection_co);
64 nbd_client_detach_aio_context(bs);
65 object_unref(OBJECT(client->sioc));
66 client->sioc = NULL;
67 object_unref(OBJECT(client->ioc));
68 client->ioc = NULL;
71 static coroutine_fn void nbd_connection_entry(void *opaque)
73 NBDClientSession *s = opaque;
74 uint64_t i;
75 int ret = 0;
76 Error *local_err = NULL;
78 while (!s->quit) {
80 * The NBD client can only really be considered idle when it has
81 * yielded from qio_channel_readv_all_eof(), waiting for data. This is
82 * the point where the additional scheduled coroutine entry happens
83 * after nbd_client_attach_aio_context().
85 * Therefore we keep an additional in_flight reference all the time and
86 * only drop it temporarily here.
88 assert(s->reply.handle == 0);
89 ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
91 if (local_err) {
92 trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
93 error_free(local_err);
95 if (ret <= 0) {
96 break;
99 /* There's no need for a mutex on the receive side, because the
100 * handler acts as a synchronization point and ensures that only
101 * one coroutine is called until the reply finishes.
103 i = HANDLE_TO_INDEX(s, s->reply.handle);
104 if (i >= MAX_NBD_REQUESTS ||
105 !s->requests[i].coroutine ||
106 !s->requests[i].receiving ||
107 (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
109 break;
112 /* We're woken up again by the request itself. Note that there
113 * is no race between yielding and reentering connection_co. This
114 * is because:
116 * - if the request runs on the same AioContext, it is only
117 * entered after we yield
119 * - if the request runs on a different AioContext, reentering
120 * connection_co happens through a bottom half, which can only
121 * run after we yield.
123 aio_co_wake(s->requests[i].coroutine);
124 qemu_coroutine_yield();
127 s->quit = true;
128 nbd_recv_coroutines_wake_all(s);
129 bdrv_dec_in_flight(s->bs);
131 s->connection_co = NULL;
132 aio_wait_kick();
135 static int nbd_co_send_request(BlockDriverState *bs,
136 NBDRequest *request,
137 QEMUIOVector *qiov)
139 NBDClientSession *s = nbd_get_client_session(bs);
140 int rc, i;
142 qemu_co_mutex_lock(&s->send_mutex);
143 while (s->in_flight == MAX_NBD_REQUESTS) {
144 qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
146 s->in_flight++;
148 for (i = 0; i < MAX_NBD_REQUESTS; i++) {
149 if (s->requests[i].coroutine == NULL) {
150 break;
154 g_assert(qemu_in_coroutine());
155 assert(i < MAX_NBD_REQUESTS);
157 s->requests[i].coroutine = qemu_coroutine_self();
158 s->requests[i].offset = request->from;
159 s->requests[i].receiving = false;
161 request->handle = INDEX_TO_HANDLE(s, i);
163 if (s->quit) {
164 rc = -EIO;
165 goto err;
167 assert(s->ioc);
169 if (qiov) {
170 qio_channel_set_cork(s->ioc, true);
171 rc = nbd_send_request(s->ioc, request);
172 if (rc >= 0 && !s->quit) {
173 if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
174 NULL) < 0) {
175 rc = -EIO;
177 } else if (rc >= 0) {
178 rc = -EIO;
180 qio_channel_set_cork(s->ioc, false);
181 } else {
182 rc = nbd_send_request(s->ioc, request);
185 err:
186 if (rc < 0) {
187 s->quit = true;
188 s->requests[i].coroutine = NULL;
189 s->in_flight--;
190 qemu_co_queue_next(&s->free_sema);
192 qemu_co_mutex_unlock(&s->send_mutex);
193 return rc;
/* Read a big-endian 16-bit value at *@payload and step the cursor past it. */
static inline uint16_t payload_advance16(uint8_t **payload)
{
    uint16_t val = lduw_be_p(*payload);

    *payload += sizeof(val);
    return val;
}
/* Read a big-endian 32-bit value at *@payload and step the cursor past it. */
static inline uint32_t payload_advance32(uint8_t **payload)
{
    uint32_t val = ldl_be_p(*payload);

    *payload += sizeof(val);
    return val;
}
/* Read a big-endian 64-bit value at *@payload and step the cursor past it. */
static inline uint64_t payload_advance64(uint8_t **payload)
{
    uint64_t val = ldq_be_p(*payload);

    *payload += sizeof(val);
    return val;
}
214 static int nbd_parse_offset_hole_payload(NBDStructuredReplyChunk *chunk,
215 uint8_t *payload, uint64_t orig_offset,
216 QEMUIOVector *qiov, Error **errp)
218 uint64_t offset;
219 uint32_t hole_size;
221 if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
222 error_setg(errp, "Protocol error: invalid payload for "
223 "NBD_REPLY_TYPE_OFFSET_HOLE");
224 return -EINVAL;
227 offset = payload_advance64(&payload);
228 hole_size = payload_advance32(&payload);
230 if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
231 offset > orig_offset + qiov->size - hole_size) {
232 error_setg(errp, "Protocol error: server sent chunk exceeding requested"
233 " region");
234 return -EINVAL;
237 qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);
239 return 0;
242 /* nbd_parse_blockstatus_payload
243 * support only one extent in reply and only for
244 * base:allocation context
246 static int nbd_parse_blockstatus_payload(NBDClientSession *client,
247 NBDStructuredReplyChunk *chunk,
248 uint8_t *payload, uint64_t orig_length,
249 NBDExtent *extent, Error **errp)
251 uint32_t context_id;
253 if (chunk->length != sizeof(context_id) + sizeof(*extent)) {
254 error_setg(errp, "Protocol error: invalid payload for "
255 "NBD_REPLY_TYPE_BLOCK_STATUS");
256 return -EINVAL;
259 context_id = payload_advance32(&payload);
260 if (client->info.context_id != context_id) {
261 error_setg(errp, "Protocol error: unexpected context id %d for "
262 "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
263 "id is %d", context_id,
264 client->info.context_id);
265 return -EINVAL;
268 extent->length = payload_advance32(&payload);
269 extent->flags = payload_advance32(&payload);
271 if (extent->length == 0 ||
272 (client->info.min_block && !QEMU_IS_ALIGNED(extent->length,
273 client->info.min_block))) {
274 error_setg(errp, "Protocol error: server sent status chunk with "
275 "invalid length");
276 return -EINVAL;
279 /* The server is allowed to send us extra information on the final
280 * extent; just clamp it to the length we requested. */
281 if (extent->length > orig_length) {
282 extent->length = orig_length;
285 return 0;
288 /* nbd_parse_error_payload
289 * on success @errp contains message describing nbd error reply
291 static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
292 uint8_t *payload, int *request_ret,
293 Error **errp)
295 uint32_t error;
296 uint16_t message_size;
298 assert(chunk->type & (1 << 15));
300 if (chunk->length < sizeof(error) + sizeof(message_size)) {
301 error_setg(errp,
302 "Protocol error: invalid payload for structured error");
303 return -EINVAL;
306 error = nbd_errno_to_system_errno(payload_advance32(&payload));
307 if (error == 0) {
308 error_setg(errp, "Protocol error: server sent structured error chunk "
309 "with error = 0");
310 return -EINVAL;
313 *request_ret = -error;
314 message_size = payload_advance16(&payload);
316 if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
317 error_setg(errp, "Protocol error: server sent structured error chunk "
318 "with incorrect message size");
319 return -EINVAL;
322 /* TODO: Add a trace point to mention the server complaint */
324 /* TODO handle ERROR_OFFSET */
326 return 0;
329 static int nbd_co_receive_offset_data_payload(NBDClientSession *s,
330 uint64_t orig_offset,
331 QEMUIOVector *qiov, Error **errp)
333 QEMUIOVector sub_qiov;
334 uint64_t offset;
335 size_t data_size;
336 int ret;
337 NBDStructuredReplyChunk *chunk = &s->reply.structured;
339 assert(nbd_reply_is_structured(&s->reply));
341 /* The NBD spec requires at least one byte of payload */
342 if (chunk->length <= sizeof(offset)) {
343 error_setg(errp, "Protocol error: invalid payload for "
344 "NBD_REPLY_TYPE_OFFSET_DATA");
345 return -EINVAL;
348 if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
349 return -EIO;
352 data_size = chunk->length - sizeof(offset);
353 assert(data_size);
354 if (offset < orig_offset || data_size > qiov->size ||
355 offset > orig_offset + qiov->size - data_size) {
356 error_setg(errp, "Protocol error: server sent chunk exceeding requested"
357 " region");
358 return -EINVAL;
361 qemu_iovec_init(&sub_qiov, qiov->niov);
362 qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
363 ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
364 qemu_iovec_destroy(&sub_qiov);
366 return ret < 0 ? -EIO : 0;
369 #define NBD_MAX_MALLOC_PAYLOAD 1000
370 /* nbd_co_receive_structured_payload
372 static coroutine_fn int nbd_co_receive_structured_payload(
373 NBDClientSession *s, void **payload, Error **errp)
375 int ret;
376 uint32_t len;
378 assert(nbd_reply_is_structured(&s->reply));
380 len = s->reply.structured.length;
382 if (len == 0) {
383 return 0;
386 if (payload == NULL) {
387 error_setg(errp, "Unexpected structured payload");
388 return -EINVAL;
391 if (len > NBD_MAX_MALLOC_PAYLOAD) {
392 error_setg(errp, "Payload too large");
393 return -EINVAL;
396 *payload = g_new(char, len);
397 ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
398 if (ret < 0) {
399 g_free(*payload);
400 *payload = NULL;
401 return ret;
404 return 0;
407 /* nbd_co_do_receive_one_chunk
408 * for simple reply:
409 * set request_ret to received reply error
410 * if qiov is not NULL: read payload to @qiov
411 * for structured reply chunk:
412 * if error chunk: read payload, set @request_ret, do not set @payload
413 * else if offset_data chunk: read payload data to @qiov, do not set @payload
414 * else: read payload to @payload
416 * If function fails, @errp contains corresponding error message, and the
417 * connection with the server is suspect. If it returns 0, then the
418 * transaction succeeded (although @request_ret may be a negative errno
419 * corresponding to the server's error reply), and errp is unchanged.
421 static coroutine_fn int nbd_co_do_receive_one_chunk(
422 NBDClientSession *s, uint64_t handle, bool only_structured,
423 int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
425 int ret;
426 int i = HANDLE_TO_INDEX(s, handle);
427 void *local_payload = NULL;
428 NBDStructuredReplyChunk *chunk;
430 if (payload) {
431 *payload = NULL;
433 *request_ret = 0;
435 /* Wait until we're woken up by nbd_connection_entry. */
436 s->requests[i].receiving = true;
437 qemu_coroutine_yield();
438 s->requests[i].receiving = false;
439 if (s->quit) {
440 error_setg(errp, "Connection closed");
441 return -EIO;
443 assert(s->ioc);
445 assert(s->reply.handle == handle);
447 if (nbd_reply_is_simple(&s->reply)) {
448 if (only_structured) {
449 error_setg(errp, "Protocol error: simple reply when structured "
450 "reply chunk was expected");
451 return -EINVAL;
454 *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
455 if (*request_ret < 0 || !qiov) {
456 return 0;
459 return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
460 errp) < 0 ? -EIO : 0;
463 /* handle structured reply chunk */
464 assert(s->info.structured_reply);
465 chunk = &s->reply.structured;
467 if (chunk->type == NBD_REPLY_TYPE_NONE) {
468 if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
469 error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
470 " NBD_REPLY_FLAG_DONE flag set");
471 return -EINVAL;
473 if (chunk->length) {
474 error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
475 " nonzero length");
476 return -EINVAL;
478 return 0;
481 if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
482 if (!qiov) {
483 error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
484 return -EINVAL;
487 return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
488 qiov, errp);
491 if (nbd_reply_type_is_error(chunk->type)) {
492 payload = &local_payload;
495 ret = nbd_co_receive_structured_payload(s, payload, errp);
496 if (ret < 0) {
497 return ret;
500 if (nbd_reply_type_is_error(chunk->type)) {
501 ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
502 g_free(local_payload);
503 return ret;
506 return 0;
509 /* nbd_co_receive_one_chunk
510 * Read reply, wake up connection_co and set s->quit if needed.
511 * Return value is a fatal error code or normal nbd reply error code
513 static coroutine_fn int nbd_co_receive_one_chunk(
514 NBDClientSession *s, uint64_t handle, bool only_structured,
515 int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
516 Error **errp)
518 int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
519 request_ret, qiov, payload, errp);
521 if (ret < 0) {
522 s->quit = true;
523 } else {
524 /* For assert at loop start in nbd_connection_entry */
525 if (reply) {
526 *reply = s->reply;
528 s->reply.handle = 0;
531 if (s->connection_co) {
532 aio_co_wake(s->connection_co);
535 return ret;
538 typedef struct NBDReplyChunkIter {
539 int ret;
540 int request_ret;
541 Error *err;
542 bool done, only_structured;
543 } NBDReplyChunkIter;
545 static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
546 int ret, Error **local_err)
548 assert(ret < 0);
550 if (!iter->ret) {
551 iter->ret = ret;
552 error_propagate(&iter->err, *local_err);
553 } else {
554 error_free(*local_err);
557 *local_err = NULL;
560 static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
562 assert(ret < 0);
564 if (!iter->request_ret) {
565 iter->request_ret = ret;
/*
 * NBD_FOREACH_REPLY_CHUNK
 * Iterate over the reply chunks for @hdl, running the loop body once per
 * structured chunk other than NBD_REPLY_TYPE_NONE.
 *
 * The loop ends after a chunk flagged NBD_REPLY_FLAG_DONE, after a simple
 * reply, or once the session is quitting.  Errors are collected in @it,
 * and the request slot is released when the loop terminates.
 */
#define NBD_FOREACH_REPLY_CHUNK(sess, it, hdl, want_structured,          \
                                io_vec, rep, pl)                         \
    for (it = (NBDReplyChunkIter) { .only_structured = want_structured }; \
         nbd_reply_chunk_iter_receive(sess, &it, hdl, io_vec, rep, pl);)
576 /* nbd_reply_chunk_iter_receive
578 static bool nbd_reply_chunk_iter_receive(NBDClientSession *s,
579 NBDReplyChunkIter *iter,
580 uint64_t handle,
581 QEMUIOVector *qiov, NBDReply *reply,
582 void **payload)
584 int ret, request_ret;
585 NBDReply local_reply;
586 NBDStructuredReplyChunk *chunk;
587 Error *local_err = NULL;
588 if (s->quit) {
589 error_setg(&local_err, "Connection closed");
590 nbd_iter_channel_error(iter, -EIO, &local_err);
591 goto break_loop;
594 if (iter->done) {
595 /* Previous iteration was last. */
596 goto break_loop;
599 if (reply == NULL) {
600 reply = &local_reply;
603 ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
604 &request_ret, qiov, reply, payload,
605 &local_err);
606 if (ret < 0) {
607 nbd_iter_channel_error(iter, ret, &local_err);
608 } else if (request_ret < 0) {
609 nbd_iter_request_error(iter, request_ret);
612 /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
613 if (nbd_reply_is_simple(reply) || s->quit) {
614 goto break_loop;
617 chunk = &reply->structured;
618 iter->only_structured = true;
620 if (chunk->type == NBD_REPLY_TYPE_NONE) {
621 /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
622 assert(chunk->flags & NBD_REPLY_FLAG_DONE);
623 goto break_loop;
626 if (chunk->flags & NBD_REPLY_FLAG_DONE) {
627 /* This iteration is last. */
628 iter->done = true;
631 /* Execute the loop body */
632 return true;
634 break_loop:
635 s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
637 qemu_co_mutex_lock(&s->send_mutex);
638 s->in_flight--;
639 qemu_co_queue_next(&s->free_sema);
640 qemu_co_mutex_unlock(&s->send_mutex);
642 return false;
645 static int nbd_co_receive_return_code(NBDClientSession *s, uint64_t handle,
646 int *request_ret, Error **errp)
648 NBDReplyChunkIter iter;
650 NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
651 /* nbd_reply_chunk_iter_receive does all the work */
654 error_propagate(errp, iter.err);
655 *request_ret = iter.request_ret;
656 return iter.ret;
659 static int nbd_co_receive_cmdread_reply(NBDClientSession *s, uint64_t handle,
660 uint64_t offset, QEMUIOVector *qiov,
661 int *request_ret, Error **errp)
663 NBDReplyChunkIter iter;
664 NBDReply reply;
665 void *payload = NULL;
666 Error *local_err = NULL;
668 NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
669 qiov, &reply, &payload)
671 int ret;
672 NBDStructuredReplyChunk *chunk = &reply.structured;
674 assert(nbd_reply_is_structured(&reply));
676 switch (chunk->type) {
677 case NBD_REPLY_TYPE_OFFSET_DATA:
678 /* special cased in nbd_co_receive_one_chunk, data is already
679 * in qiov */
680 break;
681 case NBD_REPLY_TYPE_OFFSET_HOLE:
682 ret = nbd_parse_offset_hole_payload(&reply.structured, payload,
683 offset, qiov, &local_err);
684 if (ret < 0) {
685 s->quit = true;
686 nbd_iter_channel_error(&iter, ret, &local_err);
688 break;
689 default:
690 if (!nbd_reply_type_is_error(chunk->type)) {
691 /* not allowed reply type */
692 s->quit = true;
693 error_setg(&local_err,
694 "Unexpected reply type: %d (%s) for CMD_READ",
695 chunk->type, nbd_reply_type_lookup(chunk->type));
696 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
700 g_free(payload);
701 payload = NULL;
704 error_propagate(errp, iter.err);
705 *request_ret = iter.request_ret;
706 return iter.ret;
709 static int nbd_co_receive_blockstatus_reply(NBDClientSession *s,
710 uint64_t handle, uint64_t length,
711 NBDExtent *extent,
712 int *request_ret, Error **errp)
714 NBDReplyChunkIter iter;
715 NBDReply reply;
716 void *payload = NULL;
717 Error *local_err = NULL;
718 bool received = false;
720 assert(!extent->length);
721 NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
722 NULL, &reply, &payload)
724 int ret;
725 NBDStructuredReplyChunk *chunk = &reply.structured;
727 assert(nbd_reply_is_structured(&reply));
729 switch (chunk->type) {
730 case NBD_REPLY_TYPE_BLOCK_STATUS:
731 if (received) {
732 s->quit = true;
733 error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
734 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
736 received = true;
738 ret = nbd_parse_blockstatus_payload(s, &reply.structured,
739 payload, length, extent,
740 &local_err);
741 if (ret < 0) {
742 s->quit = true;
743 nbd_iter_channel_error(&iter, ret, &local_err);
745 break;
746 default:
747 if (!nbd_reply_type_is_error(chunk->type)) {
748 s->quit = true;
749 error_setg(&local_err,
750 "Unexpected reply type: %d (%s) "
751 "for CMD_BLOCK_STATUS",
752 chunk->type, nbd_reply_type_lookup(chunk->type));
753 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
757 g_free(payload);
758 payload = NULL;
761 if (!extent->length && !iter.err) {
762 error_setg(&iter.err,
763 "Server did not reply with any status extents");
764 if (!iter.ret) {
765 iter.ret = -EIO;
769 error_propagate(errp, iter.err);
770 *request_ret = iter.request_ret;
771 return iter.ret;
774 static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
775 QEMUIOVector *write_qiov)
777 int ret, request_ret;
778 Error *local_err = NULL;
779 NBDClientSession *client = nbd_get_client_session(bs);
781 assert(request->type != NBD_CMD_READ);
782 if (write_qiov) {
783 assert(request->type == NBD_CMD_WRITE);
784 assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
785 } else {
786 assert(request->type != NBD_CMD_WRITE);
788 ret = nbd_co_send_request(bs, request, write_qiov);
789 if (ret < 0) {
790 return ret;
793 ret = nbd_co_receive_return_code(client, request->handle,
794 &request_ret, &local_err);
795 if (local_err) {
796 trace_nbd_co_request_fail(request->from, request->len, request->handle,
797 request->flags, request->type,
798 nbd_cmd_lookup(request->type),
799 ret, error_get_pretty(local_err));
800 error_free(local_err);
802 return ret ? ret : request_ret;
805 int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
806 uint64_t bytes, QEMUIOVector *qiov, int flags)
808 int ret, request_ret;
809 Error *local_err = NULL;
810 NBDClientSession *client = nbd_get_client_session(bs);
811 NBDRequest request = {
812 .type = NBD_CMD_READ,
813 .from = offset,
814 .len = bytes,
817 assert(bytes <= NBD_MAX_BUFFER_SIZE);
818 assert(!flags);
820 if (!bytes) {
821 return 0;
823 ret = nbd_co_send_request(bs, &request, NULL);
824 if (ret < 0) {
825 return ret;
828 ret = nbd_co_receive_cmdread_reply(client, request.handle, offset, qiov,
829 &request_ret, &local_err);
830 if (local_err) {
831 trace_nbd_co_request_fail(request.from, request.len, request.handle,
832 request.flags, request.type,
833 nbd_cmd_lookup(request.type),
834 ret, error_get_pretty(local_err));
835 error_free(local_err);
837 return ret ? ret : request_ret;
840 int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
841 uint64_t bytes, QEMUIOVector *qiov, int flags)
843 NBDClientSession *client = nbd_get_client_session(bs);
844 NBDRequest request = {
845 .type = NBD_CMD_WRITE,
846 .from = offset,
847 .len = bytes,
850 assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
851 if (flags & BDRV_REQ_FUA) {
852 assert(client->info.flags & NBD_FLAG_SEND_FUA);
853 request.flags |= NBD_CMD_FLAG_FUA;
856 assert(bytes <= NBD_MAX_BUFFER_SIZE);
858 if (!bytes) {
859 return 0;
861 return nbd_co_request(bs, &request, qiov);
864 int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
865 int bytes, BdrvRequestFlags flags)
867 NBDClientSession *client = nbd_get_client_session(bs);
868 NBDRequest request = {
869 .type = NBD_CMD_WRITE_ZEROES,
870 .from = offset,
871 .len = bytes,
874 assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
875 if (!(client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
876 return -ENOTSUP;
879 if (flags & BDRV_REQ_FUA) {
880 assert(client->info.flags & NBD_FLAG_SEND_FUA);
881 request.flags |= NBD_CMD_FLAG_FUA;
883 if (!(flags & BDRV_REQ_MAY_UNMAP)) {
884 request.flags |= NBD_CMD_FLAG_NO_HOLE;
887 if (!bytes) {
888 return 0;
890 return nbd_co_request(bs, &request, NULL);
893 int nbd_client_co_flush(BlockDriverState *bs)
895 NBDClientSession *client = nbd_get_client_session(bs);
896 NBDRequest request = { .type = NBD_CMD_FLUSH };
898 if (!(client->info.flags & NBD_FLAG_SEND_FLUSH)) {
899 return 0;
902 request.from = 0;
903 request.len = 0;
905 return nbd_co_request(bs, &request, NULL);
908 int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int bytes)
910 NBDClientSession *client = nbd_get_client_session(bs);
911 NBDRequest request = {
912 .type = NBD_CMD_TRIM,
913 .from = offset,
914 .len = bytes,
917 assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
918 if (!(client->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
919 return 0;
922 return nbd_co_request(bs, &request, NULL);
925 int coroutine_fn nbd_client_co_block_status(BlockDriverState *bs,
926 bool want_zero,
927 int64_t offset, int64_t bytes,
928 int64_t *pnum, int64_t *map,
929 BlockDriverState **file)
931 int ret, request_ret;
932 NBDExtent extent = { 0 };
933 NBDClientSession *client = nbd_get_client_session(bs);
934 Error *local_err = NULL;
936 NBDRequest request = {
937 .type = NBD_CMD_BLOCK_STATUS,
938 .from = offset,
939 .len = MIN(MIN_NON_ZERO(QEMU_ALIGN_DOWN(INT_MAX,
940 bs->bl.request_alignment),
941 client->info.max_block), bytes),
942 .flags = NBD_CMD_FLAG_REQ_ONE,
945 if (!client->info.base_allocation) {
946 *pnum = bytes;
947 return BDRV_BLOCK_DATA;
950 ret = nbd_co_send_request(bs, &request, NULL);
951 if (ret < 0) {
952 return ret;
955 ret = nbd_co_receive_blockstatus_reply(client, request.handle, bytes,
956 &extent, &request_ret, &local_err);
957 if (local_err) {
958 trace_nbd_co_request_fail(request.from, request.len, request.handle,
959 request.flags, request.type,
960 nbd_cmd_lookup(request.type),
961 ret, error_get_pretty(local_err));
962 error_free(local_err);
964 if (ret < 0 || request_ret < 0) {
965 return ret ? ret : request_ret;
968 assert(extent.length);
969 *pnum = extent.length;
970 return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
971 (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0);
974 void nbd_client_detach_aio_context(BlockDriverState *bs)
976 NBDClientSession *client = nbd_get_client_session(bs);
977 qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
980 static void nbd_client_attach_aio_context_bh(void *opaque)
982 BlockDriverState *bs = opaque;
983 NBDClientSession *client = nbd_get_client_session(bs);
985 /* The node is still drained, so we know the coroutine has yielded in
986 * nbd_read_eof(), the only place where bs->in_flight can reach 0, or it is
987 * entered for the first time. Both places are safe for entering the
988 * coroutine.*/
989 qemu_aio_coroutine_enter(bs->aio_context, client->connection_co);
990 bdrv_dec_in_flight(bs);
993 void nbd_client_attach_aio_context(BlockDriverState *bs,
994 AioContext *new_context)
996 NBDClientSession *client = nbd_get_client_session(bs);
997 qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), new_context);
999 bdrv_inc_in_flight(bs);
1001 /* Need to wait here for the BH to run because the BH must run while the
1002 * node is still drained. */
1003 aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
1006 void nbd_client_close(BlockDriverState *bs)
1008 NBDClientSession *client = nbd_get_client_session(bs);
1009 NBDRequest request = { .type = NBD_CMD_DISC };
1011 assert(client->ioc);
1013 nbd_send_request(client->ioc, &request);
1015 nbd_teardown_connection(bs);
1018 static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
1019 Error **errp)
1021 QIOChannelSocket *sioc;
1022 Error *local_err = NULL;
1024 sioc = qio_channel_socket_new();
1025 qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");
1027 qio_channel_socket_connect_sync(sioc, saddr, &local_err);
1028 if (local_err) {
1029 object_unref(OBJECT(sioc));
1030 error_propagate(errp, local_err);
1031 return NULL;
1034 qio_channel_set_delay(QIO_CHANNEL(sioc), false);
1036 return sioc;
1039 static int nbd_client_connect(BlockDriverState *bs,
1040 SocketAddress *saddr,
1041 const char *export,
1042 QCryptoTLSCreds *tlscreds,
1043 const char *hostname,
1044 const char *x_dirty_bitmap,
1045 Error **errp)
1047 NBDClientSession *client = nbd_get_client_session(bs);
1048 int ret;
1051 * establish TCP connection, return error if it fails
1052 * TODO: Configurable retry-until-timeout behaviour.
1054 QIOChannelSocket *sioc = nbd_establish_connection(saddr, errp);
1056 if (!sioc) {
1057 return -ECONNREFUSED;
1060 /* NBD handshake */
1061 logout("session init %s\n", export);
1062 qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
1064 client->info.request_sizes = true;
1065 client->info.structured_reply = true;
1066 client->info.base_allocation = true;
1067 client->info.x_dirty_bitmap = g_strdup(x_dirty_bitmap);
1068 client->info.name = g_strdup(export ?: "");
1069 ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), tlscreds, hostname,
1070 &client->ioc, &client->info, errp);
1071 g_free(client->info.x_dirty_bitmap);
1072 g_free(client->info.name);
1073 if (ret < 0) {
1074 logout("Failed to negotiate with the NBD server\n");
1075 object_unref(OBJECT(sioc));
1076 return ret;
1078 if (x_dirty_bitmap && !client->info.base_allocation) {
1079 error_setg(errp, "requested x-dirty-bitmap %s not found",
1080 x_dirty_bitmap);
1081 ret = -EINVAL;
1082 goto fail;
1084 if (client->info.flags & NBD_FLAG_READ_ONLY) {
1085 ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
1086 if (ret < 0) {
1087 goto fail;
1090 if (client->info.flags & NBD_FLAG_SEND_FUA) {
1091 bs->supported_write_flags = BDRV_REQ_FUA;
1092 bs->supported_zero_flags |= BDRV_REQ_FUA;
1094 if (client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
1095 bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
1098 client->sioc = sioc;
1100 if (!client->ioc) {
1101 client->ioc = QIO_CHANNEL(sioc);
1102 object_ref(OBJECT(client->ioc));
1105 /* Now that we're connected, set the socket to be non-blocking and
1106 * kick the reply mechanism. */
1107 qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
1108 client->connection_co = qemu_coroutine_create(nbd_connection_entry, client);
1109 bdrv_inc_in_flight(bs);
1110 nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
1112 logout("Established connection with NBD server\n");
1113 return 0;
1115 fail:
1117 * We have connected, but must fail for other reasons. The
1118 * connection is still blocking; send NBD_CMD_DISC as a courtesy
1119 * to the server.
1122 NBDRequest request = { .type = NBD_CMD_DISC };
1124 nbd_send_request(client->ioc ?: QIO_CHANNEL(sioc), &request);
1126 object_unref(OBJECT(sioc));
1128 return ret;
1132 int nbd_client_init(BlockDriverState *bs,
1133 SocketAddress *saddr,
1134 const char *export,
1135 QCryptoTLSCreds *tlscreds,
1136 const char *hostname,
1137 const char *x_dirty_bitmap,
1138 Error **errp)
1140 NBDClientSession *client = nbd_get_client_session(bs);
1142 client->bs = bs;
1143 qemu_co_mutex_init(&client->send_mutex);
1144 qemu_co_queue_init(&client->free_sema);
1146 return nbd_client_connect(bs, saddr, export, tlscreds, hostname,
1147 x_dirty_bitmap, errp);