3 Utility functions to read/write blobs of data from a file descriptor
4 and handle the case where we might need multiple read/writes to get all the data.
7 Copyright (C) Andrew Tridgell 2006
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, see <http://www.gnu.org/licenses/>.
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../include/ctdb_client.h"
32 #define QUEUE_BUFFER_SIZE (16*1024)
34 /* structures for packet queueing - see common/ctdb_io.c */
/*
 * Node of the outgoing packet list. Packets are added with
 * DLIST_ADD_END and removed with DLIST_REMOVE elsewhere in this file;
 * presumably those macros link through next/prev - TODO confirm
 * against lib/util/dlinklist.h.
 */
42 struct ctdb_queue_pkt
{
43 struct ctdb_queue_pkt
*next
, *prev
;
/*
 * NOTE(review): the members below are the ones this file dereferences
 * through 'struct ctdb_queue *' (queue->ctdb, queue->im, queue->buffer,
 * queue->out_queue, ...), so they appear to belong to struct ctdb_queue;
 * its opening line is not visible here - confirm.
 */
/* owning context; its event context (ctdb->ev) is used for all events of the queue */
51 struct ctdb_context
*ctdb
;
/* immediate event handed to tevent_schedule_immediate for deferred work */
52 struct tevent_immediate
*im
;
53 struct ctdb_buffer buffer
; /* input buffer */
/* packets accepted by ctdb_queue_send that still have to be written to the fd */
54 struct ctdb_queue_pkt
*out_queue
, *out_queue_tail
;
/* incremented when a packet is appended to out_queue, decremented when one is removed */
55 uint32_t out_queue_length
;
/* called with (data, size, private_data) per packet, or with (NULL, 0, private_data) on failure */
60 ctdb_queue_cb_fn_t callback
;
/*
  return the queue's out_queue_length counter, i.e. the number of
  packets that have been appended to the outgoing list and not yet
  removed from it
*/
67 int ctdb_queue_length(struct ctdb_queue
*queue
)
69 return queue
->out_queue_length
;
/* forward declaration; queue_process is defined further down */
72 static void queue_process(struct ctdb_queue
*queue
);
/*
  handler that queue_process registers with tevent_schedule_immediate
  when data is left in the input buffer; private_data is the queue
  that was passed when the event was scheduled.
  NOTE(review): presumably this just calls queue_process(queue) again;
  the call is not visible here - confirm.
*/
74 static void queue_process_event(struct tevent_context
*ev
, struct tevent_immediate
*im
,
/* recover the typed queue pointer from the opaque event argument */
77 struct ctdb_queue
*queue
= talloc_get_type(private_data
, struct ctdb_queue
);
83 * This function is used to process data in queue buffer.
85 * Queue callback function can end up freeing the queue, there should not be a
86 * loop processing packets from queue buffer. Instead set up a timed event for
87 * immediate run to process remaining packets from buffer.
/*
 * Takes one packet off the front of queue->buffer and passes it to
 * queue->callback.
 *
 * The first 32 bits of the buffer are interpreted as the size of the
 * whole packet: that many bytes are copied out of the buffer and then
 * removed from it, so the size includes the length word itself.
 */
89 static void queue_process(struct ctdb_queue
*queue
)
/* fewer bytes buffered than one length word: the size cannot be read yet */
94 if (queue
->buffer
.length
< sizeof(pkt_size
)) {
/*
 * The length word is read in place through a cast, i.e. in host byte
 * order. NOTE(review): this assumes buffer.data is suitably aligned
 * for uint32_t - confirm.
 */
98 pkt_size
= *(uint32_t *)queue
->buffer
.data
;
100 DEBUG(DEBUG_CRIT
, ("Invalid packet of length 0\n"));
/*
 * The packet is not completely buffered yet. If it is larger than the
 * default buffer size, record the required size in buffer.extend;
 * queue_io_read reallocates the buffer to that size before reading.
 */
104 if (queue
->buffer
.length
< pkt_size
) {
105 if (pkt_size
> QUEUE_BUFFER_SIZE
) {
106 queue
->buffer
.extend
= pkt_size
;
111 /* Extract complete packet */
/* the copy is allocated as a talloc child of the queue */
112 data
= talloc_size(queue
, pkt_size
);
114 DEBUG(DEBUG_ERR
, ("read error alloc failed for %u\n", pkt_size
));
117 memcpy(data
, queue
->buffer
.data
, pkt_size
);
119 /* Shift packet out from buffer */
/* memmove because source and destination overlap inside the same buffer */
120 if (queue
->buffer
.length
> pkt_size
) {
121 memmove(queue
->buffer
.data
,
122 queue
->buffer
.data
+ pkt_size
,
123 queue
->buffer
.length
- pkt_size
);
125 queue
->buffer
.length
-= pkt_size
;
127 if (queue
->buffer
.length
> 0) {
128 /* There is more data to be processed, schedule an event */
129 tevent_schedule_immediate(queue
->im
, queue
->ctdb
->ev
,
130 queue_process_event
, queue
);
/*
 * A buffer that was grown beyond the default size is released and its
 * size reset; queue_io_read allocates a default-sized buffer again
 * when it finds buffer.data == NULL.
 */
132 if (queue
->buffer
.size
> QUEUE_BUFFER_SIZE
) {
133 TALLOC_FREE(queue
->buffer
.data
);
134 queue
->buffer
.size
= 0;
138 /* It is the responsibility of the callback to free 'data' */
139 queue
->callback(data
, pkt_size
, queue
->private_data
);
/* failure notification: the callback is invoked with no data and length 0 */
143 queue
->callback(NULL
, 0, queue
->private_data
);
149 called when an incoming connection is readable
150 This function MUST be safe for reentry via the queue callback!
/*
  Reads what is available on queue->fd into the input buffer and then
  calls queue_process.

  Steps visible below: ask the kernel (FIONREAD) how many bytes can be
  read, make sure an input buffer exists (allocating QUEUE_BUFFER_SIZE
  bytes, or reallocating to buffer.extend when queue_process asked for
  a larger one), read() the data in behind what is already buffered,
  and call queue_process. The failure notification is a callback with
  (NULL, 0).
*/
152 static void queue_io_read(struct ctdb_queue
*queue
)
159 /* check how much data is available on the socket for immediately
160 guaranteed nonblocking access.
161 as long as we are careful never to try to read more than this
162 we know all reads will be successful and will neither block
163 nor fail with a "data not available right now" error
165 if (ioctl(queue
->fd
, FIONREAD
, &num_ready
) != 0) {
168 if (num_ready
== 0) {
169 /* the descriptor has been closed */
173 if (queue
->buffer
.data
== NULL
) {
174 /* starting fresh, allocate buf to read data */
175 queue
->buffer
.data
= talloc_size(queue
, QUEUE_BUFFER_SIZE
);
176 if (queue
->buffer
.data
== NULL
) {
/*
 * NOTE(review): the message prints num_ready although the failed
 * allocation was for QUEUE_BUFFER_SIZE bytes. num_ready also looks
 * like an int (its address is passed to ioctl(FIONREAD)), which would
 * not match the %u conversion - confirm.
 */
177 DEBUG(DEBUG_ERR
, ("read error alloc failed for %u\n", num_ready
));
180 queue
->buffer
.size
= QUEUE_BUFFER_SIZE
;
181 } else if (queue
->buffer
.extend
> 0) {
182 /* extending buffer */
/* the result goes into a temporary and is only stored back into buffer.data further down */
183 data
= talloc_realloc_size(queue
, queue
->buffer
.data
, queue
->buffer
.extend
);
185 DEBUG(DEBUG_ERR
, ("read error realloc failed for %u\n", queue
->buffer
.extend
));
188 queue
->buffer
.data
= data
;
189 queue
->buffer
.size
= queue
->buffer
.extend
;
/* request satisfied; clear it so the buffer is not reallocated again on the next read */
190 queue
->buffer
.extend
= 0;
/* navail is the free space left behind the data that is already buffered */
193 navail
= queue
->buffer
.size
- queue
->buffer
.length
;
/*
 * NOTE(review): presumably num_ready is limited to navail here so the
 * read below cannot run past the end of the buffer; the statement
 * itself is not visible - confirm.
 */
194 if (num_ready
> navail
) {
/* append behind the bytes that are already in the buffer */
199 nread
= read(queue
->fd
, queue
->buffer
.data
+ queue
->buffer
.length
, num_ready
);
201 DEBUG(DEBUG_ERR
, ("read error nread=%d\n", (int)nread
));
204 queue
->buffer
.length
+= nread
;
/* hand buffered data to the packet extractor */
207 queue_process(queue
);
/* failure notification: the callback is invoked with no data and length 0 */
211 queue
->callback(NULL
, 0, queue
->private_data
);
215 /* used when an event triggers a dead queue */
/*
  immediate-event handler: private_data is the queue; the queue's
  callback is invoked with (NULL, 0, private_data), the same call the
  read path uses to report a failure.
  NOTE(review): presumably this is the handler scheduled by the write
  error paths in queue_io_write and ctdb_queue_send; the handler
  argument of those tevent_schedule_immediate calls is not visible
  here - confirm.
*/
216 static void queue_dead(struct event_context
*ev
, struct tevent_immediate
*im
,
219 struct ctdb_queue
*queue
= talloc_get_type(private_data
, struct ctdb_queue
);
220 queue
->callback(NULL
, 0, queue
->private_data
);
225 called when an incoming connection is writeable
/*
  Writes queued packets to queue->fd, always working on the head of
  out_queue, for as long as the list is not empty.
*/
227 static void queue_io_write(struct ctdb_queue
*queue
)
229 while (queue
->out_queue
) {
230 struct ctdb_queue_pkt
*pkt
= queue
->out_queue
;
/*
 * With CTDB_FLAG_TORTURE set only a single byte is written per call
 * (presumably to exercise the partial-write handling - confirm);
 * otherwise everything that is left of the packet is written.
 */
232 if (queue
->ctdb
->flags
& CTDB_FLAG_TORTURE
) {
233 n
= write(queue
->fd
, pkt
->data
, 1);
235 n
= write(queue
->fd
, pkt
->data
, pkt
->length
);
/* write error other than "would block" */
238 if (n
== -1 && errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
/* length differs from full_length once part of the packet has gone out */
239 if (pkt
->length
!= pkt
->full_length
) {
240 /* partial packet sent - we have to drop it */
241 DLIST_REMOVE(queue
->out_queue
, pkt
);
242 queue
->out_queue_length
--;
/* stop receiving events for this fd */
245 talloc_free(queue
->fde
);
/* the failure is reported from an immediate event rather than from here */
248 tevent_schedule_immediate(queue
->im
, queue
->ctdb
->ev
,
/* short write: only part of the remaining packet data was accepted */
254 if (n
!= pkt
->length
) {
/* take the packet off the list and keep the counter in step */
260 DLIST_REMOVE(queue
->out_queue
, pkt
);
261 queue
->out_queue_length
--;
/* stop asking for write-ready events on the fd */
265 EVENT_FD_NOT_WRITEABLE(queue
->fde
);
269 called when an incoming connection is readable or writeable
/*
  fd event handler registered by ctdb_queue_set_fd; private_data is
  the queue. Calls queue_io_read when EVENT_FD_READ is set in flags.
  NOTE(review): queue_io_write is presumably the else branch, i.e. a
  single invocation services either the read side or the write side;
  the branch keyword is not visible here - confirm.
*/
271 static void queue_io_handler(struct event_context
*ev
, struct fd_event
*fde
,
272 uint16_t flags
, void *private_data
)
274 struct ctdb_queue
*queue
= talloc_get_type(private_data
, struct ctdb_queue
);
276 if (flags
& EVENT_FD_READ
) {
277 queue_io_read(queue
);
279 queue_io_write(queue
);
285 queue a packet for sending
/*
  Sends a packet on the queue.

  queue:  queue to send on
  data:   packet bytes, beginning with a struct ctdb_req_header; the
          buffer is written to (length word and padding) when the
          queue has an alignment
  length: number of bytes in data

  If nothing is queued yet and the queue has an fd, an immediate
  write() is attempted first (skipped under CTDB_FLAG_TORTURE). The
  packet data is otherwise copied into a newly allocated
  ctdb_queue_pkt that is appended to queue->out_queue.

  The only return visible here is 0 when no data is left to queue.
*/
287 int ctdb_queue_send(struct ctdb_queue
*queue
, uint8_t *data
, uint32_t length
)
/* the header view is only used further down to build verbose talloc names */
289 struct ctdb_req_header
*hdr
= (struct ctdb_req_header
*)data
;
290 struct ctdb_queue_pkt
*pkt
;
291 uint32_t length2
, full_length
;
293 if (queue
->alignment
) {
294 /* enforce the length and alignment rules from the tcp packet allocator */
/*
 * Round length up to a multiple of the alignment and store the result
 * in the first 32 bits of the packet. NOTE(review): the mask
 * arithmetic is only a correct round-up when alignment is a power of
 * two - confirm that callers guarantee this.
 */
295 length2
= (length
+(queue
->alignment
-1)) & ~(queue
->alignment
-1);
296 *(uint32_t *)data
= length2
;
/*
 * Zero the padding bytes. NOTE(review): this writes past 'length' in
 * the caller's buffer, so the buffer must have room for length2
 * bytes - presumably guaranteed by the packet allocator; confirm.
 */
301 if (length2
!= length
) {
302 memset(data
+length
, 0, length2
-length
);
/* remembered in the packet so queue_io_write can tell whether part of it was already sent */
305 full_length
= length2
;
307 /* if the queue is empty then try an immediate write, avoiding
308 queue overhead. This relies on non-blocking sockets */
309 if (queue
->out_queue
== NULL
&& queue
->fd
!= -1 &&
310 !(queue
->ctdb
->flags
& CTDB_FLAG_TORTURE
)) {
311 ssize_t n
= write(queue
->fd
, data
, length2
);
/* write error other than "would block" */
312 if (n
== -1 && errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
313 talloc_free(queue
->fde
);
316 tevent_schedule_immediate(queue
->im
, queue
->ctdb
->ev
,
318 /* yes, we report success, as the dead node is
319 handled via a separate event */
/* nothing left to queue */
326 if (length2
== 0) return 0;
/* allocation size: the struct up to its buf member plus length2 payload bytes, parented to the queue */
330 queue
, offsetof(struct ctdb_queue_pkt
, buf
) + length2
);
331 CTDB_NO_MEMORY(queue
->ctdb
, pkt
);
332 talloc_set_name_const(pkt
, "struct ctdb_queue_pkt");
/* the payload is stored in the packet's own buf member */
334 pkt
->data
= pkt
->buf
;
335 memcpy(pkt
->data
, data
, length2
);
337 pkt
->length
= length2
;
338 pkt
->full_length
= full_length
;
/* the list is about to become non-empty: ask for write-ready events on the fd */
340 if (queue
->out_queue
== NULL
&& queue
->fd
!= -1) {
341 EVENT_FD_WRITEABLE(queue
->fde
);
344 DLIST_ADD_END(queue
->out_queue
, pkt
, NULL
);
346 queue
->out_queue_length
++;
/*
 * With the verbose_memory_names tunable set, the generic talloc name
 * given above is replaced by one that describes the packet (queue
 * name, operation and per-operation fields).
 */
348 if (queue
->ctdb
->tunable
.verbose_memory_names
!= 0) {
349 switch (hdr
->operation
) {
350 case CTDB_REQ_CONTROL
: {
351 struct ctdb_req_control
*c
= (struct ctdb_req_control
*)hdr
;
352 talloc_set_name(pkt
, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
353 queue
->name
, (unsigned)c
->opcode
, (unsigned long long)c
->srvid
, (unsigned)c
->datalen
);
356 case CTDB_REQ_MESSAGE
: {
357 struct ctdb_req_message
*m
= (struct ctdb_req_message
*)hdr
;
358 talloc_set_name(pkt
, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
359 queue
->name
, (unsigned long long)m
->srvid
, (unsigned)m
->datalen
);
/* name used for packets that got no operation-specific name above */
363 talloc_set_name(pkt
, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
364 queue
->name
, (unsigned)hdr
->operation
, (unsigned)hdr
->length
,
365 (unsigned)hdr
->srcnode
, (unsigned)hdr
->destnode
);
375 setup the fd used by the queue
/*
  Attaches the queue to file descriptor fd.

  Any fd event currently held in queue->fde is freed. A new fd event
  for EVENT_FD_READ is then registered on the context's event loop
  with queue_io_handler as handler and the queue as both talloc parent
  and private data.
*/
377 int ctdb_queue_set_fd(struct ctdb_queue
*queue
, int fd
)
380 talloc_free(queue
->fde
);
384 queue
->fde
= event_add_fd(queue
->ctdb
->ev
, queue
, fd
, EVENT_FD_READ
,
385 queue_io_handler
, queue
);
/* registering the fd event failed */
386 if (queue
->fde
== NULL
) {
/* per the tevent API name: the fd is closed when the fd event is freed - TODO confirm */
389 tevent_fd_set_auto_close(queue
->fde
);
/* packets are already waiting to be sent: also ask for write-ready events */
391 if (queue
->out_queue
) {
392 EVENT_FD_WRITEABLE(queue
->fde
);
399 /* If someone sets up this pointer, they want to know if the queue is freed */
/*
  talloc destructor installed on the queue by ctdb_queue_setup.
  Frees the input buffer, resets its length and size, and sets
  *queue->destroyed to true when that pointer is not NULL.
*/
400 static int queue_destructor(struct ctdb_queue
*queue
)
402 TALLOC_FREE(queue
->buffer
.data
);
403 queue
->buffer
.length
= 0;
404 queue
->buffer
.size
= 0;
/* only signal when someone registered a flag to be notified through */
405 if (queue
->destroyed
!= NULL
)
406 *queue
->destroyed
= true;
411 setup a packet queue on a socket
/*
  Creates a packet queue.

  ctdb:         context the queue belongs to; also passed to the
                out-of-memory checks
  mem_ctx:      talloc parent of the new queue
  fd:           file descriptor handed to ctdb_queue_set_fd
  alignment:    stored in queue->alignment; ctdb_queue_send pads
                packet lengths to it when it is non-zero
  callback:     stored in queue->callback; called for every received
                packet and with (NULL, 0) on failure
  private_data: passed back as last argument of callback
  fmt, ...:     printf-style format producing the queue name
*/
413 struct ctdb_queue
*ctdb_queue_setup(struct ctdb_context
*ctdb
,
414 TALLOC_CTX
*mem_ctx
, int fd
, int alignment
,
415 ctdb_queue_cb_fn_t callback
,
416 void *private_data
, const char *fmt
, ...)
418 struct ctdb_queue
*queue
;
/* zero-initialised, so members that are not assigned below start out as 0/NULL */
421 queue
= talloc_zero(mem_ctx
, struct ctdb_queue
);
422 CTDB_NO_MEMORY_NULL(ctdb
, queue
);
/*
 * NOTE(review): the name is allocated on mem_ctx rather than on the
 * queue, so it is not a talloc child of the queue and is not released
 * when the queue is freed - confirm this is intended.
 */
424 queue
->name
= talloc_vasprintf(mem_ctx
, fmt
, ap
);
/*
 * NOTE(review): if CTDB_NO_MEMORY_NULL returns from the function, as
 * its name suggests, the queue allocated above stays attached to
 * mem_ctx on this path and on the next check - confirm.
 */
426 CTDB_NO_MEMORY_NULL(ctdb
, queue
->name
);
/* immediate event, a talloc child of the queue, used for deferred processing */
428 queue
->im
= tevent_create_immediate(queue
);
429 CTDB_NO_MEMORY_NULL(ctdb
, queue
->im
);
433 queue
->alignment
= alignment
;
434 queue
->private_data
= private_data
;
435 queue
->callback
= callback
;
/* registering the fd event failed */
437 if (ctdb_queue_set_fd(queue
, fd
) != 0) {
/* installed after the fd setup above has been attempted */
442 talloc_set_destructor(queue
, queue_destructor
);