Adding a little script to run benchmarks. it save the data in a marshalled
[ebb.git] / tcp.c
blob31bbdccb6d3ebe3f71d098755377bf60e443d4b1
1 /* Evented TCP listener
2 * Copyright (c) 2007 Ry Dahl <ry.d4hl@gmail.com>
3 * This software is released under the "MIT License". See README file for details.
4 */
5 #include <unistd.h>
6 #include <fcntl.h>
7 #include <sys/types.h>
8 #include <arpa/inet.h>
9 #include <netdb.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <stdlib.h>
14 #include <errno.h>
15 #include <signal.h>
17 #include <ev.h>
18 #include <glib.h>
19 #include <assert.h>
21 #include "tcp.h"
23 /* Private function */
24 void tcp_peer_stop_read_watcher(tcp_peer *peer);
26 /* Returns the number of bytes remaining to write */
27 int tcp_peer_write(tcp_peer *peer, const char *data, int length)
29 if(!peer->open) {
30 //tcp_warning("Trying to write to a peer that isn't open.");
31 return 0;
34 int sent = send(peer->fd, data, length, MSG_HAVEMORE);
35 if(sent < 0) {
36 //tcp_warning("Error writing: %s", strerror(errno));
37 tcp_peer_close(peer);
38 return 0;
40 ev_timer_again(peer->parent->loop, &(peer->timeout_watcher));
42 return sent;
45 void tcp_peer_on_timeout( struct ev_loop *loop
46 , struct ev_timer *watcher
47 , int revents
50 tcp_peer *peer = (tcp_peer*)(watcher->data);
52 assert(peer->parent->loop == loop);
53 assert(&(peer->timeout_watcher) == watcher);
55 tcp_peer_close(peer);
56 tcp_info("peer timed out");
59 void tcp_peer_on_readable( struct ev_loop *loop
60 , struct ev_io *watcher
61 , int revents
64 tcp_peer *peer = (tcp_peer*)(watcher->data);
65 int length;
67 // check for error in revents
68 if(EV_ERROR & revents) {
69 tcp_error("tcp_peer_on_readable() got error event, closing peer");
70 goto error;
73 assert(peer->open);
74 assert(peer->parent->open);
75 assert(peer->parent->loop == loop);
76 assert(&(peer->read_watcher) == watcher);
78 if(peer->read_cb == NULL) return;
80 length = recv(peer->fd, peer->read_buffer, TCP_CHUNKSIZE, 0);
82 if(length == 0) {
83 //g_debug("zero length read? what to do?");
84 tcp_peer_close(peer);
85 //tcp_peer_stop_read_watcher(peer);
86 return;
87 } else if(length < 0) {
88 if(errno == EBADF || errno == ECONNRESET)
89 g_debug("errno says Connection reset by peer");
90 else
91 tcp_error("Error recving data: %s", strerror(errno));
92 goto error;
95 ev_timer_again(loop, &(peer->timeout_watcher));
96 // g_debug("Read %d bytes", length);
98 peer->read_cb(peer->read_buffer, length, peer->read_cb_data);
99 /* Cannot access peer beyond this point because it's possible that the
100 * user has freed it.
102 return;
103 error:
104 tcp_peer_close(peer);
107 tcp_peer* tcp_peer_new(tcp_listener *listener)
109 socklen_t len;
110 tcp_peer *peer = NULL;
111 int i;
113 /* Get next availible peer */
114 for(i=0; i < TCP_MAX_PEERS; i++)
115 if(!listener->peers[i].open) {
116 peer = &(listener->peers[i]);
117 break;
120 if(peer == NULL) {
121 g_message("Too many peers. Refusing connections.");
122 return NULL;
125 peer->open = TRUE; /* set open ASAP */
127 /* DEBUG */
128 // int count = 0;
129 // for(i = 0; i < TCP_MAX_PEERS; i++)
130 // if(listener->peers[i].open) count += 1;
131 // tcp_info("%d open connections", count);
133 peer->parent = listener;
135 peer->fd = accept(listener->fd, (struct sockaddr*)&(peer->sockaddr), &len);
136 if(peer->fd < 0) {
137 tcp_error("Could not get peer socket");
138 goto error;
142 int r = fcntl(peer->fd, F_SETFL, O_NONBLOCK);
143 assert(r >= 0);
145 peer->read_watcher.data = peer;
146 ev_init(&(peer->read_watcher), tcp_peer_on_readable);
147 ev_io_set(&(peer->read_watcher), peer->fd, EV_READ | EV_ERROR);
148 ev_io_start(listener->loop, &(peer->read_watcher));
150 peer->timeout_watcher.data = peer;
151 ev_timer_init(&(peer->timeout_watcher), tcp_peer_on_timeout, TCP_TIMEOUT, TCP_TIMEOUT);
152 ev_timer_start(listener->loop, &(peer->timeout_watcher));
154 return peer;
156 error:
157 tcp_peer_close(peer);
158 return NULL;
161 void tcp_peer_stop_read_watcher(tcp_peer *peer)
163 //assert(peer->open);
164 assert(peer->parent->open);
165 //assert(peer->read_watcher);
166 ev_io_stop(peer->parent->loop, &(peer->read_watcher));
167 //g_debug("killing read watcher");
171 void tcp_peer_close(tcp_peer *peer)
173 if(peer->open) {
174 ev_io_stop(peer->parent->loop, &(peer->read_watcher));
175 ev_timer_stop(peer->parent->loop, &(peer->timeout_watcher));
176 close(peer->fd);
177 peer->open = FALSE;
181 tcp_listener* tcp_listener_new(struct ev_loop *loop)
183 int r;
185 /* note the following will automatically set all of the peer structures
186 * to have peer->open == FALSE */
187 tcp_listener *listener = g_new0(tcp_listener, 1);
189 listener->fd = socket(PF_INET, SOCK_STREAM, 0);
190 r = fcntl(listener->fd, F_SETFL, O_NONBLOCK);
191 assert(r >= 0);
193 int flags = 1;
194 r = setsockopt(listener->fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
195 assert(r >= 0);
197 listener->loop = loop;
198 listener->open = FALSE;
200 return listener;
203 void tcp_listener_free(tcp_listener *listener)
205 tcp_listener_close(listener);
206 free(listener);
208 g_debug("tcp listener freed.");
211 void tcp_listener_close(tcp_listener *listener)
213 int i;
215 g_debug("close listener\n");
216 assert(listener->open);
218 tcp_peer *peer;
219 for(i=0; i < TCP_MAX_PEERS; i++)
220 tcp_peer_close(peer);
222 if(listener->port_s) {
223 free(listener->port_s);
224 listener->port_s = NULL;
226 if(listener->dns_info) {
227 free(listener->dns_info);
228 listener->dns_info = NULL;
230 if(listener->accept_watcher) {
231 printf("killing accept watcher\n");
232 ev_io_stop(listener->loop, listener->accept_watcher);
233 free(listener->accept_watcher);
234 listener->accept_watcher = NULL;
237 close(listener->fd);
238 listener->open = FALSE;
241 void tcp_listener_accept( struct ev_loop *loop
242 , struct ev_io *watcher
243 , int revents
246 tcp_listener *listener = (tcp_listener*)(watcher->data);
247 tcp_peer *peer;
249 assert(listener->open);
250 assert(listener->loop == loop);
252 // check for error in revents
253 if(EV_ERROR & revents) {
254 tcp_error("tcp_peer_on_readable() got error event, closing free");
255 tcp_listener_free(listener);
256 return;
259 peer = tcp_peer_new(listener);
261 if(listener->accept_cb != NULL && peer)
262 listener->accept_cb(peer, listener->accept_cb_data);
264 return;
267 void tcp_listener_listen ( tcp_listener *listener
268 , char *address
269 , int port
270 , tcp_listener_accept_cb_t accept_cb
271 , void *accept_cb_data
274 int r;
276 listener->sockaddr.sin_family = AF_INET;
277 listener->sockaddr.sin_port = htons(port);
279 /* for easy access to the port */
280 listener->port_s = malloc(sizeof(char)*8);
281 sprintf(listener->port_s, "%d", port);
283 listener->dns_info = gethostbyname(address);
284 if (!(listener->dns_info && listener->dns_info->h_addr)) {
285 tcp_error("Could not look up hostname %s", address);
286 goto error;
288 memmove(&(listener->sockaddr.sin_addr), listener->dns_info->h_addr, sizeof(struct in_addr));
290 /* Other socket options. These could probably be fine tuned.
291 * SO_SNDBUF set buffer size for output
292 * SO_RCVBUF set buffer size for input
293 * SO_SNDLOWAT set minimum count for output
294 * SO_RCVLOWAT set minimum count for input
295 * SO_SNDTIMEO set timeout value for output
296 * SO_RCVTIMEO set timeout value for input
298 r = bind(listener->fd, (struct sockaddr*)&(listener->sockaddr), sizeof(listener->sockaddr));
299 if(r < 0) {
300 tcp_error("Failed to bind to %s %d", address, port);
301 goto error;
304 r = listen(listener->fd, TCP_MAX_PEERS);
305 if(r < 0) {
306 tcp_error("listen() failed");
307 goto error;
310 assert(listener->open == FALSE);
311 listener->open = TRUE;
313 listener->accept_watcher = g_new0(struct ev_io, 1);
314 listener->accept_watcher->data = listener;
315 listener->accept_cb = accept_cb;
316 listener->accept_cb_data = accept_cb_data;
318 ev_init (listener->accept_watcher, tcp_listener_accept);
319 ev_io_set (listener->accept_watcher, listener->fd, EV_READ | EV_ERROR);
320 ev_io_start (listener->loop, listener->accept_watcher);
322 return;
323 error:
324 tcp_listener_close(listener);
325 return;
328 char* tcp_listener_address(tcp_listener *listener)
330 if(listener->dns_info)
331 return listener->dns_info->h_name;
332 else
333 return NULL;