selftest: tests for vfs_fruite file-id behavior
[Samba.git] / ctdb / common / comm.c
blob12f49702a90fa9ccca72f0c76c43b744c49ec7d7
1 /*
2 Communication endpoint implementation
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
24 #include <talloc.h>
25 #include <tdb.h>
27 #include "lib/util/blocking.h"
28 #include "lib/util/tevent_unix.h"
30 #include "pkt_read.h"
31 #include "pkt_write.h"
32 #include "comm.h"
35 * Communication endpoint around a socket
38 #define SMALL_PKT_SIZE 1024
40 struct comm_context {
41 int fd;
42 comm_read_handler_fn read_handler;
43 void *read_private_data;
44 comm_dead_handler_fn dead_handler;
45 void *dead_private_data;
46 uint8_t small_pkt[SMALL_PKT_SIZE];
47 struct tevent_req *read_req, *write_req;
48 struct tevent_fd *fde;
49 struct tevent_queue *queue;
52 static void comm_fd_handler(struct tevent_context *ev,
53 struct tevent_fd *fde,
54 uint16_t flags, void *private_data);
55 static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
56 struct tevent_context *ev,
57 struct comm_context *comm,
58 uint8_t *buf, size_t buflen);
59 static void comm_read_failed(struct tevent_req *req);
62 int comm_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
63 comm_read_handler_fn read_handler, void *read_private_data,
64 comm_dead_handler_fn dead_handler, void *dead_private_data,
65 struct comm_context **result)
67 struct comm_context *comm;
68 int ret;
70 if (fd < 0) {
71 return EINVAL;
74 if (dead_handler == NULL) {
75 return EINVAL;
78 /* Socket queue relies on non-blocking sockets. */
79 ret = set_blocking(fd, false);
80 if (ret == -1) {
81 return EIO;
84 comm = talloc_zero(mem_ctx, struct comm_context);
85 if (comm == NULL) {
86 return ENOMEM;
89 comm->fd = fd;
90 comm->read_handler = read_handler;
91 comm->read_private_data = read_private_data;
92 comm->dead_handler = dead_handler;
93 comm->dead_private_data = dead_private_data;
95 comm->queue = tevent_queue_create(comm, "comm write queue");
96 if (comm->queue == NULL) {
97 goto fail;
100 /* Set up to write packets */
101 comm->fde = tevent_add_fd(ev, comm, fd, TEVENT_FD_READ,
102 comm_fd_handler, comm);
103 if (comm->fde == NULL) {
104 goto fail;
107 /* Set up to read packets */
108 if (read_handler != NULL) {
109 struct tevent_req *req;
111 req = comm_read_send(comm, ev, comm, comm->small_pkt,
112 SMALL_PKT_SIZE);
113 if (req == NULL) {
114 goto fail;
117 tevent_req_set_callback(req, comm_read_failed, comm);
118 comm->read_req = req;
121 *result = comm;
122 return 0;
124 fail:
125 talloc_free(comm);
126 return ENOMEM;
131 * Read packets
134 struct comm_read_state {
135 struct tevent_context *ev;
136 struct comm_context *comm;
137 uint8_t *buf;
138 size_t buflen;
139 struct tevent_req *subreq;
142 static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data);
143 static void comm_read_done(struct tevent_req *subreq);
145 static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
146 struct tevent_context *ev,
147 struct comm_context *comm,
148 uint8_t *buf, size_t buflen)
150 struct tevent_req *req, *subreq;
151 struct comm_read_state *state;
153 req = tevent_req_create(mem_ctx, &state, struct comm_read_state);
154 if (req == NULL) {
155 return NULL;
158 state->ev = ev;
159 state->comm = comm;
160 state->buf = buf;
161 state->buflen = buflen;
163 subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
164 state->buf, state->buflen,
165 comm_read_more, NULL);
166 if (tevent_req_nomem(subreq, req)) {
167 return tevent_req_post(req, ev);
169 state->subreq = subreq;
171 tevent_req_set_callback(subreq, comm_read_done, req);
172 return req;
175 static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data)
177 uint32_t packet_len;
179 if (buflen < sizeof(uint32_t)) {
180 return sizeof(uint32_t) - buflen;
183 packet_len = *(uint32_t *)buf;
185 return packet_len - buflen;
188 static void comm_read_done(struct tevent_req *subreq)
190 struct tevent_req *req = tevent_req_callback_data(
191 subreq, struct tevent_req);
192 struct comm_read_state *state = tevent_req_data(
193 req, struct comm_read_state);
194 struct comm_context *comm = state->comm;
195 ssize_t nread;
196 uint8_t *buf;
197 bool free_buf;
198 int err = 0;
200 nread = pkt_read_recv(subreq, state, &buf, &free_buf, &err);
201 TALLOC_FREE(subreq);
202 state->subreq = NULL;
203 if (nread == -1) {
204 tevent_req_error(req, err);
205 return;
208 comm->read_handler(buf, nread, comm->read_private_data);
210 if (free_buf) {
211 talloc_free(buf);
214 subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
215 state->buf, state->buflen,
216 comm_read_more, NULL);
217 if (tevent_req_nomem(subreq, req)) {
218 return;
220 state->subreq = subreq;
222 tevent_req_set_callback(subreq, comm_read_done, req);
225 static void comm_read_recv(struct tevent_req *req, int *perr)
227 int err;
229 if (tevent_req_is_unix_error(req, &err)) {
230 if (perr != NULL) {
231 *perr = err;
236 static void comm_read_failed(struct tevent_req *req)
238 struct comm_context *comm = tevent_req_callback_data(
239 req, struct comm_context);
241 comm_read_recv(req, NULL);
242 TALLOC_FREE(req);
243 comm->read_req = NULL;
244 if (comm->dead_handler != NULL) {
245 comm->dead_handler(comm->dead_private_data);
251 * Write packets
254 struct comm_write_entry {
255 struct comm_context *comm;
256 struct tevent_queue_entry *qentry;
257 struct tevent_req *req;
260 struct comm_write_state {
261 struct tevent_context *ev;
262 struct comm_context *comm;
263 struct comm_write_entry *entry;
264 struct tevent_req *subreq;
265 uint8_t *buf;
266 size_t buflen, nwritten;
269 static int comm_write_entry_destructor(struct comm_write_entry *entry);
270 static void comm_write_trigger(struct tevent_req *req, void *private_data);
271 static void comm_write_done(struct tevent_req *subreq);
273 struct tevent_req *comm_write_send(TALLOC_CTX *mem_ctx,
274 struct tevent_context *ev,
275 struct comm_context *comm,
276 uint8_t *buf, size_t buflen)
278 struct tevent_req *req;
279 struct comm_write_state *state;
280 struct comm_write_entry *entry;
282 req = tevent_req_create(mem_ctx, &state, struct comm_write_state);
283 if (req == NULL) {
284 return NULL;
287 state->ev = ev;
288 state->comm = comm;
289 state->buf = buf;
290 state->buflen = buflen;
292 entry = talloc_zero(state, struct comm_write_entry);
293 if (tevent_req_nomem(entry, req)) {
294 return tevent_req_post(req, ev);
297 entry->comm = comm;
298 entry->req = req;
299 entry->qentry = tevent_queue_add_entry(comm->queue, ev, req,
300 comm_write_trigger, NULL);
301 if (tevent_req_nomem(entry->qentry, req)) {
302 return tevent_req_post(req, ev);
305 state->entry = entry;
306 talloc_set_destructor(entry, comm_write_entry_destructor);
308 return req;
311 static int comm_write_entry_destructor(struct comm_write_entry *entry)
313 struct comm_context *comm = entry->comm;
315 if (comm->write_req == entry->req) {
316 comm->write_req = NULL;
317 TEVENT_FD_NOT_WRITEABLE(comm->fde);
320 TALLOC_FREE(entry->qentry);
321 return 0;
324 static void comm_write_trigger(struct tevent_req *req, void *private_data)
326 struct comm_write_state *state = tevent_req_data(
327 req, struct comm_write_state);
328 struct comm_context *comm = state->comm;
329 struct tevent_req *subreq;
331 comm->write_req = req;
333 subreq = pkt_write_send(state, state->ev, comm->fd,
334 state->buf, state->buflen);
335 if (tevent_req_nomem(subreq, req)) {
336 return;
339 state->subreq = subreq;
340 tevent_req_set_callback(subreq, comm_write_done, req);
341 TEVENT_FD_WRITEABLE(comm->fde);
344 static void comm_write_done(struct tevent_req *subreq)
346 struct tevent_req *req = tevent_req_callback_data(
347 subreq, struct tevent_req);
348 struct comm_write_state *state = tevent_req_data(
349 req, struct comm_write_state);
350 struct comm_context *comm = state->comm;
351 ssize_t nwritten;
352 int err = 0;
354 TEVENT_FD_NOT_WRITEABLE(comm->fde);
355 nwritten = pkt_write_recv(subreq, &err);
356 TALLOC_FREE(subreq);
357 state->subreq = NULL;
358 comm->write_req = NULL;
359 if (nwritten == -1) {
360 if (err == EPIPE) {
361 comm->dead_handler(comm->dead_private_data);
363 tevent_req_error(req, err);
364 return;
367 state->nwritten = nwritten;
368 state->entry->qentry = NULL;
369 TALLOC_FREE(state->entry);
370 tevent_req_done(req);
373 bool comm_write_recv(struct tevent_req *req, int *perr)
375 struct comm_write_state *state = tevent_req_data(
376 req, struct comm_write_state);
377 int err;
379 if (tevent_req_is_unix_error(req, &err)) {
380 if (perr != NULL) {
381 *perr = err;
383 return false;
386 if (state->nwritten != state->buflen) {
387 *perr = EIO;
388 return false;
391 *perr = 0;
392 return true;
395 static void comm_fd_handler(struct tevent_context *ev,
396 struct tevent_fd *fde,
397 uint16_t flags, void *private_data)
399 struct comm_context *comm = talloc_get_type_abort(
400 private_data, struct comm_context);
402 if (flags & TEVENT_FD_READ) {
403 struct comm_read_state *read_state;
405 if (comm->read_req == NULL) {
406 /* This should never happen */
407 abort();
410 read_state = tevent_req_data(comm->read_req,
411 struct comm_read_state);
412 pkt_read_handler(ev, fde, flags, read_state->subreq);
415 if (flags & TEVENT_FD_WRITE) {
416 struct comm_write_state *write_state;
418 if (comm->write_req == NULL) {
419 TEVENT_FD_NOT_WRITEABLE(comm->fde);
420 return;
423 write_state = tevent_req_data(comm->write_req,
424 struct comm_write_state);
425 pkt_write_handler(ev, fde, flags, write_state->subreq);