1 /* Evented TCP listener
2 * Copyright (c) 2007 Ry Dahl <ry.d4hl@gmail.com>
3 * This software is released under the "MIT License". See README file for details.
// Forward declaration of a file-private helper; the definition appears
// further down in this file and stops the libev read watcher of a peer.
23 /* Private function */
24 void tcp_peer_stop_read_watcher(tcp_peer
*peer
);
// tcp_peer_write: hands `length` bytes starting at `data` to send() on the
// peer socket (flags 0) and re-arms the peer timeout watcher on the parent
// listener loop with ev_timer_again.
//   peer   - peer whose fd is written to
//   data   - bytes to send
//   length - number of bytes requested
// NOTE(review): the branch conditions and return statements are not visible
// in this view, so the return contract stated in the comment below cannot be
// confirmed from here.
26 /* Returns the number of bytes remaining to write */
27 int tcp_peer_write(tcp_peer
*peer
, const char *data
, int length
)
30 //tcp_warning("Trying to write to a peer that isn't open.");
// Single send() attempt; `sent` holds the raw return value of send().
34 int sent
= send(peer
->fd
, data
, length
, 0);
// Presumably the failure path of send(): errno is rendered with strerror.
36 tcp_warning("Error writing: %s", strerror(errno
));
// Activity on the connection restarts the inactivity timer.
40 ev_timer_again(peer
->parent
->loop
, &(peer
->timeout_watcher
));
// tcp_peer_on_timeout: libev timer callback registered by tcp_peer_new for
// the timeout watcher of a peer.
//   loop    - event loop delivering the event
//   watcher - the timer that fired; watcher->data carries the tcp_peer
// The callback recovers the peer, sanity-checks that the loop and watcher
// are the ones stored on the peer, and logs that the peer timed out.
// NOTE(review): whatever is done to the peer after the log line is not
// visible in this view.
45 void tcp_peer_on_timeout( struct ev_loop
*loop
46 , struct ev_timer
*watcher
50 tcp_peer
*peer
= (tcp_peer
*)(watcher
->data
);
// Consistency checks between the event and the state stored on the peer.
52 assert(peer
->parent
->loop
== loop
);
53 assert(&(peer
->timeout_watcher
) == watcher
);
56 tcp_info("peer timed out");
// tcp_peer_on_readable: libev io callback registered by tcp_peer_new for the
// read watcher of a peer.
//   loop    - event loop delivering the event
//   watcher - the io watcher that fired; watcher->data carries the tcp_peer
// Visible flow: report EV_ERROR events, assert that the parent listener is
// open and that loop and watcher match the peer, return early when no read
// callback is installed, recv() up to TCP_CHUNKSIZE bytes into the peer read
// buffer, log receive errors, re-arm the timeout watcher, and pass the
// received bytes to peer->read_cb together with peer->read_cb_data.
// NOTE(review): several conditions and closing braces are missing from this
// view, so the exact pairing of branches below is unconfirmed.
59 void tcp_peer_on_readable( struct ev_loop
*loop
60 , struct ev_io
*watcher
64 tcp_peer
*peer
= (tcp_peer
*)(watcher
->data
);
67 // check for error in revents
68 if(EV_ERROR
& revents
) {
69 tcp_error("tcp_peer_on_readable() got error event, closing peer");
74 assert(peer
->parent
->open
);
75 assert(peer
->parent
->loop
== loop
);
76 assert(&(peer
->read_watcher
) == watcher
);
// Without a read callback nothing is read from the socket at all.
// NOTE(review): the pending data then stays in the socket; confirm that this
// is intended.
78 if(peer
->read_cb
== NULL
) return;
80 length
= recv(peer
->fd
, peer
->read_buffer
, TCP_CHUNKSIZE
, 0);
83 //g_debug("zero length read? what to do?");
85 //tcp_peer_stop_read_watcher(peer);
87 } else if(length
< 0) {
// NOTE(review): the debug text below mentions a connection reset even when
// errno is EBADF.
88 if(errno
== EBADF
|| errno
== ECONNRESET
)
89 g_debug("errno says Connection reset by peer");
91 tcp_error("Error recving data: %s", strerror(errno
));
// Incoming data counts as activity, so the inactivity timer is restarted.
95 ev_timer_again(loop
, &(peer
->timeout_watcher
));
96 // g_debug("Read %d bytes", length);
98 peer
->read_cb(peer
->read_buffer
, length
, peer
->read_cb_data
);
99 /* Cannot access peer beyond this point because it's possible that the
104 tcp_peer_close(peer
);
// tcp_peer_new: claims a free slot in listener->peers, accepts a pending
// connection on the listener socket into it, and starts the libev read and
// timeout watchers for the new peer.
//   listener - listener whose fd has a connection waiting
// Visible flow: scan the TCP_MAX_PEERS slots for one whose open flag is
// clear, mark it open, record the parent listener, accept() into peer->fd
// and peer->sockaddr, then register tcp_peer_on_readable (EV_READ | EV_ERROR)
// and tcp_peer_on_timeout (TCP_TIMEOUT after and repeat).
// NOTE(review): return statements and error-branch conditions are not
// visible in this view; the trailing tcp_peer_close call is presumably the
// error exit.
107 tcp_peer
* tcp_peer_new(tcp_listener
*listener
)
110 tcp_peer
*peer
= NULL
;
113 /* Get next available peer */
114 for(i
=0; i
< TCP_MAX_PEERS
; i
++)
115 if(!listener
->peers
[i
].open
) {
116 peer
= &(listener
->peers
[i
]);
// NOTE(review): GLib documents g_error as fatal (it aborts the program), so
// this does more than refuse the connection; confirm that this is intended.
121 /* refuse connections */
122 g_error("Too many peers");
127 peer
->open
= TRUE
; /* set open ASAP */
131 // for(i = 0; i < TCP_MAX_PEERS; i++)
132 // if(listener->peers[i].open) count += 1;
133 // tcp_info("%d open connections", count);
136 peer
->parent
= listener
;
138 peer
->fd
= accept(listener
->fd
, (struct sockaddr
*)&(peer
->sockaddr
), &len
);
140 tcp_error("Could not get peer socket");
// NOTE(review): the non-blocking setup for the accepted socket is commented
// out below, so the visible code leaves the mode of peer->fd unchanged.
145 // int r = fcntl(peer->fd, F_SETFL, O_NONBLOCK);
147 // tcp_error("Setting nonblock mode on socket failed");
// The watchers carry the peer in their data field so that the callbacks can
// recover it.
151 peer
->read_watcher
.data
= peer
;
152 ev_init(&(peer
->read_watcher
), tcp_peer_on_readable
);
153 ev_io_set(&(peer
->read_watcher
), peer
->fd
, EV_READ
| EV_ERROR
);
154 ev_io_start(listener
->loop
, &(peer
->read_watcher
));
156 peer
->timeout_watcher
.data
= peer
;
157 ev_timer_init(&(peer
->timeout_watcher
), tcp_peer_on_timeout
, TCP_TIMEOUT
, TCP_TIMEOUT
);
158 ev_timer_start(listener
->loop
, &(peer
->timeout_watcher
));
163 tcp_peer_close(peer
);
// tcp_peer_stop_read_watcher: stops the libev read watcher of a peer on the
// loop of its parent listener. Asserts that the parent listener is open; the
// check on the peer itself is commented out.
//   peer - peer whose read watcher is stopped
167 void tcp_peer_stop_read_watcher(tcp_peer
*peer
)
169 //assert(peer->open);
170 assert(peer
->parent
->open
);
171 //assert(peer->read_watcher);
172 ev_io_stop(peer
->parent
->loop
, &(peer
->read_watcher
));
173 //g_debug("killing read watcher");
// tcp_peer_close: tears down a peer. The visible part stops both libev
// watchers of the peer (read and timeout) on the loop of the parent listener.
//   peer - peer to close
// NOTE(review): closing the socket and clearing the open flag are not
// visible in this view; confirm against the full file.
177 void tcp_peer_close(tcp_peer
*peer
)
180 ev_io_stop(peer
->parent
->loop
, &(peer
->read_watcher
));
181 ev_timer_stop(peer
->parent
->loop
, &(peer
->timeout_watcher
));
// tcp_listener_new: allocates a zeroed tcp_listener with g_new0, creates a
// PF_INET stream socket for it, switches the socket to O_NONBLOCK with
// fcntl(F_SETFL), sets SO_REUSEADDR, stores the event loop, marks the
// listener as not open, and installs SIG_IGN for SIGPIPE.
//   loop - libev loop that the listener will later register watchers on
// NOTE(review): the checks on the results stored in `r` and the return
// statements are not visible in this view.
// NOTE(review): the tcp_listener_free call at the end is presumably an error
// exit. As visible in this file, tcp_listener_free calls tcp_listener_close,
// which asserts listener->open, while open is FALSE here; confirm that this
// path cannot trip the assert.
187 tcp_listener
* tcp_listener_new(struct ev_loop
*loop
)
191 /* note the following will automatically set all of the peer structures
192 * to have peer->open == FALSE */
193 tcp_listener
*listener
= g_new0(tcp_listener
, 1);
195 listener
->fd
= socket(PF_INET
, SOCK_STREAM
, 0);
196 r
= fcntl(listener
->fd
, F_SETFL
, O_NONBLOCK
);
200 r
= setsockopt(listener
->fd
, SOL_SOCKET
, SO_REUSEADDR
, &flags
, sizeof(flags
));
203 listener
->loop
= loop
;
204 listener
->open
= FALSE
;
// Ignore SIGPIPE process-wide; presumably so that writing to a closed socket
// reports an error instead of terminating the process.
207 struct sigaction sigact
;
208 sigact
.sa_handler
= SIG_IGN
;
209 sigemptyset(&sigact
.sa_mask
);
211 r
= sigaction(SIGPIPE
, &sigact
, 0);
217 tcp_listener_free(listener
);
// tcp_listener_free: closes the listener through tcp_listener_close and logs
// that the listener was freed.
//   listener - listener to dispose of
// NOTE(review): the statements between the close call and the log line
// (presumably releasing the listener memory) are not visible in this view.
221 void tcp_listener_free(tcp_listener
*listener
)
223 tcp_listener_close(listener
);
226 g_debug("tcp listener freed.");
// tcp_listener_close: shuts a listener down. Requires listener->open (assert).
// Visible flow: close peers in a loop over TCP_MAX_PEERS, release port_s and
// dns_info, stop and release the accept watcher, then clear the open flag.
//   listener - listener to close
// NOTE(review): closing braces and possibly other statements are missing
// from this view.
229 void tcp_listener_close(tcp_listener
*listener
)
233 g_debug("close listener\n");
234 assert(listener
->open
);
// NOTE(review): as visible here the loop body passes the same `peer` on every
// iteration and the declaration of `peer` is not shown; confirm that each
// slot of listener->peers is actually closed.
237 for(i
=0; i
< TCP_MAX_PEERS
; i
++)
238 tcp_peer_close(peer
);
240 if(listener
->port_s
) {
241 free(listener
->port_s
);
242 listener
->port_s
= NULL
;
// NOTE(review): tcp_listener_listen assigns dns_info from gethostbyname,
// whose result POSIX allows to point at static storage; passing it to free
// looks invalid and should be confirmed.
244 if(listener
->dns_info
) {
245 free(listener
->dns_info
);
246 listener
->dns_info
= NULL
;
// NOTE(review): accept_watcher is allocated with g_new0 in
// tcp_listener_listen but released with free here; GLib documents g_free as
// the matching release call.
248 if(listener
->accept_watcher
) {
249 printf("killing accept watcher\n");
250 ev_io_stop(listener
->loop
, listener
->accept_watcher
);
251 free(listener
->accept_watcher
);
252 listener
->accept_watcher
= NULL
;
256 listener
->open
= FALSE
;
// tcp_listener_accept: libev io callback registered by tcp_listener_listen on
// the listening socket.
//   loop    - event loop delivering the event
//   watcher - accept watcher; watcher->data carries the tcp_listener
// Visible flow: assert that the listener is open and bound to this loop,
// free the listener on an EV_ERROR event, otherwise create a peer with
// tcp_peer_new and pass it to listener->accept_cb (when set) together with
// listener->accept_cb_data.
// NOTE(review): whether the error branch returns right after
// tcp_listener_free is not visible; if it does not, the code below would use
// the listener after it was freed.
// NOTE(review): the result of tcp_peer_new is handed to the callback with no
// visible check for failure.
259 void tcp_listener_accept( struct ev_loop
*loop
260 , struct ev_io
*watcher
264 tcp_listener
*listener
= (tcp_listener
*)(watcher
->data
);
267 assert(listener
->open
);
268 assert(listener
->loop
== loop
);
270 // check for error in revents
271 if(EV_ERROR
& revents
) {
// NOTE(review): the message below names tcp_peer_on_readable although this
// is tcp_listener_accept; it looks copied from the peer callback.
272 tcp_error("tcp_peer_on_readable() got error event, closing free");
273 tcp_listener_free(listener
);
277 peer
= tcp_peer_new(listener
);
279 if(listener
->accept_cb
!= NULL
)
280 listener
->accept_cb(peer
, listener
->accept_cb_data
);
// tcp_listener_listen: binds the listener socket to an address and port,
// starts listening, and registers tcp_listener_accept as the libev io
// callback for incoming connections.
//   listener       - listener created earlier; must not be open yet (assert)
//   accept_cb      - stored on the listener; tcp_listener_accept calls it
//   accept_cb_data - opaque pointer passed back to accept_cb
// The body also uses `address` and `port`; their declarations are not
// visible in this view (presumably two further parameters).
// Visible flow: fill listener->sockaddr (AF_INET, htons(port)), keep the port
// as text in port_s, resolve `address` with gethostbyname and copy the first
// address into sin_addr, bind(), listen() with a backlog of TCP_MAX_PEERS,
// mark the listener open, allocate and start the accept watcher
// (EV_READ | EV_ERROR).
// NOTE(review): the conditions guarding the error messages and the trailing
// tcp_listener_close call are not visible in this view.
285 void tcp_listener_listen ( tcp_listener
*listener
288 , tcp_listener_accept_cb_t accept_cb
289 , void *accept_cb_data
294 listener
->sockaddr
.sin_family
= AF_INET
;
295 listener
->sockaddr
.sin_port
= htons(port
);
// NOTE(review): the malloc result is used with no visible check, and sprintf
// is unbounded; the 8 byte buffer only fits a port value of at most 7
// characters.
297 /* for easy access to the port */
298 listener
->port_s
= malloc(sizeof(char)*8);
299 sprintf(listener
->port_s
, "%d", port
);
// NOTE(review): gethostbyname may return a pointer to static storage;
// tcp_listener_close later passes dns_info to free, which should be
// confirmed.
301 listener
->dns_info
= gethostbyname(address
);
302 if (!(listener
->dns_info
&& listener
->dns_info
->h_addr
)) {
303 tcp_error("Could not look up hostname %s", address
);
306 memmove(&(listener
->sockaddr
.sin_addr
), listener
->dns_info
->h_addr
, sizeof(struct in_addr
));
308 /* Other socket options. These could probably be fine tuned.
309 * SO_SNDBUF set buffer size for output
310 * SO_RCVBUF set buffer size for input
311 * SO_SNDLOWAT set minimum count for output
312 * SO_RCVLOWAT set minimum count for input
313 * SO_SNDTIMEO set timeout value for output
314 * SO_RCVTIMEO set timeout value for input
316 r
= bind(listener
->fd
, (struct sockaddr
*)&(listener
->sockaddr
), sizeof(listener
->sockaddr
));
318 tcp_error("Failed to bind to %s %d", address
, port
);
322 r
= listen(listener
->fd
, TCP_MAX_PEERS
);
324 tcp_error("listen() failed");
328 assert(listener
->open
== FALSE
);
329 listener
->open
= TRUE
;
331 listener
->accept_watcher
= g_new0(struct ev_io
, 1);
332 listener
->accept_watcher
->data
= listener
;
333 listener
->accept_cb
= accept_cb
;
334 listener
->accept_cb_data
= accept_cb_data
;
336 ev_init (listener
->accept_watcher
, tcp_listener_accept
);
337 ev_io_set (listener
->accept_watcher
, listener
->fd
, EV_READ
| EV_ERROR
);
338 ev_io_start (listener
->loop
, listener
->accept_watcher
);
342 tcp_listener_close(listener
);
// tcp_listener_address: returns the h_name field of listener->dns_info when
// dns_info is set.
//   listener - listener to query
// NOTE(review): the value returned when dns_info is NULL lies beyond the end
// of this view.
346 char* tcp_listener_address(tcp_listener
*listener
)
348 if(listener
->dns_info
)
349 return listener
->dns_info
->h_name
;