4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Red Hat nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
29 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
45 #ifdef HAVE_SYS_SOCKET_H
46 #include <sys/socket.h>
52 /* Default number of parallel requests. */
/* Used by handle_single_connection as the number of worker threads
 * when the global 'threads' setting is 0.
 */
53 #define DEFAULT_PARALLEL_REQUESTS 16
/* Forward declarations of static helpers defined later in this file. */
55 static struct connection
*new_connection (int sockin
, int sockout
,
57 static void free_connection (struct connection
*conn
);
59 /* Don't call these raw socket functions directly. Use conn->recv etc. */
60 static int raw_recv ( void *buf
, size_t len
);
61 static int raw_send_socket (const void *buf
, size_t len
, int flags
);
63 static int raw_send_other (const void *buf
, size_t len
, int flags
);
/* @how is SHUT_WR or SHUT_RDWR at the call sites in this file. */
65 static void raw_close (int how
);
/* Return the status of the current connection.
 *
 * Callers compare the result against STATUS_CLIENT_DONE to decide
 * whether to keep processing requests.  The access is bracketed by
 * locking and unlocking conn->status_lock.  NOTE(review): the lock and
 * unlock calls appear inside conditions whose other operands are on
 * lines not shown here (connection_set_status uses a conn->nworkers
 * check in the same position). Confirm against the full file.
 */
68 connection_get_status (void)
74 pthread_mutex_lock (&conn
->status_lock
))
78 pthread_mutex_unlock (&conn
->status_lock
))
83 /* Update the status if the new value is lower than the existing value.
84 * Return true if the caller should shutdown.
87 connection_set_status (conn_status value
)
93 pthread_mutex_lock (&conn
->status_lock
))
/* This function only ever lowers the status; a value greater than or
 * equal to the current status is ignored.
 */
95 if (value
< conn
->status
) {
/* With worker threads, when the status first drops from above
 * STATUS_CLIENT_DONE to STATUS_CLIENT_DONE or below, write one byte to
 * status_pipe[1] to notify the pipe-to-self.  The pipe is
 * non-blocking, so EAGAIN is tolerated and is not logged (presumably a
 * full pipe already holds a pending notification).  Any other write
 * failure is only logged.
 */
96 if (conn
->nworkers
&& conn
->status
> STATUS_CLIENT_DONE
&&
97 value
<= STATUS_CLIENT_DONE
) {
100 assert (conn
->status_pipe
[1] >= 0);
101 if (write (conn
->status_pipe
[1], &c
, 1) != 1 && errno
!= EAGAIN
)
102 debug ("failed to notify pipe-to-self: %m");
/* NOTE(review): the statement guarded by this condition (status
 * crossing from STATUS_CLIENT_DONE or above to below it) is on a line
 * not shown here.
 */
104 if (conn
->status
>= STATUS_CLIENT_DONE
&& value
< STATUS_CLIENT_DONE
)
/* Record the new, lower status. */
106 conn
->status
= value
;
108 if (conn
->nworkers
&&
109 pthread_mutex_unlock (&conn
->status_lock
))
/* Connection served by a worker thread.  connection_worker reads it as
 * worker->conn.  NOTE(review): presumably a member of struct
 * worker_data, whose other lines (including the name field) are not
 * shown here.
 */
115 struct connection
*conn
;
/* Worker thread entry point, started with pthread_create by
 * handle_single_connection when requests are processed by a pool.
 *
 * @data: a struct worker_data carrying the connection and thread name.
 *
 * Marks this thread as a server thread and sets its thread-local name
 * and connection.  It then handles requests until 'quit' is set or the
 * connection status drops to STATUS_CLIENT_DONE or below.
 * NOTE(review): cleanup of the worker data and the return value are on
 * lines not shown here.
 */
120 connection_worker (void *data
)
122 struct worker_data
*worker
= data
;
123 struct connection
*conn
= worker
->conn
;
124 char *name
= worker
->name
;
126 debug ("starting worker thread %s", name
);
127 threadlocal_new_server_thread ();
128 threadlocal_set_name (name
);
129 threadlocal_set_conn (conn
);
/* When protocol_recv_request_send_reply returns nonzero (presumably an
 * error), shut down the write side of the connection.  write_lock is
 * taken first because raw_close requires the caller to hold it for
 * SHUT_WR when there is more than one worker.
 */
132 while (!quit
&& connection_get_status () > STATUS_CLIENT_DONE
)
133 if (protocol_recv_request_send_reply ()) {
134 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&conn
->write_lock
);
135 conn
->close (SHUT_WR
);
137 debug ("exiting worker thread %s", threadlocal_get_name ());
/* Serve one client connection on the given descriptors.
 *
 * @sockin:  descriptor that requests are read from.
 * @sockout: descriptor that replies are written to.  It may be the same
 *           descriptor as @sockin (raw_close checks for that case).
 *
 * Steps:
 *   1. Create the connection and run the backend preconnect hook.
 *   2. Perform the handshake.
 *   3. Process requests, either serially in this thread or with a pool
 *      of worker threads.
 *   4. Finalize and free the connection.
 *
 * NOTE(review): the return value and most error paths are on lines not
 * shown here.
 */
143 handle_single_connection (int sockin
, int sockout
)
145 const char *plugin_name
;
147 struct connection
*conn
;
/* Worker count: the global 'threads' setting, or
 * DEFAULT_PARALLEL_REQUESTS when that is 0.
 */
148 int nworkers
= threads
? threads
: DEFAULT_PARALLEL_REQUESTS
;
149 pthread_t
*workers
= NULL
;
153 /* Because of asynchronous exit it is plausible that a new
154 * connection is started at the same time as the backend is being
155 * shut down. top may therefore be NULL, and if this happens return
159 unlock_connection ();
/* A thread model below NBDKIT_THREAD_MODEL_PARALLEL, or a single
 * requested worker, rules out a worker pool.  NOTE(review): the
 * statement controlled by this condition is on a line not shown here
 * (presumably it sets nworkers so that requests are processed serially).
 * Confirm.
 */
163 if (thread_model
< NBDKIT_THREAD_MODEL_PARALLEL
|| nworkers
== 1)
165 conn
= new_connection (sockin
, sockout
, nworkers
);
/* Name this thread after the top-level plugin. */
169 plugin_name
= top
->plugin_name (top
);
170 threadlocal_set_name (plugin_name
);
/* Run the backend preconnect hook before the handshake. */
172 if (top
->preconnect (top
, read_only
) == -1)
177 * Note that this calls the backend .open callback when it is safe
178 * to do so (eg. after TLS authentication).
180 if (protocol_handshake () == -1)
/* protocol_handshake succeeded. */
182 conn
->handshake_complete
= true;
185 /* No need for a separate thread. */
186 debug ("handshake complete, processing requests serially");
187 while (!quit
&& connection_get_status () > STATUS_CLIENT_DONE
)
188 if (protocol_recv_request_send_reply ())
189 conn
->close (SHUT_WR
);
192 /* Create thread pool to process requests. */
193 debug ("handshake complete, processing requests with %d threads",
195 workers
= calloc (nworkers
, sizeof *workers
);
196 if (unlikely (!workers
)) {
/* Start conn->nworkers threads.  Each thread gets its own
 * heap-allocated struct worker_data, named "<plugin_name>.<index>".  If
 * an allocation or thread creation fails, the connection is marked
 * STATUS_DEAD.  nworkers is reused as the loop index and is later
 * counted back down to join the started threads.
 */
201 for (nworkers
= 0; nworkers
< conn
->nworkers
; nworkers
++) {
202 struct worker_data
*worker
= malloc (sizeof *worker
);
205 if (unlikely (!worker
)) {
207 connection_set_status (STATUS_DEAD
);
210 if (unlikely (asprintf (&worker
->name
, "%s.%d", plugin_name
, nworkers
)
213 connection_set_status (STATUS_DEAD
);
218 err
= pthread_create (&workers
[nworkers
], NULL
, connection_worker
,
220 if (unlikely (err
)) {
222 perror ("pthread_create");
223 connection_set_status (STATUS_DEAD
);
/* Join the started worker threads in reverse order of creation. */
231 pthread_join (workers
[--nworkers
], NULL
);
235 /* Finalize (for filters), called just before close. */
237 r
= backend_finalize (conn
->top_context
);
243 free_connection (conn
);
244 unlock_connection ();
/* Allocate and initialize a new connection object.
 *
 * @sockin:   descriptor that requests are read from.
 * @sockout:  descriptor that replies are written to.
 * @nworkers: stored in conn->nworkers.  A nonzero value enables the
 *            pipe-to-self notification in connection_set_status.
 *
 * The function:
 *   - initializes the per-connection mutexes and the default_exportname
 *     array, and sets the status to STATUS_ACTIVE;
 *   - where pipes are available, creates a non-blocking, close-on-exec
 *     pipe-to-self;
 *   - installs raw_recv, raw_close, and a send function chosen by
 *     whether sockout is a socket;
 *   - makes the new connection this thread's connection.
 *
 * NOTE(review): the success/failure return statements are on lines not
 * shown here (presumably conn on success and NULL on failure). Confirm.
 */
247 static struct connection
*
248 new_connection (int sockin
, int sockout
, int nworkers
)
250 struct connection
*conn
;
253 socklen_t optlen
= sizeof opt
;
256 conn
= calloc (1, sizeof *conn
);
/* -1 marks a pipe end as not open, so the cleanup code can test it. */
261 conn
->status_pipe
[0] = conn
->status_pipe
[1] = -1;
263 pthread_mutex_init (&conn
->request_lock
, NULL
);
264 pthread_mutex_init (&conn
->read_lock
, NULL
);
265 pthread_mutex_init (&conn
->write_lock
, NULL
);
266 pthread_mutex_init (&conn
->status_lock
, NULL
);
/* One slot per backend layer (top->i + 1 entries).  This is presumably
 * indexed by each backend's ->i, since free_connection frees
 * default_exportname[b->i].
 */
268 conn
->default_exportname
= calloc (top
->i
+ 1,
269 sizeof *conn
->default_exportname
);
270 if (conn
->default_exportname
== NULL
) {
275 conn
->status
= STATUS_ACTIVE
;
276 conn
->nworkers
= nworkers
;
/* Create the pipe-to-self that connection_set_status writes to.  Both
 * ends must be non-blocking and close-on-exec.  pipe2 does this
 * atomically; otherwise pipe is followed by set_nonblock/set_cloexec,
 * and the other end is closed if that fails.
 */
279 if (pipe2 (conn
->status_pipe
, O_NONBLOCK
| O_CLOEXEC
)) {
285 /* If we were fully parallel, then this function could be
286 * accepting connections in one thread while another thread could
287 * be in a plugin trying to fork. But plugins.c forced
288 * thread_model to serialize_all_requests when it detects a lack
289 * of atomic CLOEXEC, at which point, we can use a mutex to ensure
290 * we aren't accepting until the plugin is not running, making
291 * non-atomicity okay.
293 assert (thread_model
<= NBDKIT_THREAD_MODEL_SERIALIZE_ALL_REQUESTS
);
295 if (pipe (conn
->status_pipe
)) {
300 if (set_nonblock (set_cloexec (conn
->status_pipe
[0])) == -1) {
302 close (conn
->status_pipe
[1]);
306 if (set_nonblock (set_cloexec (conn
->status_pipe
[1])) == -1) {
308 close (conn
->status_pipe
[0]);
313 #else /* !HAVE_PIPE2 && !HAVE_PIPE */
314 /* Windows has neither pipe2 nor pipe. XXX */
319 conn
->sockin
= sockin
;
320 conn
->sockout
= sockout
;
321 conn
->recv
= raw_recv
;
/* If getsockopt(SO_TYPE) succeeds, sockout is a socket, so use send(2)
 * via raw_send_socket.  Otherwise use write(2) via raw_send_other.
 * NOTE(review): the second raw_send_socket assignment is presumably the
 * branch used where this check is compiled out. Confirm the surrounding
 * preprocessor conditionals, which are not shown here.
 */
323 if (getsockopt (sockout
, SOL_SOCKET
, SO_TYPE
, &opt
, &optlen
) == 0)
324 conn
->send
= raw_send_socket
;
326 conn
->send
= raw_send_socket
;
328 conn
->send
= raw_send_socket
;
330 conn
->close
= raw_close
;
332 threadlocal_set_conn (conn
);
/* Error cleanup: close any pipe ends that were opened, free
 * default_exportname, and destroy the mutexes.  NOTE(review): the
 * successful return and the label that reaches this code are on lines
 * not shown here.
 */
336 #if defined (HAVE_PIPE2) || defined (HAVE_PIPE)
339 if (conn
->status_pipe
[0] >= 0)
340 close (conn
->status_pipe
[0]);
341 if (conn
->status_pipe
[1] >= 0)
342 close (conn
->status_pipe
[1]);
343 free (conn
->default_exportname
);
346 pthread_mutex_destroy (&conn
->request_lock
);
347 pthread_mutex_destroy (&conn
->read_lock
);
348 pthread_mutex_destroy (&conn
->write_lock
);
349 pthread_mutex_destroy (&conn
->status_lock
);
/* Release a connection created by new_connection.
 *
 * The function:
 *   - closes both directions of the connection via conn->close;
 *   - closes the backend context if one is open;
 *   - closes the pipe-to-self and destroys the per-connection mutexes;
 *   - frees the export-name strings;
 *   - clears the thread-local connection.
 *
 * NOTE(review): any NULL check on conn, and the final free of conn
 * itself, are on lines not shown here. Confirm in the full file.
 */
355 free_connection (struct connection
*conn
)
/* Close both directions of the connection's descriptors. */
362 conn
->close (SHUT_RDWR
);
364 /* Don't call the plugin again if quit has been set because the main
365 * thread will be in the process of unloading it. The plugin.unload
366 * callback should always be called.
370 if (conn
->top_context
) {
371 backend_close (conn
->top_context
);
372 conn
->top_context
= NULL
;
/* Close the pipe-to-self.  NOTE(review): only status_pipe[0] is tested,
 * which assumes both ends are open whenever [0] is.  new_connection's
 * cleanup tests each end separately; consider doing the same here.
 */
377 if (conn
->status_pipe
[0] >= 0) {
378 close (conn
->status_pipe
[0]);
379 close (conn
->status_pipe
[1]);
382 pthread_mutex_destroy (&conn
->request_lock
);
383 pthread_mutex_destroy (&conn
->read_lock
);
384 pthread_mutex_destroy (&conn
->write_lock
);
385 pthread_mutex_destroy (&conn
->status_lock
);
387 free (conn
->exportname_from_set_meta_context
);
391 free (conn
->default_exportname
[b
->i
]);
392 free (conn
->default_exportname
);
/* Clear the thread-local connection pointer set by new_connection. */
395 threadlocal_set_conn (NULL
);
398 /* Write buffer to conn->sockout with send() and either succeed completely
399 * (returns 0) or fail (returns -1). flags may include SEND_MORE as a hint
400 * that this send will be followed by related data.
403 raw_send_socket (const void *vbuf
, size_t len
, int flags
)
406 int sock
= conn
->sockout
;
407 const char *buf
= vbuf
;
/* Translate the SEND_MORE hint into a send(2) flag in f.
 * NOTE(review): the flag that is set is on a line not shown here.
 */
416 if (flags
& SEND_MORE
)
/* NOTE(review): the surrounding loop and the handling after the errno
 * test are on lines not shown here.  EINTR and EAGAIN are presumably
 * retried.
 */
420 r
= send (sock
, buf
, len
, f
);
422 if (errno
== EINTR
|| errno
== EAGAIN
)
434 /* Write buffer to conn->sockout with write() and either succeed completely
435 * (returns 0) or fail (returns -1). flags is ignored.
438 raw_send_other (const void *vbuf
, size_t len
, int flags
)
441 int sock
= conn
->sockout
;
442 const char *buf
= vbuf
;
/* new_connection installs this instead of raw_send_socket when
 * getsockopt(SO_TYPE) fails on sockout, so write(2) is used rather than
 * send(2).
 */
447 r
= write (sock
, buf
, len
);
/* NOTE(review): EINTR and EAGAIN are presumably retried.  The lines
 * that act on this test are not shown here.
 */
449 if (errno
== EINTR
|| errno
== EAGAIN
)
461 /* Read buffer from conn->sockin and either succeed completely
462 * (returns > 0), read an EOF (returns 0), or fail (returns -1).
465 raw_recv (void *vbuf
, size_t len
)
468 int sock
= conn
->sockin
;
/* Presumably distinguishes EOF before any data was read (reported as
 * 0) from EOF partway through a record, which is treated as an error
 * below.
 */
471 bool first_read
= true;
474 /* On Unix we want to use read(2) here because that allows us to
475 * read from non-sockets (think: nbdkit -s). In particular this
476 * makes fuzzing possible. However this is not possible on
477 * Windows where we must use recv.
480 r
= read (sock
, buf
, len
);
482 r
= recv (sock
, buf
, len
, 0);
485 if (errno
== EINTR
|| errno
== EAGAIN
)
492 /* Partial record read. This is an error. */
504 /* There's no place in the NBD protocol to send back errors from
505 * close, so this function ignores errors.
507 * If @how == SHUT_WR and conn->nworkers > 1, the caller holds write_lock.
514 if (conn
->sockout
>= 0 && how
== SHUT_WR
) {
/* Half-close for output only.  If input and output share one
 * descriptor, shut down just the write direction so reads can still
 * proceed.  Otherwise close the separate output descriptor.
 */
515 if (conn
->sockin
== conn
->sockout
)
516 shutdown (conn
->sockout
, how
);
518 closesocket (conn
->sockout
);
/* Full close: close sockin, and sockout too if it is a different
 * descriptor.  NOTE(review): confirm that conn->sockout is reset to -1
 * after the SHUT_WR branch closes it (that line is not shown here).
 * Otherwise a later SHUT_RDWR would close it a second time.
 */
522 if (conn
->sockin
>= 0)
523 closesocket (conn
->sockin
);
524 if (conn
->sockout
>= 0 && conn
->sockin
!= conn
->sockout
)
525 closesocket (conn
->sockout
);