Version 1.33.10.
[nbdkit.git] / server / connections.c
blob6d9f6a961be354b540d5d5b7e009b3b0599f0376
/* nbdkit
 * Copyright (C) 2013-2023 Red Hat Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *
 * * Redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution.
 *
 * * Neither the name of Red Hat nor the names of its contributors may be
 * used to endorse or promote products derived from this software without
 * specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
33 #include <config.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <stdint.h>
38 #include <stdbool.h>
39 #include <inttypes.h>
40 #include <string.h>
41 #include <unistd.h>
42 #include <fcntl.h>
43 #include <assert.h>
45 #ifdef HAVE_SYS_SOCKET_H
46 #include <sys/socket.h>
47 #endif
49 #include "internal.h"
50 #include "utils.h"
/* Number of worker threads (and hence requests in flight) used for a
 * parallel connection when the global `threads` setting is zero.
 */
enum { DEFAULT_PARALLEL_REQUESTS = 16 };

static struct connection *new_connection (int sockin, int sockout,
                                          int nworkers);
static void free_connection (struct connection *conn);

/* Raw transport callbacks which new_connection installs into the
 * connection.  Never call these directly: go through conn->recv,
 * conn->send and conn->close instead.
 */
static int raw_recv (void *buf, size_t len);
static int raw_send_socket (const void *buf, size_t len, int flags);
#ifndef WIN32
static int raw_send_other (const void *buf, size_t len, int flags);
#endif
static void raw_close (int how);
67 conn_status
68 connection_get_status (void)
70 GET_CONN;
71 conn_status r;
73 if (conn->nworkers &&
74 pthread_mutex_lock (&conn->status_lock))
75 abort ();
76 r = conn->status;
77 if (conn->nworkers &&
78 pthread_mutex_unlock (&conn->status_lock))
79 abort ();
80 return r;
83 /* Update the status if the new value is lower than the existing value.
84 * Return true if the caller should shutdown.
86 bool
87 connection_set_status (conn_status value)
89 GET_CONN;
90 bool ret = false;
92 if (conn->nworkers &&
93 pthread_mutex_lock (&conn->status_lock))
94 abort ();
95 if (value < conn->status) {
96 if (conn->nworkers && conn->status > STATUS_CLIENT_DONE &&
97 value <= STATUS_CLIENT_DONE) {
98 char c = 0;
100 assert (conn->status_pipe[1] >= 0);
101 if (write (conn->status_pipe[1], &c, 1) != 1 && errno != EAGAIN)
102 debug ("failed to notify pipe-to-self: %m");
104 if (conn->status >= STATUS_CLIENT_DONE && value < STATUS_CLIENT_DONE)
105 ret = true;
106 conn->status = value;
108 if (conn->nworkers &&
109 pthread_mutex_unlock (&conn->status_lock))
110 abort ();
111 return ret;
114 struct worker_data {
115 struct connection *conn;
116 char *name;
119 static void *
120 connection_worker (void *data)
122 struct worker_data *worker = data;
123 struct connection *conn = worker->conn;
124 char *name = worker->name;
126 debug ("starting worker thread %s", name);
127 threadlocal_new_server_thread ();
128 threadlocal_set_name (name);
129 threadlocal_set_conn (conn);
130 free (worker);
132 while (!quit && connection_get_status () > STATUS_CLIENT_DONE)
133 if (protocol_recv_request_send_reply ()) {
134 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&conn->write_lock);
135 conn->close (SHUT_WR);
137 debug ("exiting worker thread %s", threadlocal_get_name ());
138 free (name);
139 return NULL;
142 void
143 handle_single_connection (int sockin, int sockout)
145 const char *plugin_name;
146 int r;
147 struct connection *conn;
148 int nworkers = threads ? threads : DEFAULT_PARALLEL_REQUESTS;
149 pthread_t *workers = NULL;
151 lock_connection ();
153 /* Because of asynchronous exit it is plausible that a new
154 * connection is started at the same time as the backend is being
155 * shut down. top may therefore be NULL, and if this happens return
156 * immediately.
158 if (!top) {
159 unlock_connection ();
160 return;
163 if (thread_model < NBDKIT_THREAD_MODEL_PARALLEL || nworkers == 1)
164 nworkers = 0;
165 conn = new_connection (sockin, sockout, nworkers);
166 if (!conn)
167 goto done;
169 plugin_name = top->plugin_name (top);
170 threadlocal_set_name (plugin_name);
172 if (top->preconnect (top, read_only) == -1)
173 goto done;
175 /* NBD handshake.
177 * Note that this calls the backend .open callback when it is safe
178 * to do so (eg. after TLS authentication).
180 if (protocol_handshake () == -1)
181 goto done;
182 conn->handshake_complete = true;
184 if (!nworkers) {
185 /* No need for a separate thread. */
186 debug ("handshake complete, processing requests serially");
187 while (!quit && connection_get_status () > STATUS_CLIENT_DONE)
188 if (protocol_recv_request_send_reply ())
189 conn->close (SHUT_WR);
191 else {
192 /* Create thread pool to process requests. */
193 debug ("handshake complete, processing requests with %d threads",
194 nworkers);
195 workers = calloc (nworkers, sizeof *workers);
196 if (unlikely (!workers)) {
197 perror ("malloc");
198 goto done;
201 for (nworkers = 0; nworkers < conn->nworkers; nworkers++) {
202 struct worker_data *worker = malloc (sizeof *worker);
203 int err;
205 if (unlikely (!worker)) {
206 perror ("malloc");
207 connection_set_status (STATUS_DEAD);
208 goto wait;
210 if (unlikely (asprintf (&worker->name, "%s.%d", plugin_name, nworkers)
211 < 0)) {
212 perror ("asprintf");
213 connection_set_status (STATUS_DEAD);
214 free (worker);
215 goto wait;
217 worker->conn = conn;
218 err = pthread_create (&workers[nworkers], NULL, connection_worker,
219 worker);
220 if (unlikely (err)) {
221 errno = err;
222 perror ("pthread_create");
223 connection_set_status (STATUS_DEAD);
224 free (worker);
225 goto wait;
229 wait:
230 while (nworkers)
231 pthread_join (workers[--nworkers], NULL);
232 free (workers);
235 /* Finalize (for filters), called just before close. */
236 lock_request ();
237 r = backend_finalize (conn->top_context);
238 unlock_request ();
239 if (r == -1)
240 goto done;
242 done:
243 free_connection (conn);
244 unlock_connection ();
247 static struct connection *
248 new_connection (int sockin, int sockout, int nworkers)
250 struct connection *conn;
251 #ifndef WIN32
252 int opt;
253 socklen_t optlen = sizeof opt;
254 #endif
256 conn = calloc (1, sizeof *conn);
257 if (conn == NULL) {
258 perror ("malloc");
259 return NULL;
261 conn->status_pipe[0] = conn->status_pipe[1] = -1;
263 pthread_mutex_init (&conn->request_lock, NULL);
264 pthread_mutex_init (&conn->read_lock, NULL);
265 pthread_mutex_init (&conn->write_lock, NULL);
266 pthread_mutex_init (&conn->status_lock, NULL);
268 conn->default_exportname = calloc (top->i + 1,
269 sizeof *conn->default_exportname);
270 if (conn->default_exportname == NULL) {
271 perror ("malloc");
272 goto error1;
275 conn->status = STATUS_ACTIVE;
276 conn->nworkers = nworkers;
277 if (nworkers) {
278 #ifdef HAVE_PIPE2
279 if (pipe2 (conn->status_pipe, O_NONBLOCK | O_CLOEXEC)) {
280 perror ("pipe2");
281 goto error2;
283 #else
284 #ifdef HAVE_PIPE
285 /* If we were fully parallel, then this function could be
286 * accepting connections in one thread while another thread could
287 * be in a plugin trying to fork. But plugins.c forced
288 * thread_model to serialize_all_requests when it detects a lack
289 * of atomic CLOEXEC, at which point, we can use a mutex to ensure
290 * we aren't accepting until the plugin is not running, making
291 * non-atomicity okay.
293 assert (thread_model <= NBDKIT_THREAD_MODEL_SERIALIZE_ALL_REQUESTS);
294 lock_request ();
295 if (pipe (conn->status_pipe)) {
296 perror ("pipe");
297 unlock_request ();
298 goto error2;
300 if (set_nonblock (set_cloexec (conn->status_pipe[0])) == -1) {
301 perror ("fcntl");
302 close (conn->status_pipe[1]);
303 unlock_request ();
304 goto error2;
306 if (set_nonblock (set_cloexec (conn->status_pipe[1])) == -1) {
307 perror ("fcntl");
308 close (conn->status_pipe[0]);
309 unlock_request ();
310 goto error2;
312 unlock_request ();
313 #else /* !HAVE_PIPE2 && !HAVE_PIPE */
314 /* Windows has neither pipe2 nor pipe. XXX */
315 #endif
316 #endif
319 conn->sockin = sockin;
320 conn->sockout = sockout;
321 conn->recv = raw_recv;
322 #ifndef WIN32
323 if (getsockopt (sockout, SOL_SOCKET, SO_TYPE, &opt, &optlen) == 0)
324 conn->send = raw_send_socket;
325 else
326 conn->send = raw_send_other;
327 #else
328 conn->send = raw_send_socket;
329 #endif
330 conn->close = raw_close;
332 threadlocal_set_conn (conn);
334 return conn;
336 #if defined(HAVE_PIPE2) || defined(HAVE_PIPE)
337 error2:
338 #endif
339 if (conn->status_pipe[0] >= 0)
340 close (conn->status_pipe[0]);
341 if (conn->status_pipe[1] >= 0)
342 close (conn->status_pipe[1]);
343 free (conn->default_exportname);
345 error1:
346 pthread_mutex_destroy (&conn->request_lock);
347 pthread_mutex_destroy (&conn->read_lock);
348 pthread_mutex_destroy (&conn->write_lock);
349 pthread_mutex_destroy (&conn->status_lock);
350 free (conn);
351 return NULL;
354 static void
355 free_connection (struct connection *conn)
357 struct backend *b;
359 if (!conn)
360 return;
362 conn->close (SHUT_RDWR);
364 /* Don't call the plugin again if quit has been set because the main
365 * thread will be in the process of unloading it. The plugin.unload
366 * callback should always be called.
368 if (!quit) {
369 lock_request ();
370 if (conn->top_context) {
371 backend_close (conn->top_context);
372 conn->top_context = NULL;
374 unlock_request ();
377 if (conn->status_pipe[0] >= 0) {
378 close (conn->status_pipe[0]);
379 close (conn->status_pipe[1]);
382 pthread_mutex_destroy (&conn->request_lock);
383 pthread_mutex_destroy (&conn->read_lock);
384 pthread_mutex_destroy (&conn->write_lock);
385 pthread_mutex_destroy (&conn->status_lock);
387 free (conn->exportname_from_set_meta_context);
388 free_interns ();
390 for_each_backend (b)
391 free (conn->default_exportname[b->i]);
392 free (conn->default_exportname);
394 free (conn);
395 threadlocal_set_conn (NULL);
398 /* Write buffer to conn->sockout with send() and either succeed completely
399 * (returns 0) or fail (returns -1). flags may include SEND_MORE as a hint
400 * that this send will be followed by related data.
402 static int
403 raw_send_socket (const void *vbuf, size_t len, int flags)
405 GET_CONN;
406 int sock = conn->sockout;
407 const char *buf = vbuf;
408 ssize_t r;
409 int f = 0;
411 if (sock < 0) {
412 errno = EBADF;
413 return -1;
415 #ifdef MSG_MORE
416 if (flags & SEND_MORE)
417 f |= MSG_MORE;
418 #endif
419 while (len > 0) {
420 r = send (sock, buf, len, f);
421 if (r == -1) {
422 if (errno == EINTR || errno == EAGAIN)
423 continue;
424 return -1;
426 buf += r;
427 len -= r;
430 return 0;
433 #ifndef WIN32
434 /* Write buffer to conn->sockout with write() and either succeed completely
435 * (returns 0) or fail (returns -1). flags is ignored.
437 static int
438 raw_send_other (const void *vbuf, size_t len, int flags)
440 GET_CONN;
441 int sock = conn->sockout;
442 const char *buf = vbuf;
443 ssize_t r;
445 assert (sock >= 0);
446 while (len > 0) {
447 r = write (sock, buf, len);
448 if (r == -1) {
449 if (errno == EINTR || errno == EAGAIN)
450 continue;
451 return -1;
453 buf += r;
454 len -= r;
457 return 0;
459 #endif /* !WIN32 */
461 /* Read buffer from conn->sockin and either succeed completely
462 * (returns > 0), read an EOF (returns 0), or fail (returns -1).
464 static int
465 raw_recv (void *vbuf, size_t len)
467 GET_CONN;
468 int sock = conn->sockin;
469 char *buf = vbuf;
470 ssize_t r;
471 bool first_read = true;
473 while (len > 0) {
474 /* On Unix we want to use read(2) here because that allows us to
475 * read from non-sockets (think: nbdkit -s). In particular this
476 * makes fuzzing possible. However this is not possible on
477 * Windows where we must use recv.
479 #ifndef WIN32
480 r = read (sock, buf, len);
481 #else
482 r = recv (sock, buf, len, 0);
483 #endif
484 if (r == -1) {
485 if (errno == EINTR || errno == EAGAIN)
486 continue;
487 return -1;
489 if (r == 0) {
490 if (first_read)
491 return 0;
492 /* Partial record read. This is an error. */
493 errno = EBADMSG;
494 return -1;
496 first_read = false;
497 buf += r;
498 len -= r;
501 return 1;
504 /* There's no place in the NBD protocol to send back errors from
505 * close, so this function ignores errors.
507 * If @how == SHUT_WR and conn->nworkers > 1, the caller holds write_lock.
509 static void
510 raw_close (int how)
512 GET_CONN;
514 if (conn->sockout >= 0 && how == SHUT_WR) {
515 if (conn->sockin == conn->sockout)
516 shutdown (conn->sockout, how);
517 else
518 closesocket (conn->sockout);
519 conn->sockout = -1;
521 else {
522 if (conn->sockin >= 0)
523 closesocket (conn->sockin);
524 if (conn->sockout >= 0 && conn->sockin != conn->sockout)
525 closesocket (conn->sockout);