Version 1.33.7.
[nbdkit.git] / server / connections.c
blob4d776f2a2d40ea8f272fdf0a4aada29ce5fde793
1 /* nbdkit
2 * Copyright (C) 2013-2022 Red Hat Inc.
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
6 * met:
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Red Hat nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
29 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
30 * SUCH DAMAGE.
33 #include <config.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <stdint.h>
38 #include <stdbool.h>
39 #include <inttypes.h>
40 #include <string.h>
41 #include <unistd.h>
42 #include <fcntl.h>
43 #include <assert.h>
45 #ifdef HAVE_SYS_SOCKET_H
46 #include <sys/socket.h>
47 #endif
49 #include "internal.h"
50 #include "utils.h"
/* Number of worker threads per connection used when the global
 * "threads" setting is 0.
 */
#define DEFAULT_PARALLEL_REQUESTS 16

/* Connection construction and teardown (defined below). */
static struct connection *new_connection (int sockin, int sockout,
                                          int nworkers);
static void free_connection (struct connection *conn);

/* Raw I/O hooks that new_connection stores in conn->recv, conn->send
 * and conn->close.  Never call these directly: always go through the
 * conn-> function pointers (conn->recv etc.) instead.
 */
static int raw_recv (void *buf, size_t len);
static int raw_send_socket (const void *buf, size_t len, int flags);
#ifndef WIN32
static int raw_send_other (const void *buf, size_t len, int flags);
#endif
static void raw_close (int how);
67 conn_status
68 connection_get_status (void)
70 GET_CONN;
71 conn_status r;
73 if (conn->nworkers &&
74 pthread_mutex_lock (&conn->status_lock))
75 abort ();
76 r = conn->status;
77 if (conn->nworkers &&
78 pthread_mutex_unlock (&conn->status_lock))
79 abort ();
80 return r;
83 /* Update the status if the new value is lower than the existing value. */
84 void
85 connection_set_status (conn_status value)
87 GET_CONN;
89 if (conn->nworkers &&
90 pthread_mutex_lock (&conn->status_lock))
91 abort ();
92 if (value < conn->status) {
93 if (conn->nworkers && conn->status > STATUS_CLIENT_DONE &&
94 value <= STATUS_CLIENT_DONE) {
95 char c = 0;
97 assert (conn->status_pipe[1] >= 0);
98 if (write (conn->status_pipe[1], &c, 1) != 1 && errno != EAGAIN)
99 debug ("failed to notify pipe-to-self: %m");
101 if (conn->status >= STATUS_CLIENT_DONE && value < STATUS_CLIENT_DONE)
102 conn->close (SHUT_WR);
103 conn->status = value;
105 if (conn->nworkers &&
106 pthread_mutex_unlock (&conn->status_lock))
107 abort ();
/* Start-up argument for a connection_worker thread.  The thread takes
 * ownership: it frees this struct as soon as it has copied the fields
 * out, and frees NAME when it exits.
 */
struct worker_data {
  struct connection *conn;      /* Connection served by this worker. */
  char *name;                   /* Heap-allocated thread name. */
};
115 static void *
116 connection_worker (void *data)
118 struct worker_data *worker = data;
119 struct connection *conn = worker->conn;
120 char *name = worker->name;
122 debug ("starting worker thread %s", name);
123 threadlocal_new_server_thread ();
124 threadlocal_set_name (name);
125 threadlocal_set_conn (conn);
126 free (worker);
128 while (!quit && connection_get_status () > STATUS_CLIENT_DONE)
129 protocol_recv_request_send_reply ();
130 debug ("exiting worker thread %s", threadlocal_get_name ());
131 free (name);
132 return NULL;
135 void
136 handle_single_connection (int sockin, int sockout)
138 const char *plugin_name;
139 int r;
140 struct connection *conn;
141 int nworkers = threads ? threads : DEFAULT_PARALLEL_REQUESTS;
142 pthread_t *workers = NULL;
144 lock_connection ();
146 /* Because of asynchronous exit it is plausible that a new
147 * connection is started at the same time as the backend is being
148 * shut down. top may therefore be NULL, and if this happens return
149 * immediately.
151 if (!top) {
152 unlock_connection ();
153 return;
156 if (thread_model < NBDKIT_THREAD_MODEL_PARALLEL || nworkers == 1)
157 nworkers = 0;
158 conn = new_connection (sockin, sockout, nworkers);
159 if (!conn)
160 goto done;
162 plugin_name = top->plugin_name (top);
163 threadlocal_set_name (plugin_name);
165 if (top->preconnect (top, read_only) == -1)
166 goto done;
168 /* NBD handshake.
170 * Note that this calls the backend .open callback when it is safe
171 * to do so (eg. after TLS authentication).
173 if (protocol_handshake () == -1)
174 goto done;
175 conn->handshake_complete = true;
177 if (!nworkers) {
178 /* No need for a separate thread. */
179 debug ("handshake complete, processing requests serially");
180 while (!quit && connection_get_status () > STATUS_CLIENT_DONE)
181 protocol_recv_request_send_reply ();
183 else {
184 /* Create thread pool to process requests. */
185 debug ("handshake complete, processing requests with %d threads",
186 nworkers);
187 workers = calloc (nworkers, sizeof *workers);
188 if (unlikely (!workers)) {
189 perror ("malloc");
190 goto done;
193 for (nworkers = 0; nworkers < conn->nworkers; nworkers++) {
194 struct worker_data *worker = malloc (sizeof *worker);
195 int err;
197 if (unlikely (!worker)) {
198 perror ("malloc");
199 connection_set_status (STATUS_DEAD);
200 goto wait;
202 if (unlikely (asprintf (&worker->name, "%s.%d", plugin_name, nworkers)
203 < 0)) {
204 perror ("asprintf");
205 connection_set_status (STATUS_DEAD);
206 free (worker);
207 goto wait;
209 worker->conn = conn;
210 err = pthread_create (&workers[nworkers], NULL, connection_worker,
211 worker);
212 if (unlikely (err)) {
213 errno = err;
214 perror ("pthread_create");
215 connection_set_status (STATUS_DEAD);
216 free (worker);
217 goto wait;
221 wait:
222 while (nworkers)
223 pthread_join (workers[--nworkers], NULL);
224 free (workers);
227 /* Finalize (for filters), called just before close. */
228 lock_request ();
229 r = backend_finalize (conn->top_context);
230 unlock_request ();
231 if (r == -1)
232 goto done;
234 done:
235 free_connection (conn);
236 unlock_connection ();
239 static struct connection *
240 new_connection (int sockin, int sockout, int nworkers)
242 struct connection *conn;
243 #ifndef WIN32
244 int opt;
245 socklen_t optlen = sizeof opt;
246 #endif
248 conn = calloc (1, sizeof *conn);
249 if (conn == NULL) {
250 perror ("malloc");
251 return NULL;
253 conn->status_pipe[0] = conn->status_pipe[1] = -1;
255 pthread_mutex_init (&conn->request_lock, NULL);
256 pthread_mutex_init (&conn->read_lock, NULL);
257 pthread_mutex_init (&conn->write_lock, NULL);
258 pthread_mutex_init (&conn->status_lock, NULL);
260 conn->default_exportname = calloc (top->i + 1,
261 sizeof *conn->default_exportname);
262 if (conn->default_exportname == NULL) {
263 perror ("malloc");
264 goto error1;
267 conn->status = STATUS_ACTIVE;
268 conn->nworkers = nworkers;
269 if (nworkers) {
270 #ifdef HAVE_PIPE2
271 if (pipe2 (conn->status_pipe, O_NONBLOCK | O_CLOEXEC)) {
272 perror ("pipe2");
273 goto error2;
275 #else
276 #ifdef HAVE_PIPE
277 /* If we were fully parallel, then this function could be
278 * accepting connections in one thread while another thread could
279 * be in a plugin trying to fork. But plugins.c forced
280 * thread_model to serialize_all_requests when it detects a lack
281 * of atomic CLOEXEC, at which point, we can use a mutex to ensure
282 * we aren't accepting until the plugin is not running, making
283 * non-atomicity okay.
285 assert (thread_model <= NBDKIT_THREAD_MODEL_SERIALIZE_ALL_REQUESTS);
286 lock_request ();
287 if (pipe (conn->status_pipe)) {
288 perror ("pipe");
289 unlock_request ();
290 goto error2;
292 if (set_nonblock (set_cloexec (conn->status_pipe[0])) == -1) {
293 perror ("fcntl");
294 close (conn->status_pipe[1]);
295 unlock_request ();
296 goto error2;
298 if (set_nonblock (set_cloexec (conn->status_pipe[1])) == -1) {
299 perror ("fcntl");
300 close (conn->status_pipe[0]);
301 unlock_request ();
302 goto error2;
304 unlock_request ();
305 #else /* !HAVE_PIPE2 && !HAVE_PIPE */
306 /* Windows has neither pipe2 nor pipe. XXX */
307 #endif
308 #endif
311 conn->sockin = sockin;
312 conn->sockout = sockout;
313 conn->recv = raw_recv;
314 #ifndef WIN32
315 if (getsockopt (sockout, SOL_SOCKET, SO_TYPE, &opt, &optlen) == 0)
316 conn->send = raw_send_socket;
317 else
318 conn->send = raw_send_other;
319 #else
320 conn->send = raw_send_socket;
321 #endif
322 conn->close = raw_close;
324 threadlocal_set_conn (conn);
326 return conn;
328 #if defined(HAVE_PIPE2) || defined(HAVE_PIPE)
329 error2:
330 #endif
331 if (conn->status_pipe[0] >= 0)
332 close (conn->status_pipe[0]);
333 if (conn->status_pipe[1] >= 0)
334 close (conn->status_pipe[1]);
335 free (conn->default_exportname);
337 error1:
338 pthread_mutex_destroy (&conn->request_lock);
339 pthread_mutex_destroy (&conn->read_lock);
340 pthread_mutex_destroy (&conn->write_lock);
341 pthread_mutex_destroy (&conn->status_lock);
342 free (conn);
343 return NULL;
346 static void
347 free_connection (struct connection *conn)
349 struct backend *b;
351 if (!conn)
352 return;
354 conn->close (SHUT_RDWR);
356 /* Don't call the plugin again if quit has been set because the main
357 * thread will be in the process of unloading it. The plugin.unload
358 * callback should always be called.
360 if (!quit) {
361 lock_request ();
362 if (conn->top_context) {
363 backend_close (conn->top_context);
364 conn->top_context = NULL;
366 unlock_request ();
369 if (conn->status_pipe[0] >= 0) {
370 close (conn->status_pipe[0]);
371 close (conn->status_pipe[1]);
374 pthread_mutex_destroy (&conn->request_lock);
375 pthread_mutex_destroy (&conn->read_lock);
376 pthread_mutex_destroy (&conn->write_lock);
377 pthread_mutex_destroy (&conn->status_lock);
379 free (conn->exportname_from_set_meta_context);
380 free_interns ();
382 for_each_backend (b)
383 free (conn->default_exportname[b->i]);
384 free (conn->default_exportname);
386 free (conn);
387 threadlocal_set_conn (NULL);
390 /* Write buffer to conn->sockout with send() and either succeed completely
391 * (returns 0) or fail (returns -1). flags may include SEND_MORE as a hint
392 * that this send will be followed by related data.
394 static int
395 raw_send_socket (const void *vbuf, size_t len, int flags)
397 GET_CONN;
398 int sock = conn->sockout;
399 const char *buf = vbuf;
400 ssize_t r;
401 int f = 0;
403 assert (sock >= 0);
404 #ifdef MSG_MORE
405 if (flags & SEND_MORE)
406 f |= MSG_MORE;
407 #endif
408 while (len > 0) {
409 r = send (sock, buf, len, f);
410 if (r == -1) {
411 if (errno == EINTR || errno == EAGAIN)
412 continue;
413 return -1;
415 buf += r;
416 len -= r;
419 return 0;
422 #ifndef WIN32
423 /* Write buffer to conn->sockout with write() and either succeed completely
424 * (returns 0) or fail (returns -1). flags is ignored.
426 static int
427 raw_send_other (const void *vbuf, size_t len, int flags)
429 GET_CONN;
430 int sock = conn->sockout;
431 const char *buf = vbuf;
432 ssize_t r;
434 assert (sock >= 0);
435 while (len > 0) {
436 r = write (sock, buf, len);
437 if (r == -1) {
438 if (errno == EINTR || errno == EAGAIN)
439 continue;
440 return -1;
442 buf += r;
443 len -= r;
446 return 0;
448 #endif /* !WIN32 */
450 /* Read buffer from conn->sockin and either succeed completely
451 * (returns > 0), read an EOF (returns 0), or fail (returns -1).
453 static int
454 raw_recv (void *vbuf, size_t len)
456 GET_CONN;
457 int sock = conn->sockin;
458 char *buf = vbuf;
459 ssize_t r;
460 bool first_read = true;
462 while (len > 0) {
463 /* On Unix we want to use read(2) here because that allows us to
464 * read from non-sockets (think: nbdkit -s). In particular this
465 * makes fuzzing possible. However this is not possible on
466 * Windows where we must use recv.
468 #ifndef WIN32
469 r = read (sock, buf, len);
470 #else
471 r = recv (sock, buf, len, 0);
472 #endif
473 if (r == -1) {
474 if (errno == EINTR || errno == EAGAIN)
475 continue;
476 return -1;
478 if (r == 0) {
479 if (first_read)
480 return 0;
481 /* Partial record read. This is an error. */
482 errno = EBADMSG;
483 return -1;
485 first_read = false;
486 buf += r;
487 len -= r;
490 return 1;
493 /* There's no place in the NBD protocol to send back errors from
494 * close, so this function ignores errors.
496 static void
497 raw_close (int how)
499 GET_CONN;
501 if (conn->sockout >= 0 && how == SHUT_WR) {
502 if (conn->sockin == conn->sockout)
503 shutdown (conn->sockout, how);
504 else
505 closesocket (conn->sockout);
506 conn->sockout = -1;
508 else {
509 if (conn->sockin >= 0)
510 closesocket (conn->sockin);
511 if (conn->sockout >= 0 && conn->sockin != conn->sockout)
512 closesocket (conn->sockout);