Added a timeout watcher to TCP class - probably isn't working properly yet.
[ebb.git] / tcp.c
blobee7705f59baf1489af3ff5e3eabfe096a858f166
1 /* Evented TCP Server
2 * Copyright (c) 2007 Ry Dahl <ry.d4hl@gmail.com>
3 * This software is released under the "MIT License". See README file for details.
4 */
6 /* TODO: add timeouts for clients */
7 #include <unistd.h>
8 #include <fcntl.h>
9 #include <sys/types.h>
10 #include <arpa/inet.h>
11 #include <netdb.h>
13 #include <stdio.h>
14 #include <string.h>
15 #include <stdlib.h>
16 #include <errno.h>
18 #include <ev.h>
19 #include <glib.h>
21 #include <assert.h>
23 #include "tcp.h"
25 #define TCP_CHUNKSIZE (16*1024)
27 /* Private function */
28 void tcp_client_stop_read_watcher(tcp_client *client);
30 /* Returns the number of bytes remaining to write */
31 int tcp_client_write(tcp_client *client, const char *data, int length)
33 if(!client->open) {
34 tcp_warning("Trying to write to a client that isn't open.");
35 return 0;
37 assert(client->open);
38 int sent = send(client->fd, data, length, 0);
39 if(sent < 0) {
40 tcp_warning("Error writing: %s", strerror(errno));
41 tcp_client_close(client);
42 return 0;
44 ev_timer_again(client->parent->loop, client->timeout_watcher);
46 return sent;
49 void tcp_client_on_timeout( struct ev_loop *loop
50 , struct ev_timer *watcher
51 , int revents
54 tcp_client *client = (tcp_client*)(watcher->data);
56 assert(client->parent->loop == loop);
57 assert(client->timeout_watcher == watcher);
59 tcp_client_close(client);
60 tcp_info("client timed out");
63 void tcp_client_on_readable( struct ev_loop *loop
64 , struct ev_io *watcher
65 , int revents
68 tcp_client *client = (tcp_client*)(watcher->data);
69 int length;
71 // check for error in revents
72 if(EV_ERROR & revents) {
73 tcp_error("tcp_client_on_readable() got error event, closing client");
74 goto error;
77 assert(client->open);
78 assert(client->parent->open);
79 assert(client->parent->loop == loop);
80 assert(client->read_watcher == watcher);
82 if(client->read_cb == NULL) return;
84 length = recv(client->fd, client->read_buffer, TCP_CHUNKSIZE, 0);
86 if(length == 0) {
87 g_debug("zero length read? what to do? killing read watcher");
88 tcp_client_stop_read_watcher(client);
89 return;
90 } else if(length < 0) {
91 if(errno == EBADF || errno == ECONNRESET)
92 g_debug("errno says Connection reset by peer");
93 else
94 tcp_error("Error recving data: %s", strerror(errno));
95 goto error;
98 ev_timer_again(loop, client->timeout_watcher);
99 // g_debug("Read %d bytes", length);
101 client->read_cb(client->read_buffer, length, client->read_cb_data);
102 /* Cannot access client beyond this point because it's possible that the
103 * user has freed it.
105 return;
106 error:
107 tcp_client_close(client);
110 tcp_client* tcp_client_new(tcp_server *server)
112 socklen_t len;
113 tcp_client *client;
115 client = g_new0(tcp_client, 1);
117 client->parent = server;
119 client->fd = accept(server->fd, (struct sockaddr*)&(client->sockaddr), &len);
120 if(client->fd < 0) {
121 tcp_error("Could not get client socket");
122 goto error;
125 client->open = TRUE;
127 int r = fcntl(client->fd, F_SETFL, O_NONBLOCK);
128 if(r < 0) {
129 tcp_error("Setting nonblock mode on socket failed");
130 goto error;
133 client->read_buffer = (char*)malloc(sizeof(char)*TCP_CHUNKSIZE);
135 client->read_watcher = g_new0(struct ev_io, 1);
136 client->read_watcher->data = client;
137 ev_init(client->read_watcher, tcp_client_on_readable);
138 ev_io_set(client->read_watcher, client->fd, EV_READ | EV_ERROR);
139 ev_io_start(server->loop, client->read_watcher);
141 client->timeout_watcher = g_new0(struct ev_timer, 1);
142 client->timeout_watcher->data = client;
143 ev_timer_init(client->timeout_watcher, tcp_client_on_timeout, 60., 60.);
144 ev_timer_start(server->loop, client->timeout_watcher);
146 return client;
148 error:
149 tcp_client_close(client);
150 return NULL;
153 void tcp_client_stop_read_watcher(tcp_client *client)
155 //assert(client->open);
156 assert(client->parent->open);
157 //assert(client->read_watcher);
159 if(client->read_watcher != NULL) {
160 //g_debug("killing read watcher");
161 ev_io_stop(client->parent->loop, client->read_watcher);
162 free(client->read_watcher);
163 client->read_watcher = NULL;
167 void tcp_client_free(tcp_client *client)
169 tcp_client_close(client);
170 free(client->read_buffer);
171 free(client);
172 //g_debug("tcp client closed");
175 void tcp_client_close(tcp_client *client)
177 if(client->open) {
178 tcp_client_stop_read_watcher(client);
180 ev_timer_stop(client->parent->loop, client->timeout_watcher);
181 free(client->timeout_watcher);
182 client->timeout_watcher = NULL;
184 close(client->fd);
185 client->open = FALSE;
186 //g_debug("tcp client closed");
190 tcp_server* tcp_server_new()
192 int r;
194 tcp_server *server = g_new0(tcp_server, 1);
196 server->fd = socket(PF_INET, SOCK_STREAM, 0);
197 r = fcntl(server->fd, F_SETFL, O_NONBLOCK);
198 if(r < 0) {
199 tcp_error("Setting nonblock mode on socket failed");
200 goto error;
203 int flags = 1;
204 r = setsockopt(server->fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
205 if(r < 0) {
206 tcp_error("failed to set setsock to reuseaddr");
207 goto error;
210 r = setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
211 if(r < 0) {
212 tcp_error("failed to set socket to nodelay");
213 tcp_server_free(server);
214 return NULL;
218 server->loop = ev_loop_new(0);
220 server->clients = g_queue_new();
221 server->open = FALSE;
222 return server;
224 error:
225 tcp_server_free(server);
226 return NULL;
229 void tcp_server_free(tcp_server *server)
231 tcp_server_close(server);
232 g_queue_free(server->clients);
233 free(server);
235 g_debug("tcp server freed.");
238 void tcp_server_close(tcp_server *server)
240 printf("closeserver\n");
241 assert(server->open);
243 tcp_client *client;
244 while((client = g_queue_pop_head(server->clients)))
245 tcp_client_close(client);
247 if(server->port_s) {
248 free(server->port_s);
249 server->port_s = NULL;
251 if(server->dns_info) {
252 free(server->dns_info);
253 server->dns_info = NULL;
255 if(server->accept_watcher) {
256 printf("killing accept watcher\n");
257 ev_io_stop(server->loop, server->accept_watcher);
258 free(server->accept_watcher);
259 server->accept_watcher = NULL;
261 ev_unloop(server->loop, EVUNLOOP_ALL);
262 ev_loop_destroy (server->loop);
263 server->loop = NULL;
265 close(server->fd);
266 server->open = FALSE;
269 void tcp_server_accept( struct ev_loop *loop
270 , struct ev_io *watcher
271 , int revents
274 tcp_server *server = (tcp_server*)(watcher->data);
275 tcp_client *client;
277 assert(server->open);
278 assert(server->loop == loop);
280 // check for error in revents
281 if(EV_ERROR & revents) {
282 tcp_error("tcp_client_on_readable() got error event, closing free");
283 tcp_server_free(server);
284 return;
287 client = tcp_client_new(server);
288 g_queue_push_head(server->clients, (gpointer)client);
290 if(server->accept_cb != NULL)
291 server->accept_cb(client, server->accept_cb_data);
293 return;
296 void tcp_server_listen ( tcp_server *server
297 , char *address
298 , int port
299 , int backlog
300 , tcp_server_accept_cb_t accept_cb
301 , void *accept_cb_data
304 int r;
306 server->sockaddr.sin_family = AF_INET;
307 server->sockaddr.sin_port = htons(port);
309 /* for easy access to the port */
310 server->port_s = malloc(sizeof(char)*8);
311 sprintf(server->port_s, "%d", port);
313 server->dns_info = gethostbyname(address);
314 if (!(server->dns_info && server->dns_info->h_addr)) {
315 tcp_error("Could not look up hostname %s", address);
316 goto error;
318 memmove(&(server->sockaddr.sin_addr), server->dns_info->h_addr, sizeof(struct in_addr));
320 /* Other socket options. These could probably be fine tuned.
321 * SO_SNDBUF set buffer size for output
322 * SO_RCVBUF set buffer size for input
323 * SO_SNDLOWAT set minimum count for output
324 * SO_RCVLOWAT set minimum count for input
325 * SO_SNDTIMEO set timeout value for output
326 * SO_RCVTIMEO set timeout value for input
328 r = bind(server->fd, (struct sockaddr*)&(server->sockaddr), sizeof(server->sockaddr));
329 if(r < 0) {
330 tcp_error("Failed to bind to %s %d", address, port);
331 goto error;
334 r = listen(server->fd, backlog);
335 if(r < 0) {
336 tcp_error("listen() failed");
337 goto error;
340 assert(server->open == FALSE);
341 server->open = TRUE;
343 server->accept_watcher = g_new0(struct ev_io, 1);
344 server->accept_watcher->data = server;
345 server->accept_cb = accept_cb;
346 server->accept_cb_data = accept_cb_data;
348 ev_init (server->accept_watcher, tcp_server_accept);
349 ev_io_set (server->accept_watcher, server->fd, EV_READ | EV_ERROR);
350 ev_io_start (server->loop, server->accept_watcher);
351 ev_loop (server->loop, 0);
352 return;
354 error:
355 tcp_server_close(server);
356 return;
359 char* tcp_server_address(tcp_server *server)
361 if(server->dns_info)
362 return server->dns_info->h_name;
363 else
364 return NULL;