nbd: Rewrite thread passing to use semaphore rather than pipe
[nbdkit/ericb.git] / plugins / nbd / nbd.c
blobb2f344631b9e2cba17cfb89d9e7f5f16e81d7a66
1 /* nbdkit
2 * Copyright (C) 2017-2019 Red Hat Inc.
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
6 * met:
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Red Hat nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
29 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
30 * SUCH DAMAGE.
33 #include <config.h>
35 #include <stdlib.h>
36 #include <stddef.h>
37 #include <stdbool.h>
38 #include <stdio.h>
39 #include <string.h>
40 #include <unistd.h>
41 #include <errno.h>
42 #include <inttypes.h>
43 #include <limits.h>
44 #include <netdb.h>
45 #include <netinet/in.h>
46 #include <netinet/tcp.h>
47 #include <sys/socket.h>
48 #include <sys/un.h>
49 #include <assert.h>
50 #include <pthread.h>
51 #include <semaphore.h>
53 #define NBDKIT_API_VERSION 2
55 #include <nbdkit-plugin.h>
56 #include "protocol.h"
57 #include "byte-swapping.h"
58 #include "cleanup.h"
/* Absolute path of the Unix socket to connect to (socket=...).  Set
   from nbdkit_absolute_path() in nbd_config and released in
   nbd_unload. */
static char *sockname;

/* TCP endpoint to connect to (hostname=..., port=...).  These point
   at the values handed to nbd_config (or at a string literal for the
   default port) and are never freed here. */
static const char *hostname;
static const char *port;

/* Human-readable server description, built in nbd_config_complete
   and released in nbd_unload. */
static char *servname;

/* Name of export on remote server, default '', ignored for oldstyle */
static const char *export;
73 static void
74 nbd_unload (void)
76 free (sockname);
77 free (servname);
80 /* Called for each key=value passed on the command line. This plugin
81 * accepts socket=<sockname> or hostname=<hostname>/port=<port>
82 * (exactly one connection required) and export=<name> (optional).
84 static int
85 nbd_config (const char *key, const char *value)
87 if (strcmp (key, "socket") == 0) {
88 /* See FILENAMES AND PATHS in nbdkit-plugin(3) */
89 free (sockname);
90 sockname = nbdkit_absolute_path (value);
91 if (!sockname)
92 return -1;
94 else if (strcmp (key, "hostname") == 0)
95 hostname = value;
96 else if (strcmp (key, "port") == 0)
97 port = value;
98 else if (strcmp (key, "export") == 0)
99 export = value;
100 else {
101 nbdkit_error ("unknown parameter '%s'", key);
102 return -1;
105 return 0;
108 /* Check the user passed exactly one socket description. */
109 static int
110 nbd_config_complete (void)
112 int r;
114 if (sockname) {
115 struct sockaddr_un sock;
117 if (hostname || port) {
118 nbdkit_error ("cannot mix Unix socket and TCP hostname/port parameters");
119 return -1;
121 if (strlen (sockname) > sizeof sock.sun_path) {
122 nbdkit_error ("socket file name too large");
123 return -1;
125 servname = strdup (sockname);
127 else {
128 if (!hostname) {
129 nbdkit_error ("must supply socket= or hostname= of external NBD server");
130 return -1;
132 if (!port)
133 port = "10809";
134 if (strchr (hostname, ':'))
135 r = asprintf (&servname, "[%s]:%s", hostname, port);
136 else
137 r = asprintf (&servname, "%s:%s", hostname, port);
138 if (r < 0) {
139 nbdkit_error ("asprintf: %m");
140 return -1;
144 if (!export)
145 export = "";
146 return 0;
/* One line of help text per parameter accepted by nbd_config. */
#define nbd_config_help \
  "socket=<SOCKNAME> The Unix socket to connect to.\n" \
  "hostname=<HOST> The hostname for the TCP socket to connect to.\n" \
  "port=<PORT> TCP port or service name to use (default 10809).\n" \
  "export=<NAME> Export name to connect to (default \"\").\n"

/* Requests may arrive on several threads at once; the locks in
   struct handle serialise access to the shared connection. */
#define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL
/* The per-transaction details.  One of these exists for every
   request that is waiting for the server's (final) reply. */
struct transaction {
  uint64_t cookie;                /* value sent as the request handle */
  sem_t sem;                      /* posted by the reader thread when done */
  void *buf;                      /* where reply data goes, or NULL */
  uint64_t offset;                /* offset of the request */
  uint32_t count;                 /* size of buf; 0 when buf is NULL */
  uint32_t err;                   /* status recorded for the requester */
  struct nbdkit_extents *extents; /* block status destination, or NULL */
  struct transaction *next;       /* link in handle.trans list */
};
/* The per-connection handle */
struct handle {
  /* These fields are read-only once initialized */
  int fd;                 /* socket connected to the remote server */
  int flags;              /* export flags (NBD_FLAG_*) from the handshake */
  int64_t size;           /* export size reported by the server */
  bool structured;        /* structured replies were negotiated */
  bool extents;           /* base:allocation context was negotiated */
  pthread_t reader;       /* thread running nbd_reader */

  /* Prevents concurrent threads from interleaving writes to server */
  pthread_mutex_t write_lock;

  pthread_mutex_t trans_lock; /* Covers access to all fields below */
  struct transaction *trans;  /* list of in-flight transactions */
  uint64_t unique;            /* source of the next cookie */
  bool dead;                  /* set once the connection is unusable */
};
/* Read an entire buffer, returning 0 on success or -1 with errno set.
   Interrupted (EINTR) and would-block (EAGAIN) reads are retried.
   Hitting end-of-file before len bytes have arrived is reported as
   EBADMSG. */
static int
read_full (int fd, void *buf, size_t len)
{
  /* Walk the buffer through a char pointer: arithmetic on void * is
     a GNU extension, not standard C. */
  char *p = buf;
  ssize_t r;

  while (len) {
    r = read (fd, p, len);
    if (r < 0) {
      if (errno == EINTR || errno == EAGAIN)
        continue;
      return -1;
    }
    if (!r) {
      /* Unexpected EOF */
      errno = EBADMSG;
      return -1;
    }
    p += r;
    len -= (size_t) r;
  }
  return 0;
}
/* Write an entire buffer, returning 0 on success or -1 with errno set.
   Interrupted (EINTR) and would-block (EAGAIN) writes are retried;
   short writes continue from where the previous one stopped. */
static int
write_full (int fd, const void *buf, size_t len)
{
  /* Walk the buffer through a char pointer: arithmetic on void * is
     a GNU extension, not standard C. */
  const char *p = buf;
  ssize_t r;

  while (len) {
    r = write (fd, p, len);
    if (r < 0) {
      if (errno == EINTR || errno == EAGAIN)
        continue;
      return -1;
    }
    p += r;
    len -= (size_t) r;
  }
  return 0;
}
231 /* Called during transmission phases when there is no hope of
232 * resynchronizing with the server, and all further requests from the
233 * client will fail. Returns -1 for convenience. */
234 static int
235 nbd_mark_dead (struct handle *h)
237 int err = errno;
239 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
240 if (!h->dead) {
241 nbdkit_debug ("permanent failure while talking to server %s: %m",
242 servname);
243 h->dead = true;
245 else if (!err)
246 errno = ESHUTDOWN;
247 /* NBD only accepts a limited set of errno values over the wire, and
248 nbdkit converts all other values to EINVAL. If we died due to an
249 errno value that cannot transmit over the wire, translate it to
250 ESHUTDOWN instead. */
251 if (err == EPIPE || err == EBADMSG)
252 nbdkit_set_error (ESHUTDOWN);
253 return -1;
256 /* Find and possibly remove the transaction corresponding to cookie
257 from the list. */
258 static struct transaction *
259 find_trans_by_cookie (struct handle *h, uint64_t cookie, bool remove)
261 struct transaction **ptr;
262 struct transaction *trans;
264 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
265 ptr = &h->trans;
266 while ((trans = *ptr) != NULL) {
267 if (cookie == trans->cookie)
268 break;
269 ptr = &trans->next;
271 if (trans && remove)
272 *ptr = trans->next;
273 return trans;
276 /* Send a request, return 0 on success or -1 on write failure. */
277 static int
278 nbd_request_raw (struct handle *h, uint16_t flags, uint16_t type,
279 uint64_t offset, uint32_t count, uint64_t cookie,
280 const void *buf)
282 struct request req = {
283 .magic = htobe32 (NBD_REQUEST_MAGIC),
284 .flags = htobe16 (flags),
285 .type = htobe16 (type),
286 .handle = cookie, /* Opaque to server, so endianness doesn't matter */
287 .offset = htobe64 (offset),
288 .count = htobe32 (count),
290 int r;
292 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->write_lock);
293 nbdkit_debug ("sending request type %d (%s), flags %#x, offset %#" PRIx64
294 ", count %#x, cookie %#" PRIx64, type, name_of_nbd_cmd (type),
295 flags, offset, count, cookie);
296 r = write_full (h->fd, &req, sizeof req);
297 if (buf && !r)
298 r = write_full (h->fd, buf, count);
299 return r;
302 /* Perform the request half of a transaction. On success, return the
303 transaction; on error return NULL. */
304 static struct transaction *
305 nbd_request_full (struct handle *h, uint16_t flags, uint16_t type,
306 uint64_t offset, uint32_t count, const void *req_buf,
307 void *rep_buf, struct nbdkit_extents *extents)
309 int err;
310 struct transaction *trans;
311 uint64_t cookie;
313 trans = calloc (1, sizeof *trans);
314 if (!trans) {
315 nbdkit_error ("unable to track transaction: %m");
316 /* Still in sync with server, so don't mark connection dead */
317 return NULL;
319 if (sem_init (&trans->sem, 0, 0)) {
320 nbdkit_error ("unable to create semaphore: %m");
321 /* Still in sync with server, so don't mark connection dead */
322 free (trans);
323 return NULL;
325 trans->buf = rep_buf;
326 trans->count = rep_buf ? count : 0;
327 trans->offset = offset;
328 trans->extents = extents;
330 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
331 if (h->dead)
332 goto err;
333 cookie = trans->cookie = h->unique++;
334 trans->next = h->trans;
335 h->trans = trans;
337 if (nbd_request_raw (h, flags, type, offset, count, cookie, req_buf) == 0)
338 return trans;
339 trans = find_trans_by_cookie (h, cookie, true);
341 err:
342 err = errno;
343 if (sem_destroy (&trans->sem))
344 abort ();
345 free (trans);
346 nbd_mark_dead (h);
347 errno = err;
348 return NULL;
/* Convenience wrapper around nbd_request_full for commands that send
   no payload and expect neither read data nor extents back. */
static struct transaction *
nbd_request (struct handle *h, uint16_t flags, uint16_t type, uint64_t offset,
             uint32_t count)
{
  const void *no_req_buf = NULL;
  void *no_rep_buf = NULL;
  struct nbdkit_extents *no_extents = NULL;

  return nbd_request_full (h, flags, type, offset, count,
                           no_req_buf, no_rep_buf, no_extents);
}
359 /* Read a reply, and look up the corresponding transaction.
360 Return the server's non-negative answer (converted to local errno
361 value) on success, or -1 on read failure. If structured replies
362 were negotiated, trans_out is set to NULL if there are still more replies
363 expected. */
364 static int
365 nbd_reply_raw (struct handle *h, struct transaction **trans_out)
367 union {
368 struct simple_reply simple;
369 struct structured_reply structured;
370 } rep;
371 struct transaction *trans;
372 void *buf = NULL;
373 CLEANUP_FREE char *payload = NULL;
374 uint32_t count;
375 uint32_t id;
376 struct block_descriptor *extents = NULL;
377 size_t nextents = 0;
378 int error = NBD_SUCCESS;
379 bool more = false;
380 uint32_t len = 0; /* 0 except for structured reads */
381 uint64_t offset = 0; /* if len, absolute offset of structured read chunk */
382 bool zero = false; /* if len, whether to read or memset */
383 uint16_t errlen;
385 *trans_out = NULL;
386 /* magic and handle overlap between simple and structured replies */
387 if (read_full (h->fd, &rep, sizeof rep.simple))
388 return nbd_mark_dead (h);
389 rep.simple.magic = be32toh (rep.simple.magic);
390 switch (rep.simple.magic) {
391 case NBD_SIMPLE_REPLY_MAGIC:
392 nbdkit_debug ("received simple reply for cookie %#" PRIx64 ", status %s",
393 rep.simple.handle,
394 name_of_nbd_error (be32toh (rep.simple.error)));
395 error = be32toh (rep.simple.error);
396 break;
397 case NBD_STRUCTURED_REPLY_MAGIC:
398 if (!h->structured) {
399 nbdkit_error ("structured response without negotiation");
400 return nbd_mark_dead (h);
402 if (read_full (h->fd, sizeof rep.simple + (char *) &rep,
403 sizeof rep - sizeof rep.simple))
404 return nbd_mark_dead (h);
405 rep.structured.flags = be16toh (rep.structured.flags);
406 rep.structured.type = be16toh (rep.structured.type);
407 rep.structured.length = be32toh (rep.structured.length);
408 nbdkit_debug ("received structured reply %s for cookie %#" PRIx64
409 ", payload length %" PRId32,
410 name_of_nbd_reply_type (rep.structured.type),
411 rep.structured.handle, rep.structured.length);
412 if (rep.structured.length > 64 * 1024 * 1024) {
413 nbdkit_error ("structured reply length is suspiciously large: %" PRId32,
414 rep.structured.length);
415 return nbd_mark_dead (h);
417 if (rep.structured.length) {
418 /* Special case for OFFSET_DATA in order to read tail of chunk
419 directly into final buffer later on */
420 len = (rep.structured.type == NBD_REPLY_TYPE_OFFSET_DATA &&
421 rep.structured.length > sizeof offset) ? sizeof offset :
422 rep.structured.length;
423 payload = malloc (len);
424 if (!payload) {
425 nbdkit_error ("reading structured reply payload: %m");
426 return nbd_mark_dead (h);
428 if (read_full (h->fd, payload, len))
429 return nbd_mark_dead (h);
430 len = 0;
432 more = !(rep.structured.flags & NBD_REPLY_FLAG_DONE);
433 switch (rep.structured.type) {
434 case NBD_REPLY_TYPE_NONE:
435 if (rep.structured.length) {
436 nbdkit_error ("NBD_REPLY_TYPE_NONE with invalid payload");
437 return nbd_mark_dead (h);
439 if (more) {
440 nbdkit_error ("NBD_REPLY_TYPE_NONE without done flag");
441 return nbd_mark_dead (h);
443 break;
444 case NBD_REPLY_TYPE_OFFSET_DATA:
445 if (rep.structured.length <= sizeof offset) {
446 nbdkit_error ("structured reply OFFSET_DATA too small");
447 return nbd_mark_dead (h);
449 memcpy (&offset, payload, sizeof offset);
450 offset = be64toh (offset);
451 len = rep.structured.length - sizeof offset;
452 break;
453 case NBD_REPLY_TYPE_OFFSET_HOLE:
454 if (rep.structured.length != sizeof offset + sizeof len) {
455 nbdkit_error ("structured reply OFFSET_HOLE size incorrect");
456 return nbd_mark_dead (h);
458 memcpy (&offset, payload, sizeof offset);
459 offset = be64toh (offset);
460 memcpy (&len, payload, sizeof len);
461 len = be32toh (len);
462 if (!len) {
463 nbdkit_error ("structured reply OFFSET_HOLE length incorrect");
464 return nbd_mark_dead (h);
466 zero = true;
467 break;
468 case NBD_REPLY_TYPE_BLOCK_STATUS:
469 if (!h->extents) {
470 nbdkit_error ("block status response without negotiation");
471 return nbd_mark_dead (h);
473 if (rep.structured.length < sizeof *extents ||
474 rep.structured.length % sizeof *extents != sizeof id) {
475 nbdkit_error ("structured reply OFFSET_HOLE size incorrect");
476 return nbd_mark_dead (h);
478 nextents = rep.structured.length / sizeof *extents;
479 extents = (struct block_descriptor *) &payload[sizeof id];
480 memcpy (&id, payload, sizeof id);
481 id = be32toh (id);
482 nbdkit_debug ("parsing %zu extents for context id %" PRId32,
483 nextents, id);
484 break;
485 default:
486 if (!NBD_REPLY_TYPE_IS_ERR (rep.structured.type)) {
487 nbdkit_error ("received unexpected structured reply %s",
488 name_of_nbd_reply_type (rep.structured.type));
489 return nbd_mark_dead (h);
492 if (rep.structured.length < sizeof error + sizeof errlen) {
493 nbdkit_error ("structured reply error size incorrect");
494 return nbd_mark_dead (h);
496 memcpy (&errlen, payload + sizeof error, sizeof errlen);
497 errlen = be16toh (errlen);
498 if (errlen > rep.structured.length - sizeof error - sizeof errlen) {
499 nbdkit_error ("structured reply error message size incorrect");
500 return nbd_mark_dead (h);
502 memcpy (&error, payload, sizeof error);
503 error = be32toh (error);
504 if (errlen)
505 nbdkit_debug ("received structured error %s with message: %.*s",
506 name_of_nbd_error (error), (int) errlen,
507 payload + sizeof error + sizeof errlen);
508 else
509 nbdkit_debug ("received structured error %s without message",
510 name_of_nbd_error (error));
512 break;
514 default:
515 nbdkit_error ("received unexpected magic in reply: %#" PRIx32,
516 rep.simple.magic);
517 return nbd_mark_dead (h);
520 trans = find_trans_by_cookie (h, rep.simple.handle, !more);
521 if (!trans) {
522 nbdkit_error ("reply with unexpected cookie %#" PRIx64, rep.simple.handle);
523 return nbd_mark_dead (h);
526 buf = trans->buf;
527 count = trans->count;
528 if (nextents) {
529 if (!trans->extents) {
530 nbdkit_error ("block status response to a non-status command");
531 return nbd_mark_dead (h);
533 offset = trans->offset;
534 for (size_t i = 0; i < nextents; i++) {
535 /* We rely on the fact that NBDKIT_EXTENT_* match NBD_STATE_* */
536 if (nbdkit_add_extent (trans->extents, offset,
537 be32toh (extents[i].length),
538 be32toh (extents[i].status_flags)) == -1) {
539 error = errno;
540 break;
542 offset += be32toh (extents[i].length);
545 if (buf && h->structured && rep.simple.magic == NBD_SIMPLE_REPLY_MAGIC) {
546 nbdkit_error ("simple read reply when structured was expected");
547 return nbd_mark_dead (h);
549 if (len) {
550 if (!buf) {
551 nbdkit_error ("structured read response to a non-read command");
552 return nbd_mark_dead (h);
554 if (offset < trans->offset || offset > INT64_MAX ||
555 offset + len > trans->offset + count) {
556 nbdkit_error ("structured read reply with unexpected offset/length");
557 return nbd_mark_dead (h);
559 buf = (char *) buf + offset - trans->offset;
560 if (zero) {
561 memset (buf, 0, len);
562 buf = NULL;
564 else
565 count = len;
568 /* Thanks to structured replies, we must preserve an error in any
569 earlier chunk for replay during the final chunk. */
570 if (!more) {
571 *trans_out = trans;
572 if (!error)
573 error = trans->err;
575 else if (error && !trans->err)
576 trans->err = error;
578 /* Convert from wire value to local errno, and perform any final read */
579 switch (error) {
580 case NBD_SUCCESS:
581 if (buf && read_full (h->fd, buf, count))
582 return nbd_mark_dead (h);
583 return 0;
584 case NBD_EPERM:
585 return EPERM;
586 case NBD_EIO:
587 return EIO;
588 case NBD_ENOMEM:
589 return ENOMEM;
590 default:
591 nbdkit_debug ("unexpected error %d, squashing to EINVAL", error);
592 /* fallthrough */
593 case NBD_EINVAL:
594 return EINVAL;
595 case NBD_ENOSPC:
596 return ENOSPC;
597 case NBD_EOVERFLOW:
598 return EOVERFLOW;
599 case NBD_ESHUTDOWN:
600 return ESHUTDOWN;
604 /* Reader loop. */
605 void *
606 nbd_reader (void *handle)
608 struct handle *h = handle;
609 bool done = false;
610 int r;
612 while (!done) {
613 struct transaction *trans;
615 r = nbd_reply_raw (h, &trans);
616 if (r >= 0) {
617 if (!trans)
618 nbdkit_debug ("partial reply handled, waiting for final reply");
619 else {
620 trans->err = r;
621 if (sem_post (&trans->sem)) {
622 nbdkit_error ("failed to post semaphore: %m");
623 abort ();
627 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
628 done = h->dead;
631 /* Clean up any stranded in-flight requests */
632 r = ESHUTDOWN;
633 while (1) {
634 struct transaction *trans;
637 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock);
638 trans = h->trans;
639 h->trans = trans ? trans->next : NULL;
641 if (!trans)
642 break;
643 trans->err = r;
644 if (sem_post (&trans->sem)) {
645 nbdkit_error ("failed to post semaphore: %m");
646 abort ();
649 return NULL;
652 /* Perform the reply half of a transaction. */
653 static int
654 nbd_reply (struct handle *h, struct transaction *trans)
656 int err;
658 while ((err = sem_wait (&trans->sem)) == -1 && errno == EINTR)
659 /* try again */;
660 if (err) {
661 nbdkit_debug ("failed to wait on semaphore: %m");
662 err = EIO;
664 else
665 err = trans->err;
666 if (sem_destroy (&trans->sem))
667 abort ();
668 free (trans);
669 errno = err;
670 return err ? -1 : 0;
673 /* Receive response to @option into @reply, and consume any
674 payload. If @payload is non-NULL, caller must free *payload. Return
675 0 on success, or -1 if communication to server is no longer
676 possible. */
677 static int
678 nbd_newstyle_recv_option_reply (struct handle *h, uint32_t option,
679 struct fixed_new_option_reply *reply,
680 void **payload)
682 CLEANUP_FREE char *buffer = NULL;
684 if (payload)
685 *payload = NULL;
686 if (read_full (h->fd, reply, sizeof *reply)) {
687 nbdkit_error ("unable to read option reply: %m");
688 return -1;
690 reply->magic = be64toh (reply->magic);
691 reply->option = be32toh (reply->option);
692 reply->reply = be32toh (reply->reply);
693 reply->replylen = be32toh (reply->replylen);
694 if (reply->magic != NBD_REP_MAGIC || reply->option != option) {
695 nbdkit_error ("unexpected option reply");
696 return -1;
698 if (reply->replylen) {
699 if (reply->reply == NBD_REP_ACK) {
700 nbdkit_error ("NBD_REP_ACK should not have replylen %" PRId32,
701 reply->replylen);
702 return -1;
704 if (reply->replylen > 16 * 1024 * 1024) {
705 nbdkit_error ("option reply length is suspiciously large: %" PRId32,
706 reply->replylen);
707 return -1;
709 /* buffer is a string for NBD_REP_ERR_*; adding a NUL terminator
710 makes that string easier to use, without hurting other reply
711 types where buffer is not a string */
712 buffer = malloc (reply->replylen + 1);
713 if (!buffer) {
714 nbdkit_error ("malloc: %m");
715 return -1;
717 if (read_full (h->fd, buffer, reply->replylen)) {
718 nbdkit_error ("unable to read option reply payload: %m");
719 return -1;
721 buffer[reply->replylen] = '\0';
722 if (!payload)
723 nbdkit_debug ("ignoring option reply payload");
724 else {
725 *payload = buffer;
726 buffer = NULL;
729 return 0;
732 /* Attempt to negotiate structured reads, block status, and NBD_OPT_GO.
733 Return 1 if haggling completed, 0 if haggling failed but
734 NBD_OPT_EXPORT_NAME is still viable, or -1 on inability to connect. */
735 static int
736 nbd_newstyle_haggle (struct handle *h)
738 const char *const query = "base:allocation";
739 struct new_option opt;
740 uint32_t exportnamelen = htobe32 (strlen (export));
741 uint32_t nrqueries = htobe32 (1);
742 uint32_t querylen = htobe32 (strlen (query));
743 /* For now, we make no NBD_INFO_* requests, relying on the server to
744 send its defaults. TODO: nbdkit should let plugins report block
745 sizes, at which point we should request NBD_INFO_BLOCK_SIZE and
746 obey any sizes set by server. */
747 uint16_t nrinfos = htobe16 (0);
748 struct fixed_new_option_reply reply;
750 nbdkit_debug ("trying NBD_OPT_STRUCTURED_REPLY");
751 opt.version = htobe64 (NEW_VERSION);
752 opt.option = htobe32 (NBD_OPT_STRUCTURED_REPLY);
753 opt.optlen = htobe32 (0);
754 if (write_full (h->fd, &opt, sizeof opt)) {
755 nbdkit_error ("unable to request NBD_OPT_STRUCTURED_REPLY: %m");
756 return -1;
758 if (nbd_newstyle_recv_option_reply (h, NBD_OPT_STRUCTURED_REPLY, &reply,
759 NULL) < 0)
760 return -1;
761 if (reply.reply == NBD_REP_ACK) {
762 nbdkit_debug ("structured replies enabled, trying NBD_OPT_SET_META_CONTEXT");
763 h->structured = true;
765 opt.version = htobe64 (NEW_VERSION);
766 opt.option = htobe32 (NBD_OPT_SET_META_CONTEXT);
767 opt.optlen = htobe32 (sizeof exportnamelen + strlen (export) +
768 sizeof nrqueries + sizeof querylen + strlen (query));
769 if (write_full (h->fd, &opt, sizeof opt) ||
770 write_full (h->fd, &exportnamelen, sizeof exportnamelen) ||
771 write_full (h->fd, export, strlen (export)) ||
772 write_full (h->fd, &nrqueries, sizeof nrqueries) ||
773 write_full (h->fd, &querylen, sizeof querylen) ||
774 write_full (h->fd, query, strlen (query))) {
775 nbdkit_error ("unable to request NBD_OPT_SET_META_CONTEXT: %m");
776 return -1;
778 if (nbd_newstyle_recv_option_reply (h, NBD_OPT_SET_META_CONTEXT, &reply,
779 NULL) < 0)
780 return -1;
781 if (reply.reply == NBD_REP_META_CONTEXT) {
782 /* Cheat: we asked for exactly one context. We could double
783 check that the server is replying with exactly the
784 "base:allocation" context, and then remember the id it tells
785 us to later confirm that responses to NBD_CMD_BLOCK_STATUS
786 match up; but in the absence of multiple contexts, it's
787 easier to just assume the server is compliant, and will reuse
788 the same id, without bothering to check further. */
789 nbdkit_debug ("extents enabled");
790 h->extents = true;
791 if (nbd_newstyle_recv_option_reply (h, NBD_OPT_SET_META_CONTEXT, &reply,
792 NULL) < 0)
793 return -1;
795 if (reply.reply != NBD_REP_ACK) {
796 if (h->extents) {
797 nbdkit_error ("unexpected response to set meta context");
798 return -1;
800 nbdkit_debug ("ignoring meta context response %s",
801 name_of_nbd_rep (reply.reply));
804 else {
805 nbdkit_debug ("structured replies disabled");
808 /* Try NBD_OPT_GO */
809 nbdkit_debug ("trying NBD_OPT_GO");
810 opt.version = htobe64 (NEW_VERSION);
811 opt.option = htobe32 (NBD_OPT_GO);
812 opt.optlen = htobe32 (sizeof exportnamelen + strlen (export) +
813 sizeof nrinfos);
814 if (write_full (h->fd, &opt, sizeof opt) ||
815 write_full (h->fd, &exportnamelen, sizeof exportnamelen) ||
816 write_full (h->fd, export, strlen (export)) ||
817 write_full (h->fd, &nrinfos, sizeof nrinfos)) {
818 nbdkit_error ("unable to request NBD_OPT_GO: %m");
819 return -1;
821 while (1) {
822 CLEANUP_FREE void *buffer;
823 struct fixed_new_option_reply_info_export *reply_export;
824 uint16_t info;
826 if (nbd_newstyle_recv_option_reply (h, NBD_OPT_GO, &reply, &buffer) < 0)
827 return -1;
828 switch (reply.reply) {
829 case NBD_REP_INFO:
830 /* Parse payload, but ignore all except NBD_INFO_EXPORT */
831 if (reply.replylen < 2) {
832 nbdkit_error ("NBD_REP_INFO reply too short");
833 return -1;
835 memcpy (&info, buffer, sizeof info);
836 info = be16toh (info);
837 switch (info) {
838 case NBD_INFO_EXPORT:
839 if (reply.replylen != sizeof *reply_export) {
840 nbdkit_error ("NBD_INFO_EXPORT reply wrong size");
841 return -1;
843 reply_export = buffer;
844 h->size = be64toh (reply_export->exportsize);
845 h->flags = be16toh (reply_export->eflags);
846 break;
847 default:
848 nbdkit_debug ("ignoring server info %d", info);
850 break;
851 case NBD_REP_ACK:
852 /* End of replies, valid if server already sent NBD_INFO_EXPORT,
853 observable since h->flags must contain NBD_FLAG_HAS_FLAGS */
854 assert (!buffer);
855 if (!h->flags) {
856 nbdkit_error ("server omitted NBD_INFO_EXPORT reply to NBD_OPT_GO");
857 return -1;
859 nbdkit_debug ("NBD_OPT_GO complete");
860 return 1;
861 case NBD_REP_ERR_UNSUP:
862 /* Special case this failure to fall back to NBD_OPT_EXPORT_NAME */
863 nbdkit_debug ("server lacks NBD_OPT_GO support");
864 return 0;
865 default:
866 /* Unexpected. Either the server sent a legitimate error or an
867 unexpected reply, but either way, we can't connect. */
868 if (NBD_REP_IS_ERR (reply.reply))
869 if (reply.replylen)
870 nbdkit_error ("server rejected NBD_OPT_GO with %s: %s",
871 name_of_nbd_rep (reply.reply), (char *) buffer);
872 else
873 nbdkit_error ("server rejected NBD_OPT_GO with %s",
874 name_of_nbd_rep (reply.reply));
875 else
876 nbdkit_error ("server used unexpected reply %s to NBD_OPT_GO",
877 name_of_nbd_rep (reply.reply));
878 return -1;
883 /* Connect to a Unix socket */
884 static int
885 nbd_connect_unix (struct handle *h)
887 struct sockaddr_un sock = { .sun_family = AF_UNIX };
889 nbdkit_debug ("connecting to Unix socket name=%s", sockname);
890 h->fd = socket (AF_UNIX, SOCK_STREAM, 0);
891 if (h->fd < 0) {
892 nbdkit_error ("socket: %m");
893 return -1;
896 /* We already validated length during nbd_config_complete */
897 assert (strlen (sockname) <= sizeof sock.sun_path);
898 memcpy (sock.sun_path, sockname, strlen (sockname));
899 if (connect (h->fd, (const struct sockaddr *) &sock, sizeof sock) < 0) {
900 nbdkit_error ("connect: %m");
901 return -1;
903 return 0;
/* Connect to the TCP endpoint hostname:port, trying each address
   returned by getaddrinfo in turn, and enable TCP_NODELAY on the
   resulting socket.  On success h->fd holds the connected socket and
   0 is returned.  On failure -1 is returned after nbdkit_error.  If
   no address could be connected h->fd is -1, so nbd_open's error
   path cannot close an already closed descriptor a second time.  If
   only setsockopt failed, h->fd is left open for nbd_open to close. */
static int
nbd_connect_tcp (struct handle *h)
{
  struct addrinfo hints = { .ai_family = AF_UNSPEC,
                            .ai_socktype = SOCK_STREAM, };
  struct addrinfo *result, *rp;
  int r;
  int err = 0;
  const int optval = 1;

  nbdkit_debug ("connecting to TCP socket host=%s port=%s", hostname, port);
  r = getaddrinfo (hostname, port, &hints, &result);
  if (r != 0) {
    nbdkit_error ("getaddrinfo: %s", gai_strerror (r));
    return -1;
  }

  for (rp = result; rp; rp = rp->ai_next) {
    h->fd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
    if (h->fd == -1) {
      err = errno;
      continue;
    }
    if (connect (h->fd, rp->ai_addr, rp->ai_addrlen) != -1)
      break;
    /* Remember why this address failed before close() can change
       errno, and forget the descriptor once it is closed. */
    err = errno;
    close (h->fd);
    h->fd = -1;
  }
  freeaddrinfo (result);
  if (rp == NULL) {
    errno = err;
    nbdkit_error ("connect: %m");
    return -1;
  }

  if (setsockopt (h->fd, IPPROTO_TCP, TCP_NODELAY, &optval,
                  sizeof optval) == -1) {
    nbdkit_error ("cannot set TCP_NODELAY option: %m");
    return -1;
  }
  return 0;
}
945 /* Create the per-connection handle. */
946 static void *
947 nbd_open (int readonly)
949 struct handle *h;
950 struct old_handshake old;
951 uint64_t version;
953 h = calloc (1, sizeof *h);
954 if (h == NULL) {
955 nbdkit_error ("malloc: %m");
956 return NULL;
958 h->fd = -1;
960 if (sockname) {
961 if (nbd_connect_unix (h) == -1)
962 goto err;
964 else if (nbd_connect_tcp (h) == -1)
965 goto err;
967 /* old and new handshake share same meaning of first 16 bytes */
968 if (read_full (h->fd, &old, offsetof (struct old_handshake, exportsize))) {
969 nbdkit_error ("unable to read magic: %m");
970 goto err;
972 if (strncmp (old.nbdmagic, "NBDMAGIC", sizeof old.nbdmagic)) {
973 nbdkit_error ("wrong magic, %s is not an NBD server", servname);
974 goto err;
976 version = be64toh (old.version);
977 if (version == OLD_VERSION) {
978 nbdkit_debug ("trying oldstyle connection");
979 if (read_full (h->fd,
980 (char *) &old + offsetof (struct old_handshake, exportsize),
981 sizeof old - offsetof (struct old_handshake, exportsize))) {
982 nbdkit_error ("unable to read old handshake: %m");
983 goto err;
985 h->size = be64toh (old.exportsize);
986 h->flags = be16toh (old.eflags);
988 else if (version == NEW_VERSION) {
989 uint16_t gflags;
990 uint32_t cflags;
991 struct new_option opt;
992 struct new_handshake_finish finish;
993 size_t expect;
995 nbdkit_debug ("trying newstyle connection");
996 if (read_full (h->fd, &gflags, sizeof gflags)) {
997 nbdkit_error ("unable to read global flags: %m");
998 goto err;
1000 gflags = be16toh (gflags);
1001 cflags = htobe32 (gflags & (NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES));
1002 if (write_full (h->fd, &cflags, sizeof cflags)) {
1003 nbdkit_error ("unable to return global flags: %m");
1004 goto err;
1007 /* Prefer NBD_OPT_GO if possible */
1008 if (gflags & NBD_FLAG_FIXED_NEWSTYLE) {
1009 int rc = nbd_newstyle_haggle (h);
1010 if (rc < 0)
1011 goto err;
1012 if (!rc)
1013 goto export_name;
1015 else {
1016 export_name:
1017 /* Option haggling untried or failed, use older NBD_OPT_EXPORT_NAME */
1018 nbdkit_debug ("trying NBD_OPT_EXPORT_NAME");
1019 opt.version = htobe64 (NEW_VERSION);
1020 opt.option = htobe32 (NBD_OPT_EXPORT_NAME);
1021 opt.optlen = htobe32 (strlen (export));
1022 if (write_full (h->fd, &opt, sizeof opt) ||
1023 write_full (h->fd, export, strlen (export))) {
1024 nbdkit_error ("unable to request export '%s': %m", export);
1025 goto err;
1027 expect = sizeof finish;
1028 if (gflags & NBD_FLAG_NO_ZEROES)
1029 expect -= sizeof finish.zeroes;
1030 if (read_full (h->fd, &finish, expect)) {
1031 nbdkit_error ("unable to read new handshake: %m");
1032 goto err;
1034 h->size = be64toh (finish.exportsize);
1035 h->flags = be16toh (finish.eflags);
1038 else {
1039 nbdkit_error ("unexpected version %#" PRIx64, version);
1040 goto err;
1042 if (readonly)
1043 h->flags |= NBD_FLAG_READ_ONLY;
1045 /* Spawn a dedicated reader thread */
1046 if ((errno = pthread_mutex_init (&h->write_lock, NULL))) {
1047 nbdkit_error ("failed to initialize write mutex: %m");
1048 goto err;
1050 if ((errno = pthread_mutex_init (&h->trans_lock, NULL))) {
1051 nbdkit_error ("failed to initialize transaction mutex: %m");
1052 pthread_mutex_destroy (&h->write_lock);
1053 goto err;
1055 if ((errno = pthread_create (&h->reader, NULL, nbd_reader, h))) {
1056 nbdkit_error ("failed to initialize reader thread: %m");
1057 pthread_mutex_destroy (&h->write_lock);
1058 pthread_mutex_destroy (&h->trans_lock);
1059 goto err;
1062 return h;
1064 err:
1065 if (h->fd >= 0)
1066 close (h->fd);
1067 free (h);
1068 return NULL;
1071 /* Free up the per-connection handle. */
1072 static void
1073 nbd_close (void *handle)
1075 struct handle *h = handle;
1077 if (!h->dead) {
1078 nbd_request_raw (h, 0, NBD_CMD_DISC, 0, 0, 0, NULL);
1079 shutdown (h->fd, SHUT_WR);
1081 if ((errno = pthread_join (h->reader, NULL)))
1082 nbdkit_debug ("failed to join reader thread: %m");
1083 close (h->fd);
1084 pthread_mutex_destroy (&h->write_lock);
1085 pthread_mutex_destroy (&h->trans_lock);
1086 free (h);
1089 /* Get the file size. */
1090 static int64_t
1091 nbd_get_size (void *handle)
1093 struct handle *h = handle;
1095 return h->size;
1098 static int
1099 nbd_can_write (void *handle)
1101 struct handle *h = handle;
1103 return !(h->flags & NBD_FLAG_READ_ONLY);
1106 static int
1107 nbd_can_flush (void *handle)
1109 struct handle *h = handle;
1111 return h->flags & NBD_FLAG_SEND_FLUSH;
1114 static int
1115 nbd_is_rotational (void *handle)
1117 struct handle *h = handle;
1119 return h->flags & NBD_FLAG_ROTATIONAL;
1122 static int
1123 nbd_can_trim (void *handle)
1125 struct handle *h = handle;
1127 return h->flags & NBD_FLAG_SEND_TRIM;
1130 static int
1131 nbd_can_zero (void *handle)
1133 struct handle *h = handle;
1135 return h->flags & NBD_FLAG_SEND_WRITE_ZEROES;
1138 static int
1139 nbd_can_fua (void *handle)
1141 struct handle *h = handle;
1143 return h->flags & NBD_FLAG_SEND_FUA ? NBDKIT_FUA_NATIVE : NBDKIT_FUA_NONE;
1146 static int
1147 nbd_can_multi_conn (void *handle)
1149 struct handle *h = handle;
1151 return h->flags & NBD_FLAG_CAN_MULTI_CONN;
1154 static int
1155 nbd_can_cache (void *handle)
1157 struct handle *h = handle;
1159 if (h->flags & NBD_FLAG_SEND_CACHE)
1160 return NBDKIT_CACHE_NATIVE;
1161 return NBDKIT_CACHE_NONE;
1164 static int
1165 nbd_can_extents (void *handle)
1167 struct handle *h = handle;
1169 return h->extents;
1172 /* Read data from the file. */
1173 static int
1174 nbd_pread (void *handle, void *buf, uint32_t count, uint64_t offset,
1175 uint32_t flags)
1177 struct handle *h = handle;
1178 struct transaction *s;
1180 assert (!flags);
1181 s = nbd_request_full (h, 0, NBD_CMD_READ, offset, count, NULL, buf, NULL);
1182 return s ? nbd_reply (h, s) : -1;
1185 /* Write data to the file. */
1186 static int
1187 nbd_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset,
1188 uint32_t flags)
1190 struct handle *h = handle;
1191 struct transaction *s;
1193 assert (!(flags & ~NBDKIT_FLAG_FUA));
1194 s = nbd_request_full (h, flags & NBDKIT_FLAG_FUA ? NBD_CMD_FLAG_FUA : 0,
1195 NBD_CMD_WRITE, offset, count, buf, NULL, NULL);
1196 return s ? nbd_reply (h, s) : -1;
1199 /* Write zeroes to the file. */
1200 static int
1201 nbd_zero (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
1203 struct handle *h = handle;
1204 struct transaction *s;
1205 int f = 0;
1207 assert (!(flags & ~(NBDKIT_FLAG_FUA | NBDKIT_FLAG_MAY_TRIM)));
1208 assert (h->flags & NBD_FLAG_SEND_WRITE_ZEROES);
1210 if (!(flags & NBDKIT_FLAG_MAY_TRIM))
1211 f |= NBD_CMD_FLAG_NO_HOLE;
1212 if (flags & NBDKIT_FLAG_FUA)
1213 f |= NBD_CMD_FLAG_FUA;
1214 s = nbd_request (h, f, NBD_CMD_WRITE_ZEROES, offset, count);
1215 return s ? nbd_reply (h, s) : -1;
1218 /* Trim a portion of the file. */
1219 static int
1220 nbd_trim (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
1222 struct handle *h = handle;
1223 struct transaction *s;
1225 assert (!(flags & ~NBDKIT_FLAG_FUA));
1226 s = nbd_request (h, flags & NBDKIT_FLAG_FUA ? NBD_CMD_FLAG_FUA : 0,
1227 NBD_CMD_TRIM, offset, count);
1228 return s ? nbd_reply (h, s) : -1;
1231 /* Flush the file to disk. */
1232 static int
1233 nbd_flush (void *handle, uint32_t flags)
1235 struct handle *h = handle;
1236 struct transaction *s;
1238 assert (!flags);
1239 s = nbd_request (h, 0, NBD_CMD_FLUSH, 0, 0);
1240 return s ? nbd_reply (h, s) : -1;
1243 /* Read extents of the file. */
1244 static int
1245 nbd_extents (void *handle, uint32_t count, uint64_t offset,
1246 uint32_t flags, struct nbdkit_extents *extents)
1248 struct handle *h = handle;
1249 struct transaction *s;
1251 assert (!(flags & ~NBDKIT_FLAG_REQ_ONE) && h->extents);
1252 s = nbd_request_full (h, flags & NBDKIT_FLAG_REQ_ONE ? NBD_CMD_FLAG_REQ_ONE : 0,
1253 NBD_CMD_BLOCK_STATUS, offset, count, NULL, NULL,
1254 extents);
1255 return s ? nbd_reply (h, s) : -1;
1258 /* Cache a portion of the file. */
1259 static int
1260 nbd_cache (void *handle, uint32_t count, uint64_t offset, uint32_t flags)
1262 struct handle *h = handle;
1263 struct transaction *s;
1265 assert (!flags);
1266 s = nbd_request (h, 0, NBD_CMD_CACHE, offset, count);
1267 return s ? nbd_reply (h, s) : -1;
1270 static struct nbdkit_plugin plugin = {
1271 .name = "nbd",
1272 .longname = "nbdkit nbd plugin",
1273 .version = PACKAGE_VERSION,
1274 .unload = nbd_unload,
1275 .config = nbd_config,
1276 .config_complete = nbd_config_complete,
1277 .config_help = nbd_config_help,
1278 .open = nbd_open,
1279 .close = nbd_close,
1280 .get_size = nbd_get_size,
1281 .can_write = nbd_can_write,
1282 .can_flush = nbd_can_flush,
1283 .is_rotational = nbd_is_rotational,
1284 .can_trim = nbd_can_trim,
1285 .can_zero = nbd_can_zero,
1286 .can_fua = nbd_can_fua,
1287 .can_multi_conn = nbd_can_multi_conn,
1288 .can_extents = nbd_can_extents,
1289 .can_cache = nbd_can_cache,
1290 .pread = nbd_pread,
1291 .pwrite = nbd_pwrite,
1292 .zero = nbd_zero,
1293 .flush = nbd_flush,
1294 .trim = nbd_trim,
1295 .extents = nbd_extents,
1296 .cache = nbd_cache,
1297 .errno_is_preserved = 1,
1300 NBDKIT_REGISTER_PLUGIN (plugin)