2 Communication endpoint implementation
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
27 #include "lib/util/blocking.h"
28 #include "lib/util/tevent_unix.h"
31 #include "pkt_write.h"
35 * Communication endpoint around a socket
/* Size in bytes of the small_pkt buffer embedded in the endpoint context. */
38 #define SMALL_PKT_SIZE 1024
/*
 * Fields of the endpoint context (struct comm_context), as used by the
 * functions in this file.
 */
/* Called by comm_read_done() with each packet that has been read. */
42 comm_read_handler_fn read_handler
;
/* Passed back unchanged as the last argument of read_handler. */
43 void *read_private_data
;
/* Called by comm_read_failed() and comm_write_done() when the endpoint fails. */
44 comm_dead_handler_fn dead_handler
;
/* Passed back unchanged as the argument of dead_handler. */
45 void *dead_private_data
;
/* Buffer that comm_setup() hands to comm_read_send() as the read buffer. */
46 uint8_t small_pkt
[SMALL_PKT_SIZE
];
/*
 * read_req: the read request started by comm_setup(), cleared by
 * comm_read_failed().
 * write_req: the write request currently being written; set by
 * comm_write_trigger(), cleared by comm_write_done() and by
 * comm_write_entry_destructor().
 */
47 struct tevent_req
*read_req
, *write_req
;
/* fd event created by tevent_add_fd() in comm_setup(); served by comm_fd_handler(). */
48 struct tevent_fd
*fde
;
/* Queue ("comm write queue") to which comm_write_send() adds write requests. */
49 struct tevent_queue
*queue
;
/*
 * Forward declarations of the static helpers that comm_setup() refers to
 * before their definitions appear.
 */
/* fd event handler registered with tevent_add_fd() in comm_setup(). */
52 static void comm_fd_handler(struct tevent_context
*ev
,
53 struct tevent_fd
*fde
,
54 uint16_t flags
, void *private_data
);
/* Starts the packet read request; comm_setup() calls it with comm->small_pkt. */
55 static struct tevent_req
*comm_read_send(TALLOC_CTX
*mem_ctx
,
56 struct tevent_context
*ev
,
57 struct comm_context
*comm
,
58 uint8_t *buf
, size_t buflen
);
/* Callback that comm_setup() installs on the read request. */
59 static void comm_read_failed(struct tevent_req
*req
);
/*
 * Create a communication endpoint around the socket fd.
 *
 * Steps visible here: the socket is passed to set_blocking(fd, false), a
 * zeroed struct comm_context is allocated on mem_ctx, both handler and
 * private-data pairs are stored in it, a tevent queue named
 * "comm write queue" is created as a child of the context, an fd event is
 * registered for fd with comm_fd_handler(), and - only when read_handler is
 * non-NULL - the packet read request is started on comm->small_pkt.
 *
 * mem_ctx            talloc parent of the new context
 * ev                 tevent context used for the fd event and the read request
 * fd                 socket to wrap
 * read_handler       per-packet callback; when NULL no read request is started
 * read_private_data  passed back to read_handler
 * dead_handler       callback for a failed endpoint; compared against NULL below
 * dead_private_data  passed back to dead_handler
 * result             NOTE(review): presumably receives the new context on
 *                    success - the assignment is not visible in this view
 *
 * NOTE(review): the return values and the failure paths are not visible in
 * this view; confirm them against the complete definition.
 */
62 int comm_setup(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
, int fd
,
63 comm_read_handler_fn read_handler
, void *read_private_data
,
64 comm_dead_handler_fn dead_handler
, void *dead_private_data
,
65 struct comm_context
**result
)
67 struct comm_context
*comm
;
/*
 * NOTE(review): a NULL dead_handler is presumably rejected here; the body of
 * this branch is not visible. comm_write_done() calls dead_handler without a
 * NULL check.
 */
74 if (dead_handler
== NULL
) {
78 /* Socket queue relies on non-blocking sockets. */
79 ret
= set_blocking(fd
, false);
/* Zero-initialised, so read_req and write_req start out as NULL. */
84 comm
= talloc_zero(mem_ctx
, struct comm_context
);
90 comm
->read_handler
= read_handler
;
91 comm
->read_private_data
= read_private_data
;
92 comm
->dead_handler
= dead_handler
;
93 comm
->dead_private_data
= dead_private_data
;
/* The queue is a talloc child of comm; comm_write_send() adds entries to it. */
95 comm
->queue
= tevent_queue_create(comm
, "comm write queue");
96 if (comm
->queue
== NULL
) {
100 /* Set up to write packets */
/*
 * The event is registered for TEVENT_FD_READ only; write interest is switched
 * on in comm_write_trigger() with TEVENT_FD_WRITEABLE() and off again with
 * TEVENT_FD_NOT_WRITEABLE().
 */
101 comm
->fde
= tevent_add_fd(ev
, comm
, fd
, TEVENT_FD_READ
,
102 comm_fd_handler
, comm
);
103 if (comm
->fde
== NULL
) {
107 /* Set up to read packets */
108 if (read_handler
!= NULL
) {
109 struct tevent_req
*req
;
/* The request is allocated on comm and reads into comm->small_pkt. */
111 req
= comm_read_send(comm
, ev
, comm
, comm
->small_pkt
,
/*
 * comm_read_done() starts a new read after every packet, so this callback
 * appears to run only once reading has failed.
 */
117 tevent_req_set_callback(req
, comm_read_failed
, comm
);
/* Remembered so that comm_fd_handler() can find the active sub-request. */
118 comm
->read_req
= req
;
/*
 * State of the read request created by comm_read_send().
 * NOTE(review): the functions below also use state->buf and state->buflen;
 * their declarations are not visible in this view.
 */
134 struct comm_read_state
{
/* tevent context passed to pkt_read_send(). */
135 struct tevent_context
*ev
;
/* Endpoint the request reads from (comm->fd) and reports to (comm->read_handler). */
136 struct comm_context
*comm
;
/*
 * Current pkt_read sub-request; comm_fd_handler() passes it to
 * pkt_read_handler(). Set to NULL in comm_read_done() once collected.
 */
139 struct tevent_req
*subreq
;
/* Forward declarations of the two callbacks that comm_read_send() passes on. */
/* Callback given to pkt_read_send(); returns how many more bytes are needed. */
142 static ssize_t
comm_read_more(uint8_t *buf
, size_t buflen
, void *private_data
);
/* Completion callback of the pkt_read sub-request. */
143 static void comm_read_done(struct tevent_req
*subreq
);
/*
 * Start the read request for an endpoint.
 *
 * Creates a tevent request with a struct comm_read_state on mem_ctx, records
 * buflen in the state and starts the first pkt_read sub-request on comm->fd,
 * with comm_read_more() as the "how much more" callback and comm_read_done()
 * as the completion callback.
 *
 * mem_ctx  talloc parent of the request
 * ev       tevent context; also used for tevent_req_post() on failure
 * comm     endpoint to read from
 * buf      read buffer (comm_setup() passes comm->small_pkt)
 * buflen   stored in state->buflen and handed to pkt_read_send()
 *
 * NOTE(review): the initialisation of state->ev, state->comm and state->buf
 * and the final return are not visible in this view.
 */
145 static struct tevent_req
*comm_read_send(TALLOC_CTX
*mem_ctx
,
146 struct tevent_context
*ev
,
147 struct comm_context
*comm
,
148 uint8_t *buf
, size_t buflen
)
150 struct tevent_req
*req
, *subreq
;
151 struct comm_read_state
*state
;
153 req
= tevent_req_create(mem_ctx
, &state
, struct comm_read_state
);
161 state
->buflen
= buflen
;
/*
 * NOTE(review): sizeof(uint32_t) is presumably the initial number of bytes to
 * read, i.e. the length field that comm_read_more() decodes - confirm against
 * the pkt_read_send() documentation.
 */
163 subreq
= pkt_read_send(state
, state
->ev
, comm
->fd
, sizeof(uint32_t),
164 state
->buf
, state
->buflen
,
165 comm_read_more
, NULL
);
/* Allocation failure: the request is posted to ev and returned. */
166 if (tevent_req_nomem(subreq
, req
)) {
167 return tevent_req_post(req
, ev
);
/* Kept so that comm_fd_handler() can forward read events to this sub-request. */
169 state
->subreq
= subreq
;
171 tevent_req_set_callback(subreq
, comm_read_done
, req
);
/*
 * Callback handed to pkt_read_send(): given the buflen bytes received so far
 * in buf, report how many more bytes are needed to complete the packet.
 *
 * While fewer than sizeof(uint32_t) bytes have arrived, the remainder of the
 * leading length field is requested. After that the field is decoded in host
 * byte order and taken as the total packet length, counted from the start of
 * buf (the length field itself included); the return value is that length
 * minus buflen, so 0 means the packet is complete.
 *
 * A length field smaller than buflen makes the unsigned difference wrap,
 * which yields a negative result after conversion to ssize_t on typical
 * targets, exactly as before; how the caller treats that is not decided here.
 *
 * buf           bytes received so far
 * buflen        number of valid bytes in buf
 * private_data  unused
 */
static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data)
{
	uint32_t packet_len = 0;
	uint8_t *len_bytes = (uint8_t *)&packet_len;
	size_t i;

	if (buflen < sizeof(uint32_t)) {
		return sizeof(uint32_t) - buflen;
	}

	/*
	 * Copy the length field byte by byte instead of dereferencing
	 * (uint32_t *)buf: buf is a byte buffer with no alignment guarantee,
	 * and reading it through a uint32_t lvalue is undefined behaviour.
	 */
	for (i = 0; i < sizeof(packet_len); i++) {
		len_bytes[i] = buf[i];
	}

	return packet_len - buflen;
}
/*
 * Completion callback of the pkt_read sub-request.
 *
 * Collects the packet with pkt_read_recv(), forgets the finished sub-request,
 * passes the packet to comm->read_handler and then starts a new
 * pkt_read_send() on the same state buffer.
 *
 * subreq  the finished pkt_read request; its callback data is the enclosing
 *         read request
 *
 * NOTE(review): the local declarations (nread, buf, free_buf, err), the
 * condition guarding tevent_req_error() and any handling of free_buf are not
 * visible in this view.
 */
188 static void comm_read_done(struct tevent_req
*subreq
)
190 struct tevent_req
*req
= tevent_req_callback_data(
191 subreq
, struct tevent_req
);
192 struct comm_read_state
*state
= tevent_req_data(
193 req
, struct comm_read_state
);
194 struct comm_context
*comm
= state
->comm
;
/* Results arrive through buf, free_buf and err; nread is the return value. */
200 nread
= pkt_read_recv(subreq
, state
, &buf
, &free_buf
, &err
);
/* The sub-request has been collected; comm_fd_handler() must not use it again. */
202 state
->subreq
= NULL
;
/* Fails the enclosing read request with err. */
204 tevent_req_error(req
, err
);
/* Deliver the packet to the handler registered in comm_setup(). */
208 comm
->read_handler(buf
, nread
, comm
->read_private_data
);
/* Start reading the next packet with the same parameters as in comm_read_send(). */
214 subreq
= pkt_read_send(state
, state
->ev
, comm
->fd
, sizeof(uint32_t),
215 state
->buf
, state
->buflen
,
216 comm_read_more
, NULL
);
217 if (tevent_req_nomem(subreq
, req
)) {
220 state
->subreq
= subreq
;
222 tevent_req_set_callback(subreq
, comm_read_done
, req
);
/*
 * Collect the outcome of the read request.
 *
 * req   the read request
 * perr  error output; comm_read_failed() passes NULL
 *
 * NOTE(review): everything after the tevent_req_is_unix_error() test,
 * including how perr is used, is not visible in this view.
 */
225 static void comm_read_recv(struct tevent_req
*req
, int *perr
)
229 if (tevent_req_is_unix_error(req
, &err
)) {
/*
 * Callback installed on the read request by comm_setup().
 *
 * Collects the request with comm_read_recv() without asking for the error
 * code, clears comm->read_req and calls the dead handler when one is set.
 *
 * req  the read request; its callback data is the struct comm_context
 */
236 static void comm_read_failed(struct tevent_req
*req
)
238 struct comm_context
*comm
= tevent_req_callback_data(
239 req
, struct comm_context
);
/* NULL: the error code is not needed here. */
241 comm_read_recv(req
, NULL
);
/* comm_fd_handler() checks read_req against NULL before using it. */
243 comm
->read_req
= NULL
;
244 if (comm
->dead_handler
!= NULL
) {
245 comm
->dead_handler(comm
->dead_private_data
);
/*
 * Bookkeeping object for one queued write. It is allocated as a talloc child
 * of the write state in comm_write_send() and carries
 * comm_write_entry_destructor() as its destructor.
 */
254 struct comm_write_entry
{
/* Endpoint the write belongs to; read by the destructor. */
255 struct comm_context
*comm
;
/*
 * Entry returned by tevent_queue_add_entry(); freed by the destructor.
 * comm_write_done() sets it to NULL before freeing the entry.
 */
256 struct tevent_queue_entry
*qentry
;
/* Write request; the destructor compares it with comm->write_req. */
257 struct tevent_req
*req
;
/*
 * State of a write request created by comm_write_send().
 * NOTE(review): comm_write_trigger() also uses state->buf; its declaration is
 * not visible in this view.
 */
260 struct comm_write_state
{
/* tevent context passed to pkt_write_send(). */
261 struct tevent_context
*ev
;
/* Endpoint being written to. */
262 struct comm_context
*comm
;
/* Queue bookkeeping for this request; freed in comm_write_done(). */
263 struct comm_write_entry
*entry
;
/* Active pkt_write sub-request; comm_fd_handler() passes it to pkt_write_handler(). */
264 struct tevent_req
*subreq
;
/*
 * buflen: number of bytes requested by the caller.
 * nwritten: value returned by pkt_write_recv(); comm_write_recv() compares
 * the two.
 */
266 size_t buflen
, nwritten
;
/* Forward declarations of the write-side helpers used by comm_write_send(). */
/* talloc destructor installed on each struct comm_write_entry. */
269 static int comm_write_entry_destructor(struct comm_write_entry
*entry
);
/* Trigger function passed to tevent_queue_add_entry(). */
270 static void comm_write_trigger(struct tevent_req
*req
, void *private_data
);
/* Completion callback of the pkt_write sub-request. */
271 static void comm_write_done(struct tevent_req
*subreq
);
/*
 * Queue a write of buflen bytes from buf on an endpoint.
 *
 * Creates a tevent request with a struct comm_write_state on mem_ctx, records
 * buflen, allocates a struct comm_write_entry as a talloc child of the state
 * and adds the request to comm->queue with comm_write_trigger() as the
 * trigger. The entry is stored in the state and given
 * comm_write_entry_destructor() as its destructor.
 *
 * mem_ctx  talloc parent of the request
 * ev       tevent context for the queue entry and for tevent_req_post()
 * comm     endpoint to write to
 * buf      data to write
 * buflen   number of bytes to write
 *
 * Collect the result with comm_write_recv().
 *
 * NOTE(review): the initialisation of the remaining state and entry fields
 * and the final return are not visible in this view.
 */
273 struct tevent_req
*comm_write_send(TALLOC_CTX
*mem_ctx
,
274 struct tevent_context
*ev
,
275 struct comm_context
*comm
,
276 uint8_t *buf
, size_t buflen
)
278 struct tevent_req
*req
;
279 struct comm_write_state
*state
;
280 struct comm_write_entry
*entry
;
282 req
= tevent_req_create(mem_ctx
, &state
, struct comm_write_state
);
290 state
->buflen
= buflen
;
/* Child of state, so it is released together with the request state. */
292 entry
= talloc_zero(state
, struct comm_write_entry
);
293 if (tevent_req_nomem(entry
, req
)) {
294 return tevent_req_post(req
, ev
);
/* The write itself is started later by comm_write_trigger(). */
299 entry
->qentry
= tevent_queue_add_entry(comm
->queue
, ev
, req
,
300 comm_write_trigger
, NULL
);
301 if (tevent_req_nomem(entry
->qentry
, req
)) {
302 return tevent_req_post(req
, ev
);
305 state
->entry
= entry
;
/* The destructor is installed only after the queue entry exists. */
306 talloc_set_destructor(entry
, comm_write_entry_destructor
);
/*
 * talloc destructor of a struct comm_write_entry.
 *
 * If the entry's request is the one currently recorded in comm->write_req,
 * that record is cleared and write events on the fd are switched off. The
 * queue entry is then freed.
 *
 * NOTE(review): the return statement is not visible in this view.
 */
311 static int comm_write_entry_destructor(struct comm_write_entry
*entry
)
313 struct comm_context
*comm
= entry
->comm
;
/* Only the request that is currently being written owns the fd's write interest. */
315 if (comm
->write_req
== entry
->req
) {
316 comm
->write_req
= NULL
;
317 TEVENT_FD_NOT_WRITEABLE(comm
->fde
);
/* TALLOC_FREE is given entry->qentry, which comm_write_done() may already have set to NULL. */
320 TALLOC_FREE(entry
->qentry
);
/*
 * Trigger function registered with tevent_queue_add_entry() in
 * comm_write_send().
 *
 * Records req as the endpoint's current write request, starts the pkt_write
 * sub-request for state->buf and state->buflen, installs comm_write_done() as
 * its callback and switches on write events for the fd.
 *
 * req           the queued write request
 * private_data  NULL (as passed by comm_write_send()); not used in the
 *               visible lines
 *
 * NOTE(review): the body of the allocation-failure branch is not visible.
 */
324 static void comm_write_trigger(struct tevent_req
*req
, void *private_data
)
326 struct comm_write_state
*state
= tevent_req_data(
327 req
, struct comm_write_state
);
328 struct comm_context
*comm
= state
->comm
;
329 struct tevent_req
*subreq
;
/* comm_fd_handler() looks up the active write through comm->write_req. */
331 comm
->write_req
= req
;
333 subreq
= pkt_write_send(state
, state
->ev
, comm
->fd
,
334 state
->buf
, state
->buflen
);
335 if (tevent_req_nomem(subreq
, req
)) {
339 state
->subreq
= subreq
;
340 tevent_req_set_callback(subreq
, comm_write_done
, req
);
/* Write events are wanted only while a write is in progress. */
341 TEVENT_FD_WRITEABLE(comm
->fde
);
/*
 * Completion callback of the pkt_write sub-request.
 *
 * Switches off write events, collects the result with pkt_write_recv() and
 * clears both state->subreq and comm->write_req. When pkt_write_recv()
 * returned -1, the visible lines call the dead handler and fail the request
 * with err. Otherwise the byte count is stored, the queue bookkeeping entry
 * is released and the request is marked done.
 *
 * subreq  the finished pkt_write request; its callback data is the enclosing
 *         write request
 *
 * NOTE(review): the local declarations (nwritten, err) and the conditions
 * that choose between the dead handler and tevent_req_error() are not visible
 * in this view.
 */
344 static void comm_write_done(struct tevent_req
*subreq
)
346 struct tevent_req
*req
= tevent_req_callback_data(
347 subreq
, struct tevent_req
);
348 struct comm_write_state
*state
= tevent_req_data(
349 req
, struct comm_write_state
);
350 struct comm_context
*comm
= state
->comm
;
354 TEVENT_FD_NOT_WRITEABLE(comm
->fde
);
355 nwritten
= pkt_write_recv(subreq
, &err
);
357 state
->subreq
= NULL
;
358 comm
->write_req
= NULL
;
359 if (nwritten
== -1) {
/* Called without a NULL check, unlike the call in comm_read_failed(). */
361 comm
->dead_handler(comm
->dead_private_data
);
363 tevent_req_error(req
, err
);
/* Success path: remember how much was written for comm_write_recv(). */
367 state
->nwritten
= nwritten
;
/*
 * qentry is set to NULL before the entry is freed, so the destructor's
 * TALLOC_FREE(entry->qentry) is given NULL.
 */
368 state
->entry
->qentry
= NULL
;
369 TALLOC_FREE(state
->entry
);
370 tevent_req_done(req
);
/*
 * Collect the result of a request started with comm_write_send().
 *
 * Checks the request for a unix error first and then compares the number of
 * bytes written with the number requested.
 *
 * req   the write request
 * perr  error output
 *
 * NOTE(review): the bodies of both branches, the use of perr and the returned
 * values are not visible in this view; presumably true means the whole buffer
 * was written - confirm.
 */
373 bool comm_write_recv(struct tevent_req
*req
, int *perr
)
375 struct comm_write_state
*state
= tevent_req_data(
376 req
, struct comm_write_state
);
379 if (tevent_req_is_unix_error(req
, &err
)) {
/* A write that did not cover the whole buffer is singled out here. */
386 if (state
->nwritten
!= state
->buflen
) {
/*
 * fd event handler registered with tevent_add_fd() in comm_setup().
 *
 * Forwards read events to pkt_read_handler() and write events to
 * pkt_write_handler(), each with the sub-request stored in the state of the
 * endpoint's current read or write request.
 *
 * ev            tevent context
 * fde           the fd event that fired
 * flags         TEVENT_FD_READ and/or TEVENT_FD_WRITE
 * private_data  the struct comm_context; the type is checked by
 *               talloc_get_type_abort()
 *
 * NOTE(review): the bodies of the two NULL-check branches are only partly
 * visible in this view.
 */
395 static void comm_fd_handler(struct tevent_context
*ev
,
396 struct tevent_fd
*fde
,
397 uint16_t flags
, void *private_data
)
399 struct comm_context
*comm
= talloc_get_type_abort(
400 private_data
, struct comm_context
);
402 if (flags
& TEVENT_FD_READ
) {
403 struct comm_read_state
*read_state
;
405 if (comm
->read_req
== NULL
) {
406 /* This should never happen */
/* Hand the event to the pkt_read sub-request of the active read request. */
410 read_state
= tevent_req_data(comm
->read_req
,
411 struct comm_read_state
);
412 pkt_read_handler(ev
, fde
, flags
, read_state
->subreq
);
415 if (flags
& TEVENT_FD_WRITE
) {
416 struct comm_write_state
*write_state
;
/* No write in progress: switch off write events for the fd. */
418 if (comm
->write_req
== NULL
) {
419 TEVENT_FD_NOT_WRITEABLE(comm
->fde
);
/* Hand the event to the pkt_write sub-request of the active write request. */
423 write_state
= tevent_req_data(comm
->write_req
,
424 struct comm_write_state
);
425 pkt_write_handler(ev
, fde
, flags
, write_state
->subreq
);