server: Remember .open(readonly) status
[nbdkit/ericb.git] / server / connections.c
blob0c1f24136556f3989dfbee8d6b3a33a3bd067abf
1 /* nbdkit
2 * Copyright (C) 2013-2019 Red Hat Inc.
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
6 * met:
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Red Hat nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
29 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
30 * SUCH DAMAGE.
33 #include <config.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <stdint.h>
38 #include <inttypes.h>
39 #include <string.h>
40 #include <unistd.h>
41 #include <sys/socket.h>
42 #include <fcntl.h>
43 #include <assert.h>
45 #include "internal.h"
46 #include "utils.h"
/* Worker thread count used for a connection when the global `threads'
 * setting is zero.
 */
#define DEFAULT_PARALLEL_REQUESTS 16

/* Connection construction and teardown, defined later in this file. */
static struct connection *new_connection (int sockin, int sockout,
                                          int nworkers);
static void free_connection (struct connection *conn);

/* Raw socket primitives.  Never call these directly: go through the
 * conn->recv, conn->send and conn->close function pointers instead.
 */
static int raw_recv (struct connection *conn, void *buf, size_t len);
static int raw_send_socket (struct connection *conn, const void *buf,
                            size_t len, int flags);
static int raw_send_other (struct connection *conn, const void *buf,
                           size_t len, int flags);
static void raw_close (struct connection *conn);
63 void *
64 connection_get_handle (struct connection *conn, size_t i)
66 assert (i < conn->nr_handles);
67 return conn->handles[i].handle;
70 int
71 connection_get_status (struct connection *conn)
73 int r;
75 if (conn->nworkers &&
76 pthread_mutex_lock (&conn->status_lock))
77 abort ();
78 r = conn->status;
79 if (conn->nworkers &&
80 pthread_mutex_unlock (&conn->status_lock))
81 abort ();
82 return r;
85 /* Update the status if the new value is lower than the existing value.
86 * For convenience, return the incoming value.
88 int
89 connection_set_status (struct connection *conn, int value)
91 if (conn->nworkers &&
92 pthread_mutex_lock (&conn->status_lock))
93 abort ();
94 if (value < conn->status) {
95 if (conn->nworkers && conn->status > 0) {
96 char c = 0;
98 assert (conn->status_pipe[1] >= 0);
99 if (write (conn->status_pipe[1], &c, 1) != 1 && errno != EAGAIN)
100 nbdkit_debug ("failed to notify pipe-to-self: %m");
102 conn->status = value;
104 if (conn->nworkers &&
105 pthread_mutex_unlock (&conn->status_lock))
106 abort ();
107 return value;
/* Start-up arguments handed to each worker thread.  The worker frees
 * both this structure and the name string.
 */
struct worker_data {
  struct connection *conn;      /* connection the worker serves */
  char *name;                   /* heap-allocated thread name */
};
115 static void *
116 connection_worker (void *data)
118 struct worker_data *worker = data;
119 struct connection *conn = worker->conn;
120 char *name = worker->name;
122 debug ("starting worker thread %s", name);
123 threadlocal_new_server_thread ();
124 threadlocal_set_name (name);
125 threadlocal_set_conn (conn);
126 free (worker);
128 while (!quit && connection_get_status (conn) > 0)
129 protocol_recv_request_send_reply (conn);
130 debug ("exiting worker thread %s", threadlocal_get_name ());
131 free (name);
132 return NULL;
135 static int
136 _handle_single_connection (int sockin, int sockout)
138 const char *plugin_name;
139 int ret = -1, r;
140 struct connection *conn;
141 int nworkers = threads ? threads : DEFAULT_PARALLEL_REQUESTS;
142 pthread_t *workers = NULL;
144 if (backend->thread_model (backend) < NBDKIT_THREAD_MODEL_PARALLEL ||
145 nworkers == 1)
146 nworkers = 0;
147 conn = new_connection (sockin, sockout, nworkers);
148 if (!conn)
149 goto done;
151 lock_request (conn);
152 r = backend_open (backend, conn, readonly);
153 unlock_request (conn);
154 if (r == -1)
155 goto done;
157 /* NB: because of an asynchronous exit backend can be set to NULL at
158 * just about any time.
160 if (backend)
161 plugin_name = backend->plugin_name (backend);
162 else
163 plugin_name = "(unknown)";
164 threadlocal_set_name (plugin_name);
166 /* Prepare (for filters), called just after open. */
167 lock_request (conn);
168 if (backend)
169 r = backend->prepare (backend, conn);
170 else
171 r = 0;
172 unlock_request (conn);
173 if (r == -1)
174 goto done;
176 /* Handshake. */
177 if (protocol_handshake (conn) == -1)
178 goto done;
180 if (!nworkers) {
181 /* No need for a separate thread. */
182 debug ("handshake complete, processing requests serially");
183 while (!quit && connection_get_status (conn) > 0)
184 protocol_recv_request_send_reply (conn);
186 else {
187 /* Create thread pool to process requests. */
188 debug ("handshake complete, processing requests with %d threads",
189 nworkers);
190 workers = calloc (nworkers, sizeof *workers);
191 if (!workers) {
192 perror ("malloc");
193 goto done;
196 for (nworkers = 0; nworkers < conn->nworkers; nworkers++) {
197 struct worker_data *worker = malloc (sizeof *worker);
198 int err;
200 if (!worker) {
201 perror ("malloc");
202 connection_set_status (conn, -1);
203 goto wait;
205 if (asprintf (&worker->name, "%s.%d", plugin_name, nworkers) < 0) {
206 perror ("asprintf");
207 connection_set_status (conn, -1);
208 free (worker);
209 goto wait;
211 worker->conn = conn;
212 err = pthread_create (&workers[nworkers], NULL, connection_worker,
213 worker);
214 if (err) {
215 errno = err;
216 perror ("pthread_create");
217 connection_set_status (conn, -1);
218 free (worker);
219 goto wait;
223 wait:
224 while (nworkers)
225 pthread_join (workers[--nworkers], NULL);
226 free (workers);
229 /* Finalize (for filters), called just before close. */
230 lock_request (conn);
231 if (backend)
232 r = backend->finalize (backend, conn);
233 else
234 r = 0;
235 unlock_request (conn);
236 if (r == -1)
237 goto done;
239 ret = connection_get_status (conn);
240 done:
241 free_connection (conn);
242 return ret;
/* Public entry point for serving one connection.
 *
 * Runs _handle_single_connection between lock_connection () and
 * unlock_connection (), and returns its result unchanged.
 */
int
handle_single_connection (int sockin, int sockout)
{
  int r;

  lock_connection ();
  r = _handle_single_connection (sockin, sockout);
  unlock_connection ();

  return r;
}
257 static struct connection *
258 new_connection (int sockin, int sockout, int nworkers)
260 struct connection *conn;
261 int opt;
262 socklen_t optlen = sizeof opt;
263 struct backend *b;
265 conn = calloc (1, sizeof *conn);
266 if (conn == NULL) {
267 perror ("malloc");
268 return NULL;
270 conn->handles = calloc (backend->i + 1, sizeof *conn->handles);
271 if (conn->handles == NULL) {
272 perror ("malloc");
273 free (conn);
274 return NULL;
276 conn->nr_handles = backend->i + 1;
277 memset (conn->handles, -1, conn->nr_handles * sizeof *conn->handles);
278 for_each_backend (b)
279 conn->handles[b->i].handle = NULL;
281 conn->status = 1;
282 conn->nworkers = nworkers;
283 if (nworkers) {
284 #ifdef HAVE_PIPE2
285 if (pipe2 (conn->status_pipe, O_NONBLOCK | O_CLOEXEC)) {
286 perror ("pipe2");
287 free (conn);
288 return NULL;
290 #else
291 /* If we were fully parallel, then this function could be
292 * accepting connections in one thread while another thread could
293 * be in a plugin trying to fork. But plugins.c forced
294 * thread_model to serialize_all_requests when it detects a lack
295 * of atomic CLOEXEC, at which point, we can use a mutex to ensure
296 * we aren't accepting until the plugin is not running, making
297 * non-atomicity okay.
299 assert (backend->thread_model (backend) <=
300 NBDKIT_THREAD_MODEL_SERIALIZE_ALL_REQUESTS);
301 lock_request (NULL);
302 if (pipe (conn->status_pipe)) {
303 perror ("pipe");
304 free (conn);
305 unlock_request (NULL);
306 return NULL;
308 if (set_nonblock (set_cloexec (conn->status_pipe[0])) == -1) {
309 perror ("fcntl");
310 close (conn->status_pipe[1]);
311 free (conn);
312 unlock_request (NULL);
313 return NULL;
315 if (set_nonblock (set_cloexec (conn->status_pipe[1])) == -1) {
316 perror ("fcntl");
317 close (conn->status_pipe[0]);
318 free (conn);
319 unlock_request (NULL);
320 return NULL;
322 unlock_request (NULL);
323 #endif
325 else
326 conn->status_pipe[0] = conn->status_pipe[1] = -1;
327 conn->sockin = sockin;
328 conn->sockout = sockout;
329 pthread_mutex_init (&conn->request_lock, NULL);
330 pthread_mutex_init (&conn->read_lock, NULL);
331 pthread_mutex_init (&conn->write_lock, NULL);
332 pthread_mutex_init (&conn->status_lock, NULL);
334 conn->recv = raw_recv;
335 if (getsockopt (sockout, SOL_SOCKET, SO_TYPE, &opt, &optlen) == 0)
336 conn->send = raw_send_socket;
337 else
338 conn->send = raw_send_other;
339 conn->close = raw_close;
341 threadlocal_set_conn (conn);
343 return conn;
346 static void
347 free_connection (struct connection *conn)
349 if (!conn)
350 return;
352 threadlocal_set_conn (NULL);
353 conn->close (conn);
354 if (listen_stdin) {
355 int fd;
357 /* Restore something to stdin/out so the rest of our code can
358 * continue to assume that all new fds will be above stderr.
359 * Swap directions to get EBADF on improper use of stdin/out.
361 fd = open ("/dev/null", O_WRONLY | O_CLOEXEC);
362 assert (fd == 0);
363 fd = open ("/dev/null", O_RDONLY | O_CLOEXEC);
364 assert (fd == 1);
367 /* Don't call the plugin again if quit has been set because the main
368 * thread will be in the process of unloading it. The plugin.unload
369 * callback should always be called.
371 if (!quit && connection_get_handle (conn, 0)) {
372 lock_request (conn);
373 backend->close (backend, conn);
374 unlock_request (conn);
377 if (conn->status_pipe[0] >= 0) {
378 close (conn->status_pipe[0]);
379 close (conn->status_pipe[1]);
382 pthread_mutex_destroy (&conn->request_lock);
383 pthread_mutex_destroy (&conn->read_lock);
384 pthread_mutex_destroy (&conn->write_lock);
385 pthread_mutex_destroy (&conn->status_lock);
387 free (conn->handles);
388 free (conn);
391 /* Write buffer to conn->sockout with send() and either succeed completely
392 * (returns 0) or fail (returns -1). flags may include SEND_MORE as a hint
393 * that this send will be followed by related data.
395 static int
396 raw_send_socket (struct connection *conn, const void *vbuf, size_t len,
397 int flags)
399 int sock = conn->sockout;
400 const char *buf = vbuf;
401 ssize_t r;
402 int f = 0;
404 #ifdef MSG_MORE
405 if (flags & SEND_MORE)
406 f |= MSG_MORE;
407 #endif
408 while (len > 0) {
409 r = send (sock, buf, len, f);
410 if (r == -1) {
411 if (errno == EINTR || errno == EAGAIN)
412 continue;
413 return -1;
415 buf += r;
416 len -= r;
419 return 0;
422 /* Write buffer to conn->sockout with write() and either succeed completely
423 * (returns 0) or fail (returns -1). flags is ignored.
425 static int
426 raw_send_other (struct connection *conn, const void *vbuf, size_t len,
427 int flags)
429 int sock = conn->sockout;
430 const char *buf = vbuf;
431 ssize_t r;
433 while (len > 0) {
434 r = write (sock, buf, len);
435 if (r == -1) {
436 if (errno == EINTR || errno == EAGAIN)
437 continue;
438 return -1;
440 buf += r;
441 len -= r;
444 return 0;
447 /* Read buffer from conn->sockin and either succeed completely
448 * (returns > 0), read an EOF (returns 0), or fail (returns -1).
450 static int
451 raw_recv (struct connection *conn, void *vbuf, size_t len)
453 int sock = conn->sockin;
454 char *buf = vbuf;
455 ssize_t r;
456 bool first_read = true;
458 while (len > 0) {
459 r = read (sock, buf, len);
460 if (r == -1) {
461 if (errno == EINTR || errno == EAGAIN)
462 continue;
463 return -1;
465 if (r == 0) {
466 if (first_read)
467 return 0;
468 /* Partial record read. This is an error. */
469 errno = EBADMSG;
470 return -1;
472 first_read = false;
473 buf += r;
474 len -= r;
477 return 1;
480 /* There's no place in the NBD protocol to send back errors from
481 * close, so this function ignores errors.
483 static void
484 raw_close (struct connection *conn)
486 if (conn->sockin >= 0)
487 close (conn->sockin);
488 if (conn->sockout >= 0 && conn->sockin != conn->sockout)
489 close (conn->sockout);