ctdb: Fix verbose_memory_names
[Samba.git] / ctdb / common / ctdb_io.c
blobb5f8a7274c48e3e7d56bb02d63e207d001b9ed0b
1 /*
2 ctdb database library
3 Utility functions to read/write blobs of data from a file descriptor
4 and handle the case where we might need multiple read/writes to get all the
5 data.
7 Copyright (C) Andrew Tridgell 2006
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, see <http://www.gnu.org/licenses/>.
23 #include "includes.h"
24 #include "tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../include/ctdb_client.h"
30 #include <stdarg.h>
#define QUEUE_BUFFER_SIZE	(16*1024)

/* structures for packet queueing - see common/ctdb_io.c */
struct ctdb_buffer {
	uint8_t *data;		/* talloc'd receive buffer, child of the queue */
	uint32_t length;	/* bytes of valid data currently held in 'data' */
	uint32_t size;		/* allocated size of 'data' in bytes */
	uint32_t extend;	/* requested larger size when a packet exceeds
				 * 'size'; 0 when no extension is pending */
};
/* one queued outgoing packet, linked on a queue's out_queue list */
struct ctdb_queue_pkt {
	struct ctdb_queue_pkt *next, *prev;	/* dlinklist pointers */
	uint8_t *data;		/* next unsent byte; advances on partial writes */
	uint32_t length;	/* bytes still left to send */
	uint32_t full_length;	/* original packet length, used to detect a
				 * partially-sent packet on write failure */
	uint8_t buf[];		/* flexible array member holding the payload */
};
/* a packet queue bound to one file descriptor: buffered reads in,
 * linked list of pending writes out */
struct ctdb_queue {
	struct ctdb_context *ctdb;
	struct tevent_immediate *im;	/* reused for deferred processing and
					 * dead-queue notification */
	struct ctdb_buffer buffer; /* input buffer */
	struct ctdb_queue_pkt *out_queue, *out_queue_tail;
	uint32_t out_queue_length;	/* number of packets on out_queue */
	struct fd_event *fde;
	int fd;				/* -1 once the connection is dead */
	size_t alignment;		/* pad outgoing packets to this multiple;
					 * 0 disables padding */
	void *private_data;		/* passed through to 'callback' */
	ctdb_queue_cb_fn_t callback;	/* invoked per received packet; NULL
					 * data signals a dead connection */
	bool *destroyed;	/* if set by the owner, *destroyed becomes true
				 * when the queue is talloc-freed */
	const char *name;
};
/* return the number of packets currently queued for sending */
int ctdb_queue_length(struct ctdb_queue *queue)
{
	return queue->out_queue_length;
}
static void queue_process(struct ctdb_queue *queue);

/*
  immediate-event trampoline: resume draining buffered input.
  Scheduled by queue_process() when more than one packet is buffered,
  so the queue callback never runs in a loop (it may free the queue).
*/
static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
				void *private_data)
{
	struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);

	queue_process(queue);
}
83 * This function is used to process data in queue buffer.
85 * Queue callback function can end up freeing the queue, there should not be a
86 * loop processing packets from queue buffer. Instead set up a timed event for
87 * immediate run to process remaining packets from buffer.
89 static void queue_process(struct ctdb_queue *queue)
91 uint32_t pkt_size;
92 uint8_t *data;
94 if (queue->buffer.length < sizeof(pkt_size)) {
95 return;
98 pkt_size = *(uint32_t *)queue->buffer.data;
99 if (pkt_size == 0) {
100 DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
101 goto failed;
104 if (queue->buffer.length < pkt_size) {
105 if (pkt_size > QUEUE_BUFFER_SIZE) {
106 queue->buffer.extend = pkt_size;
108 return;
111 /* Extract complete packet */
112 data = talloc_size(queue, pkt_size);
113 if (data == NULL) {
114 DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", pkt_size));
115 return;
117 memcpy(data, queue->buffer.data, pkt_size);
119 /* Shift packet out from buffer */
120 if (queue->buffer.length > pkt_size) {
121 memmove(queue->buffer.data,
122 queue->buffer.data + pkt_size,
123 queue->buffer.length - pkt_size);
125 queue->buffer.length -= pkt_size;
127 if (queue->buffer.length > 0) {
128 /* There is more data to be processed, schedule an event */
129 tevent_schedule_immediate(queue->im, queue->ctdb->ev,
130 queue_process_event, queue);
131 } else {
132 if (queue->buffer.size > QUEUE_BUFFER_SIZE) {
133 TALLOC_FREE(queue->buffer.data);
134 queue->buffer.size = 0;
138 /* It is the responsibility of the callback to free 'data' */
139 queue->callback(data, pkt_size, queue->private_data);
140 return;
142 failed:
143 queue->callback(NULL, 0, queue->private_data);
/*
  called when an incoming connection is readable
  This function MUST be safe for reentry via the queue callback!
*/
static void queue_io_read(struct ctdb_queue *queue)
{
	int num_ready = 0;
	ssize_t nread;
	uint8_t *data;
	int navail;

	/* check how much data is available on the socket for immediately
	   guaranteed nonblocking access.
	   as long as we are careful never to try to read more than this
	   we know all reads will be successful and will neither block
	   nor fail with a "data not available right now" error
	*/
	if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
		return;
	}
	if (num_ready == 0) {
		/* the descriptor has been closed */
		goto failed;
	}

	if (queue->buffer.data == NULL) {
		/* starting fresh, allocate buf to read data */
		queue->buffer.data = talloc_size(queue, QUEUE_BUFFER_SIZE);
		if (queue->buffer.data == NULL) {
			DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
			goto failed;
		}
		queue->buffer.size = QUEUE_BUFFER_SIZE;
	} else if (queue->buffer.extend > 0) {
		/* extending buffer: queue_process() saw a packet larger
		 * than the current buffer and recorded the needed size */
		data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
		if (data == NULL) {
			DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
			goto failed;
		}
		queue->buffer.data = data;
		queue->buffer.size = queue->buffer.extend;
		queue->buffer.extend = 0;
	}

	/* never read more than fits in the remaining buffer space */
	navail = queue->buffer.size - queue->buffer.length;
	if (num_ready > navail) {
		num_ready = navail;
	}

	if (num_ready > 0) {
		nread = read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
		if (nread <= 0) {
			DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
			goto failed;
		}
		queue->buffer.length += nread;
	}

	queue_process(queue);
	return;

failed:
	/* NULL data tells the owner the connection is dead; the callback
	 * may free the queue, so nothing may touch it afterwards */
	queue->callback(NULL, 0, queue->private_data);
}
/* used when an event triggers a dead queue */
static void queue_dead(struct event_context *ev, struct tevent_immediate *im,
		       void *private_data)
{
	struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
	/* NULL data signals the dead connection to the owner */
	queue->callback(NULL, 0, queue->private_data);
}
/*
  called when an incoming connection is writeable
*/
static void queue_io_write(struct ctdb_queue *queue)
{
	while (queue->out_queue) {
		struct ctdb_queue_pkt *pkt = queue->out_queue;
		ssize_t n;
		if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
			/* torture mode: dribble one byte at a time to
			 * exercise the partial-write paths */
			n = write(queue->fd, pkt->data, 1);
		} else {
			n = write(queue->fd, pkt->data, pkt->length);
		}

		if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
			if (pkt->length != pkt->full_length) {
				/* partial packet sent - we have to drop it */
				DLIST_REMOVE(queue->out_queue, pkt);
				queue->out_queue_length--;
				talloc_free(pkt);
			}
			/* hard write error: mark the connection dead and
			 * notify the owner via an immediate event, not
			 * inline (the callback may free the queue) */
			talloc_free(queue->fde);
			queue->fde = NULL;
			queue->fd = -1;
			tevent_schedule_immediate(queue->im, queue->ctdb->ev,
						  queue_dead, queue);
			return;
		}
		/* EAGAIN/EWOULDBLOCK or zero write: try again later */
		if (n <= 0) return;

		if (n != pkt->length) {
			/* partial write: advance past the sent bytes and
			 * wait for the next writeable event */
			pkt->length -= n;
			pkt->data += n;
			return;
		}

		DLIST_REMOVE(queue->out_queue, pkt);
		queue->out_queue_length--;
		talloc_free(pkt);
	}

	/* queue drained: stop watching for writeability */
	EVENT_FD_NOT_WRITEABLE(queue->fde);
}
269 called when an incoming connection is readable or writeable
271 static void queue_io_handler(struct event_context *ev, struct fd_event *fde,
272 uint16_t flags, void *private_data)
274 struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
276 if (flags & EVENT_FD_READ) {
277 queue_io_read(queue);
278 } else {
279 queue_io_write(queue);
/*
  queue a packet for sending

  'data' must remain valid for the duration of this call only; it is
  copied into the packet structure.  Returns 0 on success (including
  when the connection has died - that is reported via a separate
  dead-queue event), -1 on allocation failure.
*/
int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
{
	struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
	struct ctdb_queue_pkt *pkt;
	uint32_t length2, full_length;

	if (queue->alignment) {
		/* enforce the length and alignment rules from the tcp packet allocator */
		length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
		*(uint32_t *)data = length2;
	} else {
		length2 = length;
	}

	if (length2 != length) {
		/* zero the padding bytes; the caller's buffer is assumed to
		 * have room up to the aligned length (tcp packet allocator
		 * contract) */
		memset(data+length, 0, length2-length);
	}

	full_length = length2;

	/* if the queue is empty then try an immediate write, avoiding
	   queue overhead. This relies on non-blocking sockets */
	if (queue->out_queue == NULL && queue->fd != -1 &&
	    !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
		ssize_t n = write(queue->fd, data, length2);
		if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
			talloc_free(queue->fde);
			queue->fde = NULL;
			queue->fd = -1;
			tevent_schedule_immediate(queue->im, queue->ctdb->ev,
						  queue_dead, queue);
			/* yes, we report success, as the dead node is
			   handled via a separate event */
			return 0;
		}
		if (n > 0) {
			/* partial send: queue only the remainder below */
			data += n;
			length2 -= n;
		}
		if (length2 == 0) return 0;
	}

	/* allocate header and payload in one block via the flexible
	 * array member */
	pkt = talloc_size(
		queue, offsetof(struct ctdb_queue_pkt, buf) + length2);
	CTDB_NO_MEMORY(queue->ctdb, pkt);
	talloc_set_name_const(pkt, "struct ctdb_queue_pkt");

	pkt->data = pkt->buf;
	memcpy(pkt->data, data, length2);

	pkt->length = length2;
	pkt->full_length = full_length;

	/* first pending packet: start watching for writeability */
	if (queue->out_queue == NULL && queue->fd != -1) {
		EVENT_FD_WRITEABLE(queue->fde);
	}

	DLIST_ADD_END(queue->out_queue, pkt, NULL);

	queue->out_queue_length++;

	/* with the verbose_memory_names tunable set, tag each packet's
	 * talloc name with its ctdb operation details so talloc reports
	 * identify what is queued */
	if (queue->ctdb->tunable.verbose_memory_names != 0) {
		switch (hdr->operation) {
		case CTDB_REQ_CONTROL: {
			struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
			talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
					queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
			break;
		}
		case CTDB_REQ_MESSAGE: {
			struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
			talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
					queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
			break;
		}
		default:
			talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
					queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
					(unsigned)hdr->srcnode, (unsigned)hdr->destnode);
			break;
		}
	}

	return 0;
}
/*
  setup the fd used by the queue
  Passing -1 detaches the queue from any descriptor.  Returns 0 on
  success, -1 if the fd event could not be created.
*/
int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
{
	queue->fd = fd;
	talloc_free(queue->fde);
	queue->fde = NULL;

	if (fd != -1) {
		queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
					  queue_io_handler, queue);
		if (queue->fde == NULL) {
			return -1;
		}
		/* the event owns the fd from here on; it is closed when the
		 * event is freed */
		tevent_fd_set_auto_close(queue->fde);

		/* packets queued while we had no fd: resume sending */
		if (queue->out_queue) {
			EVENT_FD_WRITEABLE(queue->fde);
		}
	}

	return 0;
}
/* If someone sets up this pointer, they want to know if the queue is freed */
static int queue_destructor(struct ctdb_queue *queue)
{
	TALLOC_FREE(queue->buffer.data);
	queue->buffer.length = 0;
	queue->buffer.size = 0;
	if (queue->destroyed != NULL)
		*queue->destroyed = true;
	/* 0 lets talloc proceed with freeing the queue */
	return 0;
}
411 setup a packet queue on a socket
413 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
414 TALLOC_CTX *mem_ctx, int fd, int alignment,
415 ctdb_queue_cb_fn_t callback,
416 void *private_data, const char *fmt, ...)
418 struct ctdb_queue *queue;
419 va_list ap;
421 queue = talloc_zero(mem_ctx, struct ctdb_queue);
422 CTDB_NO_MEMORY_NULL(ctdb, queue);
423 va_start(ap, fmt);
424 queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
425 va_end(ap);
426 CTDB_NO_MEMORY_NULL(ctdb, queue->name);
428 queue->im= tevent_create_immediate(queue);
429 CTDB_NO_MEMORY_NULL(ctdb, queue->im);
431 queue->ctdb = ctdb;
432 queue->fd = fd;
433 queue->alignment = alignment;
434 queue->private_data = private_data;
435 queue->callback = callback;
436 if (fd != -1) {
437 if (ctdb_queue_set_fd(queue, fd) != 0) {
438 talloc_free(queue);
439 return NULL;
442 talloc_set_destructor(queue, queue_destructor);
444 return queue;