autobuild: Use make -j on samba-libs/samba-static build as well
[Samba.git] / ctdb / common / comm.c
blob1bbb46050b5ae878bf926f7e4204e4e8130b4e85
1 /*
2 Communication endpoint implementation
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
24 #include <talloc.h>
25 #include <tdb.h>
27 #include "lib/util/tevent_unix.h"
29 #include "pkt_read.h"
30 #include "pkt_write.h"
31 #include "comm.h"
/*
 * Switch a file descriptor to non-blocking mode.
 *
 * The current file status flags are fetched first so that O_NONBLOCK is
 * added on top of them rather than replacing them.
 *
 * Returns true on success, false if either fcntl() call fails.
 */
static bool set_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL, 0);

	if (flags == -1) {
		return false;
	}

	return fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}
/*
 * Communication endpoint around a socket
 */
51 #define SMALL_PKT_SIZE 1024
53 struct comm_context {
54 int fd;
55 comm_read_handler_fn read_handler;
56 void *read_private_data;
57 comm_dead_handler_fn dead_handler;
58 void *dead_private_data;
59 uint8_t small_pkt[SMALL_PKT_SIZE];
60 struct tevent_req *read_req, *write_req;
61 struct tevent_fd *fde;
62 struct tevent_queue *queue;
65 static void comm_fd_handler(struct tevent_context *ev,
66 struct tevent_fd *fde,
67 uint16_t flags, void *private_data);
68 static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
69 struct tevent_context *ev,
70 struct comm_context *comm,
71 uint8_t *buf, size_t buflen);
72 static void comm_read_failed(struct tevent_req *req);
75 int comm_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
76 comm_read_handler_fn read_handler, void *read_private_data,
77 comm_dead_handler_fn dead_handler, void *dead_private_data,
78 struct comm_context **result)
80 struct comm_context *comm;
82 if (fd < 0) {
83 return EINVAL;
86 if (dead_handler == NULL) {
87 return EINVAL;
90 /* Socket queue relies on non-blocking sockets. */
91 if (!set_nonblocking(fd)) {
92 return EIO;
95 comm = talloc_zero(mem_ctx, struct comm_context);
96 if (comm == NULL) {
97 return ENOMEM;
100 comm->fd = fd;
101 comm->read_handler = read_handler;
102 comm->read_private_data = read_private_data;
103 comm->dead_handler = dead_handler;
104 comm->dead_private_data = dead_private_data;
106 comm->queue = tevent_queue_create(comm, "comm write queue");
107 if (comm->queue == NULL) {
108 goto fail;
111 /* Set up to write packets */
112 comm->fde = tevent_add_fd(ev, comm, fd, TEVENT_FD_READ,
113 comm_fd_handler, comm);
114 if (comm->fde == NULL) {
115 goto fail;
118 /* Set up to read packets */
119 if (read_handler != NULL) {
120 struct tevent_req *req;
122 req = comm_read_send(comm, ev, comm, comm->small_pkt,
123 SMALL_PKT_SIZE);
124 if (req == NULL) {
125 goto fail;
128 tevent_req_set_callback(req, comm_read_failed, comm);
129 comm->read_req = req;
132 *result = comm;
133 return 0;
135 fail:
136 talloc_free(comm);
137 return ENOMEM;
/*
 * Read packets
 */
/*
 * State of the read request started by comm_read_send().
 */
struct comm_read_state {
	/* Event context used for every packet read */
	struct tevent_context *ev;

	/* Endpoint the packets are read from */
	struct comm_context *comm;

	/* Caller supplied buffer and its size, handed to the packet reader */
	uint8_t *buf;
	size_t buflen;

	/* Packet read currently in flight, NULL while none is pending */
	struct tevent_req *subreq;
};
/* Reports how many more bytes the current packet needs. */
static ssize_t comm_read_more(uint8_t *buf,
			      size_t buflen,
			      void *private_data);

/* Completion callback of a single packet read. */
static void comm_read_done(struct tevent_req *subreq);
156 static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
157 struct tevent_context *ev,
158 struct comm_context *comm,
159 uint8_t *buf, size_t buflen)
161 struct tevent_req *req, *subreq;
162 struct comm_read_state *state;
164 req = tevent_req_create(mem_ctx, &state, struct comm_read_state);
165 if (req == NULL) {
166 return NULL;
169 state->ev = ev;
170 state->comm = comm;
171 state->buf = buf;
172 state->buflen = buflen;
174 subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
175 state->buf, state->buflen,
176 comm_read_more, NULL);
177 if (tevent_req_nomem(subreq, req)) {
178 return tevent_req_post(req, ev);
180 state->subreq = subreq;
182 tevent_req_set_callback(subreq, comm_read_done, req);
183 return req;
/*
 * Tell the packet reader how many more bytes the current packet needs.
 *
 * A packet starts with a uint32_t in host byte order that holds the total
 * packet length, the length word itself included.
 *
 * buf          - bytes received so far
 * buflen       - number of valid bytes in buf
 * private_data - unused
 *
 * Returns the number of bytes still missing, 0 once the packet is complete,
 * or -1 if the advertised length is smaller than what has already been read.
 */
static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data)
{
	uint32_t packet_len = 0;
	uint8_t *raw = (uint8_t *)&packet_len;
	size_t i;

	(void)private_data;

	if (buflen < sizeof(uint32_t)) {
		return (ssize_t)(sizeof(uint32_t) - buflen);
	}

	/*
	 * Copy the header byte by byte: dereferencing buf as a uint32_t
	 * would be a type-punned and possibly misaligned access.
	 */
	for (i = 0; i < sizeof(packet_len); i++) {
		raw[i] = buf[i];
	}

	if (packet_len < buflen) {
		/*
		 * Malformed header; the unsigned subtraction below would
		 * wrap around.
		 * NOTE(review): -1 is assumed to be what the packet reader
		 * treats as "invalid packet" - confirm against pkt_read.c.
		 */
		return -1;
	}

	return (ssize_t)(packet_len - buflen);
}
199 static void comm_read_done(struct tevent_req *subreq)
201 struct tevent_req *req = tevent_req_callback_data(
202 subreq, struct tevent_req);
203 struct comm_read_state *state = tevent_req_data(
204 req, struct comm_read_state);
205 struct comm_context *comm = state->comm;
206 ssize_t nread;
207 uint8_t *buf;
208 bool free_buf;
209 int err = 0;
211 nread = pkt_read_recv(subreq, state, &buf, &free_buf, &err);
212 TALLOC_FREE(subreq);
213 state->subreq = NULL;
214 if (nread == -1) {
215 tevent_req_error(req, err);
216 return;
219 comm->read_handler(buf, nread, comm->read_private_data);
221 if (free_buf) {
222 talloc_free(buf);
225 subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
226 state->buf, state->buflen,
227 comm_read_more, NULL);
228 if (tevent_req_nomem(subreq, req)) {
229 return;
231 state->subreq = subreq;
233 tevent_req_set_callback(subreq, comm_read_done, req);
/*
 * Collect the result of the read request.
 *
 * If the request ended with a unix error and perr is not NULL, the error
 * is stored in *perr; otherwise *perr is left untouched.
 */
static void comm_read_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (!tevent_req_is_unix_error(req, &err)) {
		return;
	}

	if (perr != NULL) {
		*perr = err;
	}
}
247 static void comm_read_failed(struct tevent_req *req)
249 struct comm_context *comm = tevent_req_callback_data(
250 req, struct comm_context);
252 comm_read_recv(req, NULL);
253 TALLOC_FREE(req);
254 comm->read_req = NULL;
255 if (comm->dead_handler != NULL) {
256 comm->dead_handler(comm->dead_private_data);
/*
 * Write packets
 */
/*
 * State of one write request created by comm_write_send().
 */
struct comm_write_state {
	/* Event context used for the packet write */
	struct tevent_context *ev;

	/* Endpoint the packet is written to */
	struct comm_context *comm;

	/* Packet write in flight, NULL before it starts and after it ends */
	struct tevent_req *subreq;

	/* Caller supplied packet and its length */
	uint8_t *buf;
	size_t buflen;

	/* Number of bytes reported as written by the packet writer */
	size_t nwritten;
};
/* Queue trigger that starts the packet write for a request. */
static void comm_write_trigger(struct tevent_req *req,
			       void *private_data);

/* Completion callback of the packet write. */
static void comm_write_done(struct tevent_req *subreq);
276 struct tevent_req *comm_write_send(TALLOC_CTX *mem_ctx,
277 struct tevent_context *ev,
278 struct comm_context *comm,
279 uint8_t *buf, size_t buflen)
281 struct tevent_req *req;
282 struct comm_write_state *state;
284 req = tevent_req_create(mem_ctx, &state, struct comm_write_state);
285 if (req == NULL) {
286 return NULL;
289 state->ev = ev;
290 state->comm = comm;
291 state->buf = buf;
292 state->buflen = buflen;
294 if (!tevent_queue_add_entry(comm->queue, ev, req,
295 comm_write_trigger, NULL)) {
296 talloc_free(req);
297 return NULL;
300 return req;
303 static void comm_write_trigger(struct tevent_req *req, void *private_data)
305 struct comm_write_state *state = tevent_req_data(
306 req, struct comm_write_state);
307 struct comm_context *comm = state->comm;
308 struct tevent_req *subreq;
310 comm->write_req = req;
312 subreq = pkt_write_send(state, state->ev, comm->fd,
313 state->buf, state->buflen);
314 if (tevent_req_nomem(subreq, req)) {
315 return;
318 state->subreq = subreq;
319 tevent_req_set_callback(subreq, comm_write_done, req);
320 TEVENT_FD_WRITEABLE(comm->fde);
323 static void comm_write_done(struct tevent_req *subreq)
325 struct tevent_req *req = tevent_req_callback_data(
326 subreq, struct tevent_req);
327 struct comm_write_state *state = tevent_req_data(
328 req, struct comm_write_state);
329 struct comm_context *comm = state->comm;
330 ssize_t nwritten;
331 int err = 0;
333 TEVENT_FD_NOT_WRITEABLE(comm->fde);
334 nwritten = pkt_write_recv(subreq, &err);
335 TALLOC_FREE(subreq);
336 state->subreq = NULL;
337 comm->write_req = NULL;
338 if (nwritten == -1) {
339 if (err == EPIPE) {
340 comm->dead_handler(comm->dead_private_data);
342 tevent_req_error(req, err);
343 return;
346 state->nwritten = nwritten;
347 tevent_req_done(req);
350 bool comm_write_recv(struct tevent_req *req, int *perr)
352 struct comm_write_state *state = tevent_req_data(
353 req, struct comm_write_state);
354 int err;
356 if (tevent_req_is_unix_error(req, &err)) {
357 if (perr != NULL) {
358 *perr = err;
360 return false;
363 if (state->nwritten != state->buflen) {
364 *perr = EIO;
365 return false;
368 *perr = 0;
369 return true;
372 static void comm_fd_handler(struct tevent_context *ev,
373 struct tevent_fd *fde,
374 uint16_t flags, void *private_data)
376 struct comm_context *comm = talloc_get_type_abort(
377 private_data, struct comm_context);
379 if (flags & TEVENT_FD_READ) {
380 struct comm_read_state *read_state;
382 if (comm->read_req == NULL) {
383 /* This should never happen */
384 abort();
387 read_state = tevent_req_data(comm->read_req,
388 struct comm_read_state);
389 pkt_read_handler(ev, fde, flags, read_state->subreq);
392 if (flags & TEVENT_FD_WRITE) {
393 struct comm_write_state *write_state;
395 if (comm->write_req == NULL) {
396 /* This should never happen */
397 abort();
400 write_state = tevent_req_data(comm->write_req,
401 struct comm_write_state);
402 pkt_write_handler(ev, fde, flags, write_state->subreq);