2 * Copyright (C) 2013-2019 Red Hat Inc.
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Red Hat nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
29 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
41 #include <sys/socket.h>
48 /* Default number of parallel requests. */
49 #define DEFAULT_PARALLEL_REQUESTS 16
/* Forward declarations of the file-local helpers used below: connection
 * construction/teardown and the raw socket I/O callbacks.
 *
 * NOTE(review): in this extract the declarations of new_connection,
 * raw_send_socket and raw_send_other stop at a trailing comma; the line
 * carrying their last parameter is not visible here.
 */
51 static struct connection
*new_connection (int sockin
, int sockout
,
53 static void free_connection (struct connection
*conn
);
55 /* Don't call these raw socket functions directly. Use conn->recv etc. */
56 static int raw_recv (struct connection
*, void *buf
, size_t len
);
57 static int raw_send_socket (struct connection
*, const void *buf
, size_t len
,
59 static int raw_send_other (struct connection
*, const void *buf
, size_t len
,
61 static void raw_close (struct connection
*);
/* Return the handle stored in slot i of conn->handles.
 *
 * The index is checked with assert() against conn->nr_handles, so an
 * out-of-range i is a programming error rather than a reported failure.
 *
 * NOTE(review): the return-type line and the braces of this function are
 * not visible in this extract.
 */
64 connection_get_handle (struct connection
*conn
, size_t i
)
66 assert (i
< conn
->nr_handles
);
67 return conn
->handles
[i
].handle
;
/* Read the status of a connection.
 *
 * The visible lines take and release conn->status_lock; both calls sit
 * inside a larger expression whose start is not shown.
 *
 * NOTE(review): the return type, the statement that reads the status, the
 * handling of a failed lock/unlock and the return statement are all
 * missing from this extract - confirm against the full file.
 */
71 connection_get_status (struct connection
*conn
)
76 pthread_mutex_lock (&conn
->status_lock
))
80 pthread_mutex_unlock (&conn
->status_lock
))
/* Lower the status of a connection (summary of the visible lines).
 *
 * Under conn->status_lock, when value is below conn->status the status is
 * replaced by value.  Before that, if the connection has workers
 * (conn->nworkers is non-zero) and the current status is still > 0, one
 * byte is written to conn->status_pipe[1] as a pipe-to-self notification;
 * a failed write other than EAGAIN is only reported via nbdkit_debug.
 *
 * NOTE(review): the declaration of c, the return statement, the closing
 * braces and the remainder of the "conn->nworkers &&" conditions around
 * the lock calls are not visible in this extract.
 */
85 /* Update the status if the new value is lower than the existing value.
86 * For convenience, return the incoming value.
89 connection_set_status (struct connection
*conn
, int value
)
92 pthread_mutex_lock (&conn
->status_lock
))
94 if (value
< conn
->status
) {
95 if (conn
->nworkers
&& conn
->status
> 0) {
98 assert (conn
->status_pipe
[1] >= 0);
99 if (write (conn
->status_pipe
[1], &c
, 1) != 1 && errno
!= EAGAIN
)
100 nbdkit_debug ("failed to notify pipe-to-self: %m");
102 conn
->status
= value
;
104 if (conn
->nworkers
&&
105 pthread_mutex_unlock (&conn
->status_lock
))
/* Member declaration: pointer to the connection.
 *
 * NOTE(review): presumably a field of struct worker_data, since
 * connection_worker reads worker->conn and worker->name from that type;
 * the opening "struct ... {" line and any other members are not visible
 * in this extract.
 */
111 struct connection
*conn
;
/* Body of a worker thread.
 *
 * data points to a struct worker_data supplying the connection and the
 * thread name.  The thread registers its thread-local state (new server
 * thread, name, connection) and then calls
 * protocol_recv_request_send_reply (conn) repeatedly for as long as quit
 * is unset and connection_get_status (conn) stays above zero.
 *
 * NOTE(review): the return type, the braces and whatever follows the
 * final debug message (cleanup, return value) are not visible in this
 * extract; who frees worker and name cannot be determined from here.
 */
116 connection_worker (void *data
)
118 struct worker_data
*worker
= data
;
119 struct connection
*conn
= worker
->conn
;
120 char *name
= worker
->name
;
122 debug ("starting worker thread %s", name
);
123 threadlocal_new_server_thread ();
124 threadlocal_set_name (name
);
125 threadlocal_set_conn (conn
);
128 while (!quit
&& connection_get_status (conn
) > 0)
129 protocol_recv_request_send_reply (conn
);
130 debug ("exiting worker thread %s", threadlocal_get_name ());
/* Serve one client connection on the sockin/sockout descriptor pair
 * (summary of the visible lines).
 *
 *  - The worker count starts as threads when that is non-zero, otherwise
 *    DEFAULT_PARALLEL_REQUESTS; a test on the backend thread model
 *    follows (its consequence is not visible here).
 *  - A connection object is created with new_connection ().
 *  - The thread name is set to the backend's plugin name, or to
 *    "(unknown)".
 *  - protocol_handshake (conn) is run; -1 is treated as failure.
 *  - Requests are then either processed serially in this thread, or by a
 *    pool of threads started with pthread_create (connection_worker) and
 *    later joined with pthread_join.  A failed worker allocation,
 *    asprintf or pthread_create sets the connection status to -1.
 *  - backend->finalize is called, the final status is read into ret and
 *    the connection is released with free_connection ().
 *
 * NOTE(review): the return type, several declarations (err, r, ret), the
 * conditions choosing between the serial and the thread-pool path, the
 * error exits and the return statement are missing from this extract.
 */
136 _handle_single_connection (int sockin
, int sockout
)
138 const char *plugin_name
;
140 struct connection
*conn
;
141 int nworkers
= threads
? threads
: DEFAULT_PARALLEL_REQUESTS
;
142 pthread_t
*workers
= NULL
;
144 if (backend
->thread_model (backend
) < NBDKIT_THREAD_MODEL_PARALLEL
||
147 conn
= new_connection (sockin
, sockout
, nworkers
);
151 /* NB: because of an asynchronous exit backend can be set to NULL at
152 * just about any time.
155 plugin_name
= backend
->plugin_name (backend
);
157 plugin_name
= "(unknown)";
158 threadlocal_set_name (plugin_name
);
162 * Note that this calls the backend .open callback when it is safe
163 * to do so (eg. after TLS authentication).
165 if (protocol_handshake (conn
) == -1)
169 /* No need for a separate thread. */
170 debug ("handshake complete, processing requests serially");
171 while (!quit
&& connection_get_status (conn
) > 0)
172 protocol_recv_request_send_reply (conn
);
175 /* Create thread pool to process requests. */
176 debug ("handshake complete, processing requests with %d threads",
178 workers
= calloc (nworkers
, sizeof *workers
);
184 for (nworkers
= 0; nworkers
< conn
->nworkers
; nworkers
++) {
185 struct worker_data
*worker
= malloc (sizeof *worker
);
190 connection_set_status (conn
, -1);
193 if (asprintf (&worker
->name
, "%s.%d", plugin_name
, nworkers
) < 0) {
195 connection_set_status (conn
, -1);
200 err
= pthread_create (&workers
[nworkers
], NULL
, connection_worker
,
204 perror ("pthread_create");
205 connection_set_status (conn
, -1);
213 pthread_join (workers
[--nworkers
], NULL
);
217 /* Finalize (for filters), called just before close. */
220 r
= backend
->finalize (backend
, conn
);
223 unlock_request (conn
);
227 ret
= connection_get_status (conn
);
229 free_connection (conn
);
/* Entry point wrapping _handle_single_connection (): the visible lines
 * store its result in r and then call unlock_connection ().
 *
 * NOTE(review): the return type, the declaration of r, the matching lock
 * call and the return statement are not visible in this extract.
 */
234 handle_single_connection (int sockin
, int sockout
)
239 r
= _handle_single_connection (sockin
, sockout
);
240 unlock_connection ();
/* Allocate and initialise a connection object (summary of the visible
 * lines).
 *
 *  - conn is zero-allocated with calloc; both status_pipe entries start
 *    at -1.
 *  - exportname is set to a strdup'd empty string.
 *  - handles is an array of backend->i + 1 entries, recorded in
 *    nr_handles; entries are reset with reset_b_conn_handle ().
 *  - nworkers is stored; a status pipe is created with
 *    pipe2 (O_NONBLOCK | O_CLOEXEC), or with pipe () followed by
 *    set_cloexec/set_nonblock on each end.  The latter path asserts that
 *    the thread model is at most SERIALIZE_ALL_REQUESTS and calls
 *    unlock_request (NULL) on every exit.
 *  - sockin/sockout are stored, the four mutexes are initialised, and the
 *    recv/send/close callbacks are set.  send becomes raw_send_socket
 *    when getsockopt (SO_TYPE) succeeds on sockout; raw_send_other is
 *    assigned on a line whose guarding "else" is not visible.
 *  - The connection is registered with threadlocal_set_conn ().
 *  - The trailing lines close any open status pipe ends and free
 *    exportname and handles; presumably this is the failure path.
 *
 * NOTE(review): the braces, the declaration of opt, the loop header around
 * reset_b_conn_handle, the conditions selecting pipe2 versus pipe, the
 * error-path jumps and the return statements are missing from this
 * extract.
 */
245 static struct connection
*
246 new_connection (int sockin
, int sockout
, int nworkers
)
248 struct connection
*conn
;
250 socklen_t optlen
= sizeof opt
;
253 conn
= calloc (1, sizeof *conn
);
259 conn
->status_pipe
[0] = conn
->status_pipe
[1] = -1;
261 conn
->exportname
= strdup ("");
262 if (conn
->exportname
== NULL
) {
266 conn
->handles
= calloc (backend
->i
+ 1, sizeof *conn
->handles
);
267 if (conn
->handles
== NULL
) {
271 conn
->nr_handles
= backend
->i
+ 1;
273 reset_b_conn_handle (&conn
->handles
[b
->i
]);
276 conn
->nworkers
= nworkers
;
279 if (pipe2 (conn
->status_pipe
, O_NONBLOCK
| O_CLOEXEC
)) {
284 /* If we were fully parallel, then this function could be
285 * accepting connections in one thread while another thread could
286 * be in a plugin trying to fork. But plugins.c forced
287 * thread_model to serialize_all_requests when it detects a lack
288 * of atomic CLOEXEC, at which point, we can use a mutex to ensure
289 * we aren't accepting until the plugin is not running, making
290 * non-atomicity okay.
292 assert (backend
->thread_model (backend
) <=
293 NBDKIT_THREAD_MODEL_SERIALIZE_ALL_REQUESTS
);
295 if (pipe (conn
->status_pipe
)) {
297 unlock_request (NULL
);
300 if (set_nonblock (set_cloexec (conn
->status_pipe
[0])) == -1) {
302 close (conn
->status_pipe
[1]);
303 unlock_request (NULL
);
306 if (set_nonblock (set_cloexec (conn
->status_pipe
[1])) == -1) {
308 close (conn
->status_pipe
[0]);
309 unlock_request (NULL
);
312 unlock_request (NULL
);
315 conn
->sockin
= sockin
;
316 conn
->sockout
= sockout
;
317 pthread_mutex_init (&conn
->request_lock
, NULL
);
318 pthread_mutex_init (&conn
->read_lock
, NULL
);
319 pthread_mutex_init (&conn
->write_lock
, NULL
);
320 pthread_mutex_init (&conn
->status_lock
, NULL
);
322 conn
->recv
= raw_recv
;
323 if (getsockopt (sockout
, SOL_SOCKET
, SO_TYPE
, &opt
, &optlen
) == 0)
324 conn
->send
= raw_send_socket
;
326 conn
->send
= raw_send_other
;
327 conn
->close
= raw_close
;
329 threadlocal_set_conn (conn
);
334 if (conn
->status_pipe
[0] >= 0)
335 close (conn
->status_pipe
[0]);
336 if (conn
->status_pipe
[1] >= 0)
337 close (conn
->status_pipe
[1]);
338 free (conn
->exportname
);
339 free (conn
->handles
);
/* Tear down a connection object (summary of the visible lines).
 *
 *  - The thread-local connection pointer is cleared.
 *  - /dev/null is opened twice, write-only then read-only, to put
 *    something back on stdin/stdout with the directions swapped (see the
 *    comment below).
 *  - Unless quit is set, and only when handle 0 is non-NULL,
 *    backend_close (backend, conn) is called, followed by
 *    unlock_request (conn).
 *  - Both ends of the status pipe are closed when status_pipe[0] >= 0.
 *  - The four mutexes are destroyed and handles/exportname are freed.
 *
 * NOTE(review): the return type, the braces, the declaration and checks
 * of fd, the conditions around the /dev/null reopen, the matching lock
 * call and the release of conn itself are not visible in this extract.
 */
345 free_connection (struct connection
*conn
)
350 threadlocal_set_conn (NULL
);
355 /* Restore something to stdin/out so the rest of our code can
356 * continue to assume that all new fds will be above stderr.
357 * Swap directions to get EBADF on improper use of stdin/out.
359 fd
= open ("/dev/null", O_WRONLY
| O_CLOEXEC
);
361 fd
= open ("/dev/null", O_RDONLY
| O_CLOEXEC
);
365 /* Don't call the plugin again if quit has been set because the main
366 * thread will be in the process of unloading it. The plugin.unload
367 * callback should always be called.
369 if (!quit
&& connection_get_handle (conn
, 0)) {
371 backend_close (backend
, conn
);
372 unlock_request (conn
);
375 if (conn
->status_pipe
[0] >= 0) {
376 close (conn
->status_pipe
[0]);
377 close (conn
->status_pipe
[1]);
380 pthread_mutex_destroy (&conn
->request_lock
);
381 pthread_mutex_destroy (&conn
->read_lock
);
382 pthread_mutex_destroy (&conn
->write_lock
);
383 pthread_mutex_destroy (&conn
->status_lock
);
385 free (conn
->handles
);
386 free (conn
->exportname
);
/* send()-based writer for conn->sockout (summary of the visible lines).
 *
 * vbuf is viewed as a byte pointer, flags is tested for SEND_MORE, data
 * is written with send (sock, buf, len, f), and errno values EINTR and
 * EAGAIN are singled out after a call.
 *
 * NOTE(review): the return type, the final parameter line, the
 * declarations of r and f, the loop that advances buf/len, the action
 * taken for SEND_MORE and for EINTR/EAGAIN (presumably a retry) and the
 * return statements are not visible in this extract.
 */
390 /* Write buffer to conn->sockout with send() and either succeed completely
391 * (returns 0) or fail (returns -1). flags may include SEND_MORE as a hint
392 * that this send will be followed by related data.
395 raw_send_socket (struct connection
*conn
, const void *vbuf
, size_t len
,
398 int sock
= conn
->sockout
;
399 const char *buf
= vbuf
;
404 if (flags
& SEND_MORE
)
408 r
= send (sock
, buf
, len
, f
);
410 if (errno
== EINTR
|| errno
== EAGAIN
)
/* write()-based writer for conn->sockout (summary of the visible lines).
 *
 * vbuf is viewed as a byte pointer, data is written with
 * write (sock, buf, len), and errno values EINTR and EAGAIN are singled
 * out after a call.
 *
 * NOTE(review): the return type, the final parameter line, the
 * declaration of r, the loop that advances buf/len, the action taken for
 * EINTR/EAGAIN (presumably a retry) and the return statements are not
 * visible in this extract.
 */
421 /* Write buffer to conn->sockout with write() and either succeed completely
422 * (returns 0) or fail (returns -1). flags is ignored.
425 raw_send_other (struct connection
*conn
, const void *vbuf
, size_t len
,
428 int sock
= conn
->sockout
;
429 const char *buf
= vbuf
;
433 r
= write (sock
, buf
, len
);
435 if (errno
== EINTR
|| errno
== EAGAIN
)
/* read()-based reader for conn->sockin (summary of the visible lines).
 *
 * Data is read with read (sock, buf, len); errno values EINTR and EAGAIN
 * are singled out after a call, and a first_read flag (initially true)
 * is kept.  The comment near the end marks a partially read record as an
 * error.
 *
 * NOTE(review): the return type, the declarations of buf and r, the loop
 * that advances buf/len, the use of first_read and the return statements
 * are not visible in this extract.
 */
446 /* Read buffer from conn->sockin and either succeed completely
447 * (returns > 0), read an EOF (returns 0), or fail (returns -1).
450 raw_recv (struct connection
*conn
, void *vbuf
, size_t len
)
452 int sock
= conn
->sockin
;
455 bool first_read
= true;
458 r
= read (sock
, buf
, len
);
460 if (errno
== EINTR
|| errno
== EAGAIN
)
467 /* Partial record read. This is an error. */
479 /* There's no place in the NBD protocol to send back errors from
480 * close, so this function ignores errors.
483 raw_close (struct connection
*conn
)
485 if (conn
->sockin
>= 0)
486 close (conn
->sockin
);
487 if (conn
->sockout
>= 0 && conn
->sockin
!= conn
->sockout
)
488 close (conn
->sockout
);