From a2abdc13531bdd874df3c6d410370d52e24307fd Mon Sep 17 00:00:00 2001 From: Amitay Isaacs Date: Fri, 18 Jan 2013 10:42:14 +1100 Subject: [PATCH] common/io: Rewrite socket handling code to read all available data This improves the processing of packets considerably. It has been observed that there can be as many as 10 packets in the socket buffer and the current code of reading a single packet from a socket at a time is not very optimal. This change reads all the bytes from socket buffer and then parses to extract multiple packets. If there are multiple packets, set up a timed event to process next packet. Signed-off-by: Amitay Isaacs (This used to be ctdb commit d788bc8f7212b7dc1587ae592242dc8c876f4053) --- ctdb/common/ctdb_io.c | 160 +++++++++++++++++++++++++++++--------------------- 1 file changed, 92 insertions(+), 68 deletions(-) diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c index 3ac1b63b75e..b4224c4e0fb 100644 --- a/ctdb/common/ctdb_io.c +++ b/ctdb/common/ctdb_io.c @@ -30,9 +30,10 @@ #include /* structures for packet queueing - see common/ctdb_io.c */ -struct ctdb_partial { +struct ctdb_buffer { uint8_t *data; uint32_t length; + uint32_t size; }; struct ctdb_queue_pkt { @@ -44,7 +45,7 @@ struct ctdb_queue_pkt { struct ctdb_queue { struct ctdb_context *ctdb; - struct ctdb_partial partial; /* partial input packet */ + struct ctdb_buffer buffer; /* input buffer */ struct ctdb_queue_pkt *out_queue, *out_queue_tail; uint32_t out_queue_length; struct fd_event *fde; @@ -63,6 +64,75 @@ int ctdb_queue_length(struct ctdb_queue *queue) return queue->out_queue_length; } +static void queue_process(struct ctdb_queue *queue); + +static void queue_process_event(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private_data) +{ + struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue); + + queue_process(queue); +} + +/* + * This function is used to process data in queue buffer. + * + * Queue callback function can end up freeing the queue, there should not be a + * loop processing packets from queue buffer. Instead set up a timed event for + * immediate run to process remaining packets from buffer. + */ +static void queue_process(struct ctdb_queue *queue) +{ + uint32_t pkt_size; + uint8_t *data; + + if (queue->buffer.length < sizeof(pkt_size)) { + return; + } + + pkt_size = *(uint32_t *)queue->buffer.data; + if (pkt_size == 0) { + DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n")); + goto failed; + } + + if (queue->buffer.length < pkt_size) { + DEBUG(DEBUG_DEBUG, ("Partial packet data read\n")); + return; + } + + /* Extract complete packet */ + data = talloc_size(queue, pkt_size); + if (data == NULL) { + DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", pkt_size)); + return; + } + memcpy(data, queue->buffer.data, pkt_size); + + /* Shift packet out from buffer */ + if (queue->buffer.length > pkt_size) { + memmove(queue->buffer.data, + queue->buffer.data + pkt_size, + queue->buffer.length - pkt_size); + } + queue->buffer.length -= pkt_size; + + if (queue->buffer.length > 0) { + /* There is more data to be processed, setup timed event */ + event_add_timed(queue->ctdb->ev, queue, timeval_zero(), + queue_process_event, queue); + } + + /* It is the responsibility of the callback to free 'data' */ + queue->callback(data, pkt_size, queue->private_data); + return; + +failed: + queue->callback(NULL, 0, queue->private_data); + +} + + /* called when an incoming connection is readable This function MUST be safe for reentry via the queue callback! @@ -70,10 +140,6 @@ int ctdb_queue_length(struct ctdb_queue *queue) static void queue_io_read(struct ctdb_queue *queue) { int num_ready = 0; - uint32_t sz_bytes_req; - uint32_t pkt_size; - uint32_t pkt_bytes_remaining; - uint32_t to_read; ssize_t nread; uint8_t *data; @@ -91,77 +157,33 @@ static void queue_io_read(struct ctdb_queue *queue) goto failed; } - if (queue->partial.data == NULL) { - /* starting fresh, allocate buf for size bytes */ - sz_bytes_req = sizeof(pkt_size); - queue->partial.data = talloc_size(queue, sz_bytes_req); - if (queue->partial.data == NULL) { - DEBUG(DEBUG_ERR,("read error alloc failed for %u\n", - sz_bytes_req)); + if (queue->buffer.data == NULL) { + /* starting fresh, allocate buf to read data */ + queue->buffer.data = talloc_size(queue, num_ready); + if (queue->buffer.data == NULL) { + DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready)); goto failed; } - } else if (queue->partial.length < sizeof(pkt_size)) { - /* yet to find out the packet length */ - sz_bytes_req = sizeof(pkt_size) - queue->partial.length; - } else { - /* partial packet, length known, full buf allocated */ - sz_bytes_req = 0; - } - data = queue->partial.data; - - if (sz_bytes_req > 0) { - to_read = MIN(sz_bytes_req, num_ready); - nread = read(queue->fd, data + queue->partial.length, - to_read); - if (nread <= 0) { - DEBUG(DEBUG_ERR,("read error nread=%d\n", (int)nread)); + queue->buffer.size = num_ready; + } else if (queue->buffer.length + num_ready > queue->buffer.size) { + /* extending buffer */ + data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.length + num_ready); + if (data == NULL) { + DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.length + num_ready)); goto failed; } - queue->partial.length += nread; - - if (nread < sz_bytes_req) { - /* not enough to know the length */ - DEBUG(DEBUG_DEBUG,("Partial packet length read\n")); - return; - } - /* size now known, allocate buffer for the full packet */ - queue->partial.data = talloc_realloc_size(queue, data, - *(uint32_t *)data); - if (queue->partial.data == NULL) { - DEBUG(DEBUG_ERR,("read error alloc failed for %u\n", - *(uint32_t *)data)); - goto failed; - } - data = queue->partial.data; - num_ready -= nread; + queue->buffer.data = data; + queue->buffer.size = queue->buffer.length + num_ready; } - pkt_size = *(uint32_t *)data; - if (pkt_size == 0) { - DEBUG(DEBUG_CRIT,("Invalid packet of length 0\n")); - goto failed; - } - - pkt_bytes_remaining = pkt_size - queue->partial.length; - to_read = MIN(pkt_bytes_remaining, num_ready); - nread = read(queue->fd, data + queue->partial.length, - to_read); + nread = read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready); if (nread <= 0) { - DEBUG(DEBUG_ERR,("read error nread=%d\n", - (int)nread)); + DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread)); goto failed; } - queue->partial.length += nread; - - if (queue->partial.length < pkt_size) { - DEBUG(DEBUG_DEBUG,("Partial packet data read\n")); - return; - } + queue->buffer.length += nread; - queue->partial.data = NULL; - queue->partial.length = 0; - /* it is the responsibility of the callback to free 'data' */ - queue->callback(data, pkt_size, queue->private_data); + queue_process(queue); return; failed: @@ -354,6 +376,9 @@ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd) /* If someone sets up this pointer, they want to know if the queue is freed */ static int queue_destructor(struct ctdb_queue *queue) { + TALLOC_FREE(queue->buffer.data); + queue->buffer.length = 0; + queue->buffer.size = 0; if (queue->destroyed != NULL) *queue->destroyed = true; return 0; @@ -364,7 +389,6 @@ static int queue_destructor(struct ctdb_queue *queue) */ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, int fd, int alignment, - ctdb_queue_cb_fn_t callback, void *private_data, const char *fmt, ...) { -- 2.11.4.GIT