1 /* Evented TCP listener
2 * Copyright (c) 2007 Ry Dahl <ry.d4hl@gmail.com>
3 * This software is released under the "MIT License". See README file for details.
23 /* Private function */
24 void tcp_peer_stop_read_watcher(tcp_peer
*peer
);
26 /* Returns the number of bytes remaining to write */
27 int tcp_peer_write(tcp_peer
*peer
, const char *data
, int length
)
30 //tcp_warning("Trying to write to a peer that isn't open.");
34 int sent
= send(peer
->fd
, data
, length
, MSG_HAVEMORE
);
36 //tcp_warning("Error writing: %s", strerror(errno));
40 ev_timer_again(peer
->parent
->loop
, &(peer
->timeout_watcher
));
45 void tcp_peer_on_timeout( struct ev_loop
*loop
46 , struct ev_timer
*watcher
50 tcp_peer
*peer
= (tcp_peer
*)(watcher
->data
);
52 assert(peer
->parent
->loop
== loop
);
53 assert(&(peer
->timeout_watcher
) == watcher
);
56 tcp_info("peer timed out");
59 void tcp_peer_on_readable( struct ev_loop
*loop
60 , struct ev_io
*watcher
64 tcp_peer
*peer
= (tcp_peer
*)(watcher
->data
);
67 // check for error in revents
68 if(EV_ERROR
& revents
) {
69 tcp_error("tcp_peer_on_readable() got error event, closing peer");
74 assert(peer
->parent
->open
);
75 assert(peer
->parent
->loop
== loop
);
76 assert(&(peer
->read_watcher
) == watcher
);
78 if(peer
->read_cb
== NULL
) return;
80 length
= recv(peer
->fd
, peer
->read_buffer
, TCP_CHUNKSIZE
, 0);
83 //g_debug("zero length read? what to do?");
85 //tcp_peer_stop_read_watcher(peer);
87 } else if(length
< 0) {
88 if(errno
== EBADF
|| errno
== ECONNRESET
)
89 g_debug("errno says Connection reset by peer");
91 tcp_error("Error recving data: %s", strerror(errno
));
95 ev_timer_again(loop
, &(peer
->timeout_watcher
));
96 // g_debug("Read %d bytes", length);
98 peer
->read_cb(peer
->read_buffer
, length
, peer
->read_cb_data
);
99 /* Cannot access peer beyond this point because it's possible that the
104 tcp_peer_close(peer
);
107 tcp_peer
* tcp_peer_new(tcp_listener
*listener
)
110 tcp_peer
*peer
= NULL
;
113 /* Get next available peer */
114 for(i
=0; i
< TCP_MAX_PEERS
; i
++)
115 if(!listener
->peers
[i
].open
) {
116 peer
= &(listener
->peers
[i
]);
121 g_message("Too many peers. Refusing connections.");
125 peer
->open
= TRUE
; /* set open ASAP */
129 // for(i = 0; i < TCP_MAX_PEERS; i++)
130 // if(listener->peers[i].open) count += 1;
131 // tcp_info("%d open connections", count);
133 peer
->parent
= listener
;
135 peer
->fd
= accept(listener
->fd
, (struct sockaddr
*)&(peer
->sockaddr
), &len
);
137 tcp_error("Could not get peer socket");
142 int r
= fcntl(peer
->fd
, F_SETFL
, O_NONBLOCK
);
145 peer
->read_watcher
.data
= peer
;
146 ev_init(&(peer
->read_watcher
), tcp_peer_on_readable
);
147 ev_io_set(&(peer
->read_watcher
), peer
->fd
, EV_READ
| EV_ERROR
);
148 ev_io_start(listener
->loop
, &(peer
->read_watcher
));
150 peer
->timeout_watcher
.data
= peer
;
151 ev_timer_init(&(peer
->timeout_watcher
), tcp_peer_on_timeout
, TCP_TIMEOUT
, TCP_TIMEOUT
);
152 ev_timer_start(listener
->loop
, &(peer
->timeout_watcher
));
157 tcp_peer_close(peer
);
161 void tcp_peer_stop_read_watcher(tcp_peer
*peer
)
163 //assert(peer->open);
164 assert(peer
->parent
->open
);
165 //assert(peer->read_watcher);
166 ev_io_stop(peer
->parent
->loop
, &(peer
->read_watcher
));
167 //g_debug("killing read watcher");
171 void tcp_peer_close(tcp_peer
*peer
)
174 ev_io_stop(peer
->parent
->loop
, &(peer
->read_watcher
));
175 ev_timer_stop(peer
->parent
->loop
, &(peer
->timeout_watcher
));
181 tcp_listener
* tcp_listener_new(struct ev_loop
*loop
)
185 /* Note: g_new0() zero-fills the allocation, so every peer structure
186 * starts out with peer->open == FALSE. */
187 tcp_listener
*listener
= g_new0(tcp_listener
, 1);
189 listener
->fd
= socket(PF_INET
, SOCK_STREAM
, 0);
190 r
= fcntl(listener
->fd
, F_SETFL
, O_NONBLOCK
);
194 r
= setsockopt(listener
->fd
, SOL_SOCKET
, SO_REUSEADDR
, &flags
, sizeof(flags
));
197 listener
->loop
= loop
;
198 listener
->open
= FALSE
;
203 void tcp_listener_free(tcp_listener
*listener
)
205 tcp_listener_close(listener
);
208 g_debug("tcp listener freed.");
211 void tcp_listener_close(tcp_listener
*listener
)
215 g_debug("close listener\n");
216 assert(listener
->open
);
219 for(i
=0; i
< TCP_MAX_PEERS
; i
++)
220 tcp_peer_close(peer
);
222 if(listener
->port_s
) {
223 free(listener
->port_s
);
224 listener
->port_s
= NULL
;
226 if(listener
->dns_info
) {
227 free(listener
->dns_info
);
228 listener
->dns_info
= NULL
;
230 if(listener
->accept_watcher
) {
231 printf("killing accept watcher\n");
232 ev_io_stop(listener
->loop
, listener
->accept_watcher
);
233 free(listener
->accept_watcher
);
234 listener
->accept_watcher
= NULL
;
238 listener
->open
= FALSE
;
241 void tcp_listener_accept( struct ev_loop
*loop
242 , struct ev_io
*watcher
246 tcp_listener
*listener
= (tcp_listener
*)(watcher
->data
);
249 assert(listener
->open
);
250 assert(listener
->loop
== loop
);
252 // check for error in revents
253 if(EV_ERROR
& revents
) {
254 tcp_error("tcp_peer_on_readable() got error event, closing free");
255 tcp_listener_free(listener
);
259 peer
= tcp_peer_new(listener
);
261 if(listener
->accept_cb
!= NULL
&& peer
)
262 listener
->accept_cb(peer
, listener
->accept_cb_data
);
267 void tcp_listener_listen ( tcp_listener
*listener
270 , tcp_listener_accept_cb_t accept_cb
271 , void *accept_cb_data
276 listener
->sockaddr
.sin_family
= AF_INET
;
277 listener
->sockaddr
.sin_port
= htons(port
);
279 /* for easy access to the port */
280 listener
->port_s
= malloc(sizeof(char)*8);
281 sprintf(listener
->port_s
, "%d", port
);
283 listener
->dns_info
= gethostbyname(address
);
284 if (!(listener
->dns_info
&& listener
->dns_info
->h_addr
)) {
285 tcp_error("Could not look up hostname %s", address
);
288 memmove(&(listener
->sockaddr
.sin_addr
), listener
->dns_info
->h_addr
, sizeof(struct in_addr
));
290 /* Other socket options. These could probably be fine tuned.
291 * SO_SNDBUF set buffer size for output
292 * SO_RCVBUF set buffer size for input
293 * SO_SNDLOWAT set minimum count for output
294 * SO_RCVLOWAT set minimum count for input
295 * SO_SNDTIMEO set timeout value for output
296 * SO_RCVTIMEO set timeout value for input
298 r
= bind(listener
->fd
, (struct sockaddr
*)&(listener
->sockaddr
), sizeof(listener
->sockaddr
));
300 tcp_error("Failed to bind to %s %d", address
, port
);
304 r
= listen(listener
->fd
, TCP_MAX_PEERS
);
306 tcp_error("listen() failed");
310 assert(listener
->open
== FALSE
);
311 listener
->open
= TRUE
;
313 listener
->accept_watcher
= g_new0(struct ev_io
, 1);
314 listener
->accept_watcher
->data
= listener
;
315 listener
->accept_cb
= accept_cb
;
316 listener
->accept_cb_data
= accept_cb_data
;
318 ev_init (listener
->accept_watcher
, tcp_listener_accept
);
319 ev_io_set (listener
->accept_watcher
, listener
->fd
, EV_READ
| EV_ERROR
);
320 ev_io_start (listener
->loop
, listener
->accept_watcher
);
324 tcp_listener_close(listener
);
328 char* tcp_listener_address(tcp_listener
*listener
)
330 if(listener
->dns_info
)
331 return listener
->dns_info
->h_name
;