torture: convert torture_comment() -> torture_result() so we can knownfail flapping...
[Samba/wip.git] / ctdb / common / ctdb_io.c
blob351006db2d354674684828055eee0e5d50df5734
1 /*
2 ctdb database library
3 Utility functions to read/write blobs of data from a file descriptor
4 and handle the case where we might need multiple read/writes to get all the
5 data.
7 Copyright (C) Andrew Tridgell 2006
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, see <http://www.gnu.org/licenses/>.
23 #include "includes.h"
24 #include "tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../include/ctdb_client.h"
30 #include <stdarg.h>
32 #define QUEUE_BUFFER_SIZE (16*1024)
/* structures for packet queueing - see common/ctdb_io.c */

/* Input-side accumulation buffer for one queue. 'data' is talloc'd as a
   child of the owning ctdb_queue. */
struct ctdb_buffer {
	uint8_t *data;		/* talloc'd read buffer (may be NULL when empty) */
	uint32_t length;	/* bytes of valid data currently held in 'data' */
	uint32_t size;		/* allocated size of 'data' in bytes */
	uint32_t extend;	/* if non-zero: grow the buffer to this size on the next read
				   (set when a packet larger than the buffer is pending) */
};
/* One outgoing packet waiting in a queue's out_queue list. */
struct ctdb_queue_pkt {
	struct ctdb_queue_pkt *next, *prev;	/* dlinklist links within out_queue */
	uint8_t *data;				/* next unsent byte (advanced on partial writes) */
	uint32_t length;			/* bytes still left to send */
	uint32_t full_length;			/* original (padded) packet length; used to detect
						   that a packet was partially transmitted */
};
/* A bidirectional packet queue bound to one file descriptor. */
struct ctdb_queue {
	struct ctdb_context *ctdb;
	struct tevent_immediate *im;	/* immediate event used to defer buffer
					   processing / dead-queue notification */
	struct ctdb_buffer buffer;	/* input buffer */
	struct ctdb_queue_pkt *out_queue, *out_queue_tail;	/* pending outgoing packets */
	uint32_t out_queue_length;	/* number of packets in out_queue */
	struct fd_event *fde;
	int fd;				/* -1 once the connection is dead */
	size_t alignment;		/* packet length alignment required by the tcp
					   packet allocator; 0 disables padding */
	void *private_data;		/* opaque pointer handed back to 'callback' */
	ctdb_queue_cb_fn_t callback;	/* called per complete packet; called with
					   NULL data to signal a dead queue */
	bool *destroyed;		/* if set by the owner, *destroyed becomes true
					   when the queue is freed (see queue_destructor) */
	const char *name;
};
66 int ctdb_queue_length(struct ctdb_queue *queue)
68 return queue->out_queue_length;
71 static void queue_process(struct ctdb_queue *queue);
73 static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
74 void *private_data)
76 struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
78 queue_process(queue);
/*
 * This function is used to process data in queue buffer.
 *
 * Queue callback function can end up freeing the queue, there should not be a
 * loop processing packets from queue buffer. Instead set up a timed event for
 * immediate run to process remaining packets from buffer.
 *
 * At most ONE complete packet is delivered per invocation; the immediate
 * event re-enters via queue_process_event for any remaining data.
 */
static void queue_process(struct ctdb_queue *queue)
{
	uint32_t pkt_size;
	uint8_t *data;

	/* need at least the 4-byte length prefix before we can do anything */
	if (queue->buffer.length < sizeof(pkt_size)) {
		return;
	}

	/* first uint32 of every packet is its total length */
	pkt_size = *(uint32_t *)queue->buffer.data;
	if (pkt_size == 0) {
		DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
		goto failed;
	}

	if (queue->buffer.length < pkt_size) {
		/* incomplete packet; if it won't fit in the default buffer,
		   ask queue_io_read to grow the buffer before the next read */
		if (pkt_size > QUEUE_BUFFER_SIZE) {
			queue->buffer.extend = pkt_size;
		}
		return;
	}

	/* Extract complete packet */
	data = talloc_size(queue, pkt_size);
	if (data == NULL) {
		DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", pkt_size));
		return;
	}
	memcpy(data, queue->buffer.data, pkt_size);

	/* Shift packet out from buffer */
	if (queue->buffer.length > pkt_size) {
		memmove(queue->buffer.data,
			queue->buffer.data + pkt_size,
			queue->buffer.length - pkt_size);
	}
	queue->buffer.length -= pkt_size;

	if (queue->buffer.length > 0) {
		/* There is more data to be processed, schedule an event */
		tevent_schedule_immediate(queue->im, queue->ctdb->ev,
					  queue_process_event, queue);
	} else {
		/* buffer fully drained: release an oversized buffer so a huge
		   packet does not pin memory forever */
		if (queue->buffer.size > QUEUE_BUFFER_SIZE) {
			TALLOC_FREE(queue->buffer.data);
			queue->buffer.size = 0;
		}
	}

	/* It is the responsibility of the callback to free 'data'.
	   NOTE: the callback may free the whole queue, so nothing may touch
	   'queue' after this call. */
	queue->callback(data, pkt_size, queue->private_data);
	return;

failed:
	/* NULL data signals a fatal/dead queue to the owner */
	queue->callback(NULL, 0, queue->private_data);
}
/*
  called when an incoming connection is readable
  This function MUST be safe for reentry via the queue callback!
*/
static void queue_io_read(struct ctdb_queue *queue)
{
	int num_ready = 0;
	ssize_t nread;
	uint8_t *data;
	int navail;

	/* check how much data is available on the socket for immediately
	   guaranteed nonblocking access.
	   as long as we are careful never to try to read more than this
	   we know all reads will be successful and will neither block
	   nor fail with a "data not available right now" error
	*/
	if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
		return;
	}
	if (num_ready == 0) {
		/* the descriptor has been closed */
		goto failed;
	}

	if (queue->buffer.data == NULL) {
		/* starting fresh, allocate buf to read data */
		queue->buffer.data = talloc_size(queue, QUEUE_BUFFER_SIZE);
		if (queue->buffer.data == NULL) {
			DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
			goto failed;
		}
		queue->buffer.size = QUEUE_BUFFER_SIZE;
	} else if (queue->buffer.extend > 0) {
		/* extending buffer: queue_process saw a packet larger than the
		   current buffer and recorded the required size in 'extend' */
		data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
		if (data == NULL) {
			DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
			goto failed;
		}
		queue->buffer.data = data;
		queue->buffer.size = queue->buffer.extend;
		queue->buffer.extend = 0;
	}

	/* never read more than the remaining buffer space; any surplus on the
	   socket is picked up by a later readable event */
	navail = queue->buffer.size - queue->buffer.length;
	if (num_ready > navail) {
		num_ready = navail;
	}

	if (num_ready > 0) {
		nread = read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
		if (nread <= 0) {
			DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
			goto failed;
		}
		queue->buffer.length += nread;
	}

	/* may invoke the queue callback, which can free the queue */
	queue_process(queue);
	return;

failed:
	queue->callback(NULL, 0, queue->private_data);
}
214 /* used when an event triggers a dead queue */
215 static void queue_dead(struct event_context *ev, struct tevent_immediate *im,
216 void *private_data)
218 struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
219 queue->callback(NULL, 0, queue->private_data);
/*
  called when an incoming connection is writeable
*/
static void queue_io_write(struct ctdb_queue *queue)
{
	while (queue->out_queue) {
		struct ctdb_queue_pkt *pkt = queue->out_queue;
		ssize_t n;
		if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
			/* torture mode: dribble one byte at a time to exercise
			   partial-write/partial-read handling everywhere */
			n = write(queue->fd, pkt->data, 1);
		} else {
			n = write(queue->fd, pkt->data, pkt->length);
		}

		if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
			/* fatal write error: the connection is dead */
			if (pkt->length != pkt->full_length) {
				/* partial packet sent - we have to drop it */
				DLIST_REMOVE(queue->out_queue, pkt);
				queue->out_queue_length--;
				talloc_free(pkt);
			}
			talloc_free(queue->fde);
			queue->fde = NULL;
			queue->fd = -1;
			/* report the dead queue from an immediate event rather
			   than calling back from here, so the caller's stack is
			   not reentered */
			tevent_schedule_immediate(queue->im, queue->ctdb->ev,
						  queue_dead, queue);
			return;
		}
		/* EAGAIN/EWOULDBLOCK or zero-byte write: try again on the
		   next writeable event */
		if (n <= 0) return;

		if (n != pkt->length) {
			/* partial write: advance past the sent bytes and wait
			   for the fd to become writeable again */
			pkt->length -= n;
			pkt->data += n;
			return;
		}

		/* packet fully sent */
		DLIST_REMOVE(queue->out_queue, pkt);
		queue->out_queue_length--;
		talloc_free(pkt);
	}

	/* nothing left to send; stop polling for writeability */
	EVENT_FD_NOT_WRITEABLE(queue->fde);
}
268 called when an incoming connection is readable or writeable
270 static void queue_io_handler(struct event_context *ev, struct fd_event *fde,
271 uint16_t flags, void *private_data)
273 struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
275 if (flags & EVENT_FD_READ) {
276 queue_io_read(queue);
277 } else {
278 queue_io_write(queue);
/*
  queue a packet for sending
*/
int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
{
	struct ctdb_queue_pkt *pkt;
	uint32_t length2, full_length;

	if (queue->alignment) {
		/* enforce the length and alignment rules from the tcp packet
		   allocator: round the wire length up and stamp it into the
		   packet's leading uint32 length field.
		   NOTE(review): assumes the caller's buffer has room for the
		   padded length (guaranteed by the tcp packet allocator) */
		length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
		*(uint32_t *)data = length2;
	} else {
		length2 = length;
	}

	if (length2 != length) {
		/* zero the padding bytes so we never leak heap contents */
		memset(data+length, 0, length2-length);
	}

	full_length = length2;

	/* if the queue is empty then try an immediate write, avoiding
	   queue overhead. This relies on non-blocking sockets */
	if (queue->out_queue == NULL && queue->fd != -1 &&
	    !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
		ssize_t n = write(queue->fd, data, length2);
		if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
			talloc_free(queue->fde);
			queue->fde = NULL;
			queue->fd = -1;
			tevent_schedule_immediate(queue->im, queue->ctdb->ev,
						  queue_dead, queue);
			/* yes, we report success, as the dead node is
			   handled via a separate event */
			return 0;
		}
		if (n > 0) {
			/* partially (or fully) sent; only the remainder is queued */
			data += n;
			length2 -= n;
		}
		if (length2 == 0) return 0;
	}

	pkt = talloc(queue, struct ctdb_queue_pkt);
	CTDB_NO_MEMORY(queue->ctdb, pkt);

	/* copy the unsent remainder; caller keeps ownership of 'data' */
	pkt->data = talloc_memdup(pkt, data, length2);
	CTDB_NO_MEMORY(queue->ctdb, pkt->data);

	pkt->length = length2;
	pkt->full_length = full_length;

	/* queue was empty until now, so writeability polling is off; re-arm it */
	if (queue->out_queue == NULL && queue->fd != -1) {
		EVENT_FD_WRITEABLE(queue->fde);
	}

	DLIST_ADD_END(queue->out_queue, pkt, NULL);

	queue->out_queue_length++;

	if (queue->ctdb->tunable.verbose_memory_names != 0) {
		/* give the talloc chunk a descriptive name so 'talloc report'
		   output identifies what is sitting in the queue */
		struct ctdb_req_header *hdr = (struct ctdb_req_header *)pkt->data;
		switch (hdr->operation) {
		case CTDB_REQ_CONTROL: {
			struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
			talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
					queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
			break;
		}
		case CTDB_REQ_MESSAGE: {
			struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
			talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
					queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
			break;
		}
		default:
			talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
					queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
					(unsigned)hdr->srcnode, (unsigned)hdr->destnode);
			break;
		}
	}

	return 0;
}
372 setup the fd used by the queue
374 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
376 queue->fd = fd;
377 talloc_free(queue->fde);
378 queue->fde = NULL;
380 if (fd != -1) {
381 queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
382 queue_io_handler, queue);
383 if (queue->fde == NULL) {
384 return -1;
386 tevent_fd_set_auto_close(queue->fde);
388 if (queue->out_queue) {
389 EVENT_FD_WRITEABLE(queue->fde);
393 return 0;
396 /* If someone sets up this pointer, they want to know if the queue is freed */
397 static int queue_destructor(struct ctdb_queue *queue)
399 TALLOC_FREE(queue->buffer.data);
400 queue->buffer.length = 0;
401 queue->buffer.size = 0;
402 if (queue->destroyed != NULL)
403 *queue->destroyed = true;
404 return 0;
408 setup a packet queue on a socket
410 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
411 TALLOC_CTX *mem_ctx, int fd, int alignment,
412 ctdb_queue_cb_fn_t callback,
413 void *private_data, const char *fmt, ...)
415 struct ctdb_queue *queue;
416 va_list ap;
418 queue = talloc_zero(mem_ctx, struct ctdb_queue);
419 CTDB_NO_MEMORY_NULL(ctdb, queue);
420 va_start(ap, fmt);
421 queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
422 va_end(ap);
423 CTDB_NO_MEMORY_NULL(ctdb, queue->name);
425 queue->im= tevent_create_immediate(queue);
426 CTDB_NO_MEMORY_NULL(ctdb, queue->im);
428 queue->ctdb = ctdb;
429 queue->fd = fd;
430 queue->alignment = alignment;
431 queue->private_data = private_data;
432 queue->callback = callback;
433 if (fd != -1) {
434 if (ctdb_queue_set_fd(queue, fd) != 0) {
435 talloc_free(queue);
436 return NULL;
439 talloc_set_destructor(queue, queue_destructor);
441 return queue;