/*
   ctdb database library
   Utility functions to read/write blobs of data from a file descriptor
   and handle the case where we might need multiple read/writes to get all the
   data.

   Copyright (C) Andrew Tridgell  2006

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
#include "tdb.h"
#include "lib/util/dlinklist.h"
#include "system/network.h"
#include "system/filesys.h"
#include "../include/ctdb_private.h"
#include "../include/ctdb_client.h"
#include <stdarg.h>

#define QUEUE_BUFFER_SIZE (16*1024)
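/* Incoming data is treated as a stream of length-prefixed packets: the
   first four bytes of each packet hold its total size (including the
   prefix itself), which is what queue_process() reads below.  Packets
   larger than QUEUE_BUFFER_SIZE are handled via the buffer "extend"
   logic in queue_io_read(). */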
/* structures for packet queueing - see common/ctdb_io.c */
struct ctdb_buffer {
        uint8_t *data;
        uint32_t length;
        uint32_t size;
        uint32_t extend;
};
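/* In struct ctdb_buffer, "length" is the number of bytes currently held,
   "size" is the allocated capacity, and "extend" is a requested capacity,
   set when a packet header announces more data than the buffer can hold;
   queue_io_read() grows the allocation to "extend" on its next pass. */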
struct ctdb_queue_pkt {
        struct ctdb_queue_pkt *next, *prev;
        uint8_t *data;
        uint32_t length;
        uint32_t full_length;
        uint8_t buf[];
};
struct ctdb_queue {
        struct ctdb_context *ctdb;
        struct tevent_immediate *im;
        struct ctdb_buffer buffer; /* input buffer */
        struct ctdb_queue_pkt *out_queue, *out_queue_tail;
        uint32_t out_queue_length;
        struct fd_event *fde;
        int fd;
        size_t alignment;
        void *private_data;
        ctdb_queue_cb_fn_t callback;
        bool *destroyed;
        const char *name;
};
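/* out_queue holds not-yet-written packets in order; out_queue_length
   tracks its length for ctdb_queue_length().  "destroyed", if a caller
   points it at a bool, is set to true by the queue destructor so the
   caller can tell when a callback has freed the queue. */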
int ctdb_queue_length(struct ctdb_queue *queue)
{
        return queue->out_queue_length;
}

static void queue_process(struct ctdb_queue *queue);

static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
                                void *private_data)
{
        struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);

        queue_process(queue);
}
/*
 * This function is used to process data in the queue buffer.
 *
 * The queue callback function can end up freeing the queue, so there must
 * not be a loop processing packets from the queue buffer.  Instead, set up
 * an immediate event to process the remaining packets from the buffer.
 */
static void queue_process(struct ctdb_queue *queue)
{
        uint32_t pkt_size;
        uint8_t *data;

        if (queue->buffer.length < sizeof(pkt_size)) {
                return;
        }

        pkt_size = *(uint32_t *)queue->buffer.data;
        if (pkt_size == 0) {
                DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
                goto failed;
        }

        if (queue->buffer.length < pkt_size) {
                if (pkt_size > QUEUE_BUFFER_SIZE) {
                        queue->buffer.extend = pkt_size;
                }
                return;
        }

        /* Extract complete packet */
        data = talloc_size(queue, pkt_size);
        if (data == NULL) {
                DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", pkt_size));
                return;
        }
        memcpy(data, queue->buffer.data, pkt_size);

        /* Shift packet out from buffer */
        if (queue->buffer.length > pkt_size) {
                memmove(queue->buffer.data,
                        queue->buffer.data + pkt_size,
                        queue->buffer.length - pkt_size);
        }
        queue->buffer.length -= pkt_size;

        if (queue->buffer.length > 0) {
                /* There is more data to be processed, schedule an event */
                tevent_schedule_immediate(queue->im, queue->ctdb->ev,
                                          queue_process_event, queue);
        } else {
                if (queue->buffer.size > QUEUE_BUFFER_SIZE) {
                        TALLOC_FREE(queue->buffer.data);
                        queue->buffer.size = 0;
                }
        }

        /* It is the responsibility of the callback to free 'data' */
        queue->callback(data, pkt_size, queue->private_data);
        return;

failed:
        queue->callback(NULL, 0, queue->private_data);
}
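/* Worked example of the large-packet path: if a 64KiB packet arrives,
   queue_process() sees pkt_size > QUEUE_BUFFER_SIZE with only part of
   the data buffered, records the needed size in buffer.extend and
   returns; queue_io_read() then reallocates the buffer to that size and
   reads the rest.  Once the packet is consumed, the oversized buffer is
   freed, so the next read starts again at QUEUE_BUFFER_SIZE. */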
/*
  called when an incoming connection is readable
  This function MUST be safe for reentry via the queue callback!
*/
static void queue_io_read(struct ctdb_queue *queue)
{
        int num_ready = 0;
        ssize_t nread;
        uint8_t *data;
        int navail;

        /* check how much data is available on the socket for immediately
           guaranteed nonblocking access.
           as long as we are careful never to try to read more than this
           we know all reads will be successful and will neither block
           nor fail with a "data not available right now" error
        */
        if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
                return;
        }
        if (num_ready == 0) {
                /* the descriptor has been closed */
                goto failed;
        }

        if (queue->buffer.data == NULL) {
                /* starting fresh, allocate buf to read data */
                queue->buffer.data = talloc_size(queue, QUEUE_BUFFER_SIZE);
                if (queue->buffer.data == NULL) {
                        DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
                        goto failed;
                }
                queue->buffer.size = QUEUE_BUFFER_SIZE;
        } else if (queue->buffer.extend > 0) {
                /* extending buffer */
                data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
                if (data == NULL) {
                        DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
                        goto failed;
                }
                queue->buffer.data = data;
                queue->buffer.size = queue->buffer.extend;
                queue->buffer.extend = 0;
        }

        navail = queue->buffer.size - queue->buffer.length;
        if (num_ready > navail) {
                num_ready = navail;
        }

        if (num_ready > 0) {
                nread = sys_read(queue->fd,
                                 queue->buffer.data + queue->buffer.length,
                                 num_ready);
                if (nread <= 0) {
                        DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
                        goto failed;
                }
                queue->buffer.length += nread;
        }

        queue_process(queue);
        return;

failed:
        queue->callback(NULL, 0, queue->private_data);
}
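/* Note on the FIONREAD check above: this handler only runs after the
   event loop reported the descriptor readable, so zero readable bytes
   at that point can only mean end-of-file, i.e. the peer closed the
   connection; that is why num_ready == 0 is treated as a failure. */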
/* used when an event triggers a dead queue */
static void queue_dead(struct event_context *ev, struct tevent_immediate *im,
                       void *private_data)
{
        struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);

        queue->callback(NULL, 0, queue->private_data);
}
/*
  called when an incoming connection is writeable
*/
static void queue_io_write(struct ctdb_queue *queue)
{
        while (queue->out_queue) {
                struct ctdb_queue_pkt *pkt = queue->out_queue;
                ssize_t n;
                if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
                        n = write(queue->fd, pkt->data, 1);
                } else {
                        n = write(queue->fd, pkt->data, pkt->length);
                }

                if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
                        if (pkt->length != pkt->full_length) {
                                /* partial packet sent - we have to drop it */
                                DLIST_REMOVE(queue->out_queue, pkt);
                                queue->out_queue_length--;
                                talloc_free(pkt);
                        }
                        talloc_free(queue->fde);
                        queue->fde = NULL;
                        queue->fd = -1;
                        tevent_schedule_immediate(queue->im, queue->ctdb->ev,
                                                  queue_dead, queue);
                        return;
                }
                if (n <= 0) return;

                if (n != pkt->length) {
                        pkt->length -= n;
                        pkt->data += n;
                        return;
                }

                DLIST_REMOVE(queue->out_queue, pkt);
                queue->out_queue_length--;
                talloc_free(pkt);
        }

        EVENT_FD_NOT_WRITEABLE(queue->fde);
}
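/* Write interest on the fd is toggled rather than left permanently on:
   ctdb_queue_send() calls EVENT_FD_WRITEABLE() when it queues the first
   packet, and the EVENT_FD_NOT_WRITEABLE() above clears it once the
   queue drains, so the event loop does not spin on an always-writeable
   socket with nothing to send. */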
/*
  called when an incoming connection is readable or writeable
*/
static void queue_io_handler(struct event_context *ev, struct fd_event *fde,
                             uint16_t flags, void *private_data)
{
        struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);

        if (flags & EVENT_FD_READ) {
                queue_io_read(queue);
        } else {
                queue_io_write(queue);
        }
}
/*
  queue a packet for sending
*/
int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
{
        struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
        struct ctdb_queue_pkt *pkt;
        uint32_t length2, full_length;

        if (queue->alignment) {
                /* enforce the length and alignment rules from the tcp packet allocator */
                length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
                *(uint32_t *)data = length2;
        } else {
                length2 = length;
        }

        if (length2 != length) {
                memset(data+length, 0, length2-length);
        }

        full_length = length2;

        /* if the queue is empty then try an immediate write, avoiding
           queue overhead. This relies on non-blocking sockets */
        if (queue->out_queue == NULL && queue->fd != -1 &&
            !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
                ssize_t n = write(queue->fd, data, length2);
                if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
                        talloc_free(queue->fde);
                        queue->fde = NULL;
                        queue->fd = -1;
                        tevent_schedule_immediate(queue->im, queue->ctdb->ev,
                                                  queue_dead, queue);
                        /* yes, we report success, as the dead node is
                           handled via a separate event */
                        return 0;
                }
                if (n > 0) {
                        data += n;
                        length2 -= n;
                }
                if (length2 == 0) return 0;
        }

        pkt = talloc_size(
                queue, offsetof(struct ctdb_queue_pkt, buf) + length2);
        CTDB_NO_MEMORY(queue->ctdb, pkt);
        talloc_set_name_const(pkt, "struct ctdb_queue_pkt");

        pkt->data = pkt->buf;
        memcpy(pkt->data, data, length2);

        pkt->length = length2;
        pkt->full_length = full_length;

        if (queue->out_queue == NULL && queue->fd != -1) {
                EVENT_FD_WRITEABLE(queue->fde);
        }

        DLIST_ADD_END(queue->out_queue, pkt, NULL);

        queue->out_queue_length++;

        if (queue->ctdb->tunable.verbose_memory_names != 0) {
                switch (hdr->operation) {
                case CTDB_REQ_CONTROL: {
                        struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
                        talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
                                        queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
                        break;
                }
                case CTDB_REQ_MESSAGE: {
                        struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
                        talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
                                        queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
                        break;
                }
                default:
                        talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
                                        queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
                                        (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
                        break;
                }
        }

        return 0;
}
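/* Worked example of the alignment padding in ctdb_queue_send(): with
   alignment == 8 and length == 13, length2 = (13 + 7) & ~7 = 16, so
   three zero bytes of padding are appended and the length word at the
   start of the packet is rewritten to 16.  This assumes the caller's
   buffer has room for the padding, which the comment above suggests the
   tcp packet allocator guarantees. */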
/*
  setup the fd used by the queue
*/
int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
{
        queue->fd = fd;
        talloc_free(queue->fde);
        queue->fde = NULL;

        if (fd != -1) {
                queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
                                          queue_io_handler, queue);
                if (queue->fde == NULL) {
                        return -1;
                }
                tevent_fd_set_auto_close(queue->fde);

                if (queue->out_queue) {
                        EVENT_FD_WRITEABLE(queue->fde);
                }
        }

        return 0;
}
/* If someone sets up this pointer, they want to know if the queue is freed */
static int queue_destructor(struct ctdb_queue *queue)
{
        TALLOC_FREE(queue->buffer.data);
        queue->buffer.length = 0;
        queue->buffer.size = 0;
        if (queue->destroyed != NULL)
                *queue->destroyed = true;
        return 0;
}
/*
  setup a packet queue on a socket
*/
struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
                                    TALLOC_CTX *mem_ctx, int fd, int alignment,
                                    ctdb_queue_cb_fn_t callback,
                                    void *private_data, const char *fmt, ...)
{
        struct ctdb_queue *queue;
        va_list ap;

        queue = talloc_zero(mem_ctx, struct ctdb_queue);
        CTDB_NO_MEMORY_NULL(ctdb, queue);
        va_start(ap, fmt);
        queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
        va_end(ap);
        CTDB_NO_MEMORY_NULL(ctdb, queue->name);

        queue->im = tevent_create_immediate(queue);
        CTDB_NO_MEMORY_NULL(ctdb, queue->im);

        queue->ctdb = ctdb;
        queue->fd = fd;
        queue->alignment = alignment;
        queue->private_data = private_data;
        queue->callback = callback;
        if (fd != -1) {
                if (ctdb_queue_set_fd(queue, fd) != 0) {
                        talloc_free(queue);
                        return NULL;
                }
        }
        talloc_set_destructor(queue, queue_destructor);

        return queue;
}
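/* A minimal usage sketch (hypothetical caller: my_recv, state, sock_fd
   and pnn are illustrative names, not part of this file; the callback
   signature follows ctdb_queue_cb_fn_t):

        static void my_recv(uint8_t *data, size_t length, void *private_data)
        {
                ...                     // process one complete packet
                talloc_free(data);      // the callback owns 'data'
        }

        queue = ctdb_queue_setup(ctdb, ctdb, sock_fd, CTDB_DS_ALIGNMENT,
                                 my_recv, state, "connection to node %u", pnn);

   sock_fd must be a connected non-blocking socket, since the
   immediate-write path in ctdb_queue_send() relies on writes never
   blocking. */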