4 #include <event2/event.h>
5 #include <event2/listener.h>
6 #include <event2/buffer.h>
16 SERVER_STATE_CONNECTING
,
17 SERVER_STATE_CONNECTED
,
18 SERVER_STATE_REQUEST_SENT
,
23 TAILQ_ENTRY(server
) next
;
24 enum server_state state
;
28 struct http_conn
*conn
;
29 struct client
*client
;
31 TAILQ_HEAD(server_list
, server
);
36 CLIENT_STATE_DISCARD_INPUT
,
41 enum client_state state
;
42 struct http_request_list requests
;
44 struct http_conn
*conn
;
45 struct server
*server
;
48 static void on_client_error(struct http_conn
*, enum http_conn_error
, void *);
49 static void on_client_request(struct http_conn
*, struct http_request
*, void *);
50 static void on_client_read_body(struct http_conn
*, struct evbuffer
*, void *);
51 static void on_client_msg_complete(struct http_conn
*, void *);
52 static void on_client_write_more(struct http_conn
*, void *);
53 static void on_client_flush(struct http_conn
*, void *);
55 static void on_server_connected(struct http_conn
*, void *);
56 static void on_server_error(struct http_conn
*, enum http_conn_error
, void *);
57 static void on_server_continuation(struct http_conn
*, void *);
58 static void on_server_response(struct http_conn
*, struct http_response
*, void *);
59 static void on_server_read_body(struct http_conn
*, struct evbuffer
*, void *);
60 static void on_server_msg_complete(struct http_conn
*, void *);
61 static void on_server_write_more(struct http_conn
*, void *);
62 static void on_server_flush(struct http_conn
*, void *);
64 static const struct http_cbs client_methods
= {
71 on_client_msg_complete
,
76 static const struct http_cbs server_methods
= {
80 on_server_continuation
,
83 on_server_msg_complete
,
88 static struct event_base
*proxy_event_base
;
89 static struct evdns_base
*proxy_evdns_base
;
90 static struct evconnlistener
*listener
= NULL
;
91 static struct server_list idle_servers
;
92 static size_t max_pending_requests
= 8;
94 static struct server
*
95 server_new(const char *host
, int port
, struct client
*client
)
97 struct server
*server
;
99 server
= mem_calloc(1, sizeof(*server
));
100 server
->host
= mem_strdup(host
);
102 server
->client
= client
;
103 server
->conn
= http_conn_new(proxy_event_base
, -1, HTTP_SERVER
,
104 &server_methods
, server
);
105 log_debug("proxy: new server: %p, %s:%d",
106 server
, server
->host
, server
->port
);
112 server_match(const struct server
*server
, const char *host
, int port
)
114 return (!evutil_ascii_strcasecmp(server
->host
, host
) &&
115 server
->port
== port
);
119 server_free(struct server
*server
)
126 log_debug("proxy: freeing server: %p, %s:%d",
127 server
, server
->host
, server
->port
);
129 TAILQ_FOREACH(tmp
, &idle_servers
, next
) {
131 log_fatal("proxy: idle server %p still queued!",
135 mem_free(server
->host
);
136 http_conn_free(server
->conn
);
141 server_connect(struct server
*server
)
143 server
->state
= SERVER_STATE_CONNECTING
;
144 log_debug("proxy: server %p, %s:%d connecting",
145 server
, server
->host
, server
->port
);
146 // XXX AF_UNSPEC seems to cause crashes w/ IPv6 queries
147 return http_conn_connect(server
->conn
, proxy_evdns_base
, AF_INET
,
148 server
->host
, server
->port
);
151 static struct client
*
152 client_new(evutil_socket_t sock
)
154 struct client
*client
;
156 client
= mem_calloc(1, sizeof(*client
));
157 TAILQ_INIT(&client
->requests
);
158 client
->conn
= http_conn_new(proxy_event_base
, sock
, HTTP_CLIENT
,
159 &client_methods
, client
);
161 log_debug("proxy: new client %p", client
);
167 client_free(struct client
*client
)
169 struct http_request
*req
;
174 log_debug("proxy: freeing client: %p", client
);
176 while ((req
= TAILQ_FIRST(&client
->requests
))) {
177 TAILQ_REMOVE(&client
->requests
, req
, next
);
178 http_request_free(req
);
181 server_free(client
->server
);
182 http_conn_free(client
->conn
);
187 client_scrub_request(struct client
*client
, struct http_request
*req
)
189 if (req
->meth
== METH_CONNECT
) {
190 assert(req
->url
->host
&& req
->url
->port
>= 1);
191 // XXX we could filter host/port here
195 if (!req
->url
->host
) {
196 http_conn_send_error(client
->conn
, 403, "Forbidden");
199 if (evutil_ascii_strcasecmp(req
->url
->scheme
, "http")) {
200 http_conn_send_error(client
->conn
, 400, "Invalid URL");
204 if (req
->url
->port
< 0)
207 if (!headers_has_key(req
->headers
, "Host")) {
209 size_t len
= strlen(req
->url
->host
) + 6;
210 host
= mem_calloc(1, len
);
211 evutil_snprintf(host
, len
, "%s:%d", req
->url
->host
,
213 headers_add_key_val(req
->headers
, "Host", host
);
217 // XXX remove proxy auth msgs?
222 http_request_free(req
);
227 client_disassociate_server(struct client
*client
)
232 if (http_conn_is_persistent(client
->server
->conn
)) {
233 assert(client
->server
->state
== SERVER_STATE_IDLE
);
234 TAILQ_INSERT_TAIL(&idle_servers
, client
->server
, next
);
235 client
->server
->client
= NULL
;
236 client
->server
= NULL
;
238 server_free(client
->server
);
239 client
->server
= NULL
;
243 /* find a server to handle our current req */
245 client_associate_server(struct client
*client
)
249 struct http_request
*req
;
251 req
= TAILQ_FIRST(&client
->requests
);
256 assert(url
&& url
->host
!= NULL
&& url
->port
> 0);
258 /* should we remove our current server? */
259 if (client
->server
&&
260 (!server_match(client
->server
, url
->host
, url
->port
) ||
261 req
->meth
== METH_CONNECT
)) {
262 client_disassociate_server(client
);
265 /* nothing more to do here... */
266 if (req
->meth
== METH_CONNECT
)
269 /* try to find an idle server */
270 TAILQ_FOREACH(it
, &idle_servers
, next
) {
271 if (server_match(it
, url
->host
, url
->port
)) {
272 TAILQ_REMOVE(&idle_servers
, it
, next
);
273 assert(it
->client
== NULL
);
276 log_debug("proxy: idle server %p, %s:%d associated to "
277 "client %p", it
, it
->host
, it
->port
, client
);
282 /* we didn't find one. lets setup a new one. */
283 client
->server
= server_new(url
->host
, url
->port
, client
);
285 return server_connect(client
->server
);
289 client_start_reading_request_body(struct client
*client
, int on_continue
)
291 assert(client
->server
!= NULL
);
293 /* should we wait for the server to send 100 continue? */
294 if (!on_continue
&& http_conn_expect_continue(client
->conn
))
297 if (http_conn_current_message_has_body(client
->conn
) &&
298 client
->nrequests
== 1) {
299 http_conn_write_continue(client
->conn
);
300 http_conn_set_output_encoding(client
->server
->conn
,
301 http_conn_get_current_message_body_encoding(client
->conn
));
302 http_conn_start_reading(client
->conn
);
306 /* returns 1 when there's a request we can dispatch with the associated
309 client_dispatch_request(struct client
*client
)
311 struct http_request
*req
;
312 struct server
*server
= client
->server
;
314 req
= TAILQ_FIRST(&client
->requests
);
318 if (req
->meth
== METH_CONNECT
) {
319 assert(server
== NULL
);
320 http_conn_start_tunnel(client
->conn
, proxy_evdns_base
, AF_INET
,
321 req
->url
->host
, req
->url
->port
);
325 assert(server
!= NULL
);
326 if (server
->state
== SERVER_STATE_REQUEST_SENT
||
327 server
->state
< SERVER_STATE_CONNECTED
)
330 /* it might be nice to support pipelining... */
331 if (server_match(server
, req
->url
->host
, req
->url
->port
)) {
332 log_debug("proxy: writing %s request from client %p to "
334 http_method_to_string(req
->meth
),
335 server
->client
, server
,
336 server
->host
, server
->port
);
337 http_conn_write_request(server
->conn
, req
);
338 // XXX we may want to wait for 100-continue
339 client_start_reading_request_body(client
, 0);
340 server
->state
= SERVER_STATE_REQUEST_SENT
;
348 client_discard_input(struct client
*client
)
350 if (http_conn_current_message_has_body(client
->conn
)) {
351 log_debug("proxy: will discard client msg body");
352 http_conn_disable_persistence(client
->conn
);
353 client
->state
= CLIENT_STATE_DISCARD_INPUT
;
358 client_close_on_flush(struct client
*client
)
360 log_debug("proxy: will close client on flush");
361 client
->state
= CLIENT_STATE_CLOSING
;
362 http_conn_disable_persistence(client
->conn
);
363 http_conn_stop_reading(client
->conn
);
364 http_conn_flush(client
->conn
);
368 client_write_response(struct client
*client
, struct http_response
*resp
)
370 struct http_request
*req
;
372 req
= TAILQ_FIRST(&client
->requests
);
375 log_debug("proxy: got response for %s from %p, %s:%d: %s %d",
376 http_method_to_string(req
->meth
),
377 client
->server
, client
->server
->host
, client
->server
->port
,
378 http_version_to_string(resp
->vers
), resp
->code
);
380 if (HTTP_ERROR_RESPONSE(resp
->code
))
381 client_discard_input(client
);
383 if (req
->meth
== METH_HEAD
)
384 http_conn_set_current_message_bodyless(client
->server
->conn
);
386 http_conn_set_output_encoding(client
->conn
, TE_IDENTITY
);
387 if (http_conn_current_message_has_body(client
->server
->conn
) &&
388 http_conn_is_persistent(client
->conn
) &&
389 http_conn_get_current_message_body_length(client
->server
->conn
) < 0)
390 http_conn_set_output_encoding(client
->conn
, TE_CHUNKED
);
392 http_conn_write_response(client
->conn
, resp
);
396 client_request_serviced(struct client
*client
)
398 struct http_request
*req
;
400 req
= TAILQ_FIRST(&client
->requests
);
401 assert(req
&& client
->nrequests
> 0);
402 log_debug("proxy: request for client %p, %s %s serviced",
403 client
, http_method_to_string(req
->meth
),
404 http_version_to_string(req
->vers
));
405 TAILQ_REMOVE(&client
->requests
, req
, next
);
406 http_request_free(req
);
409 if (client
->state
== CLIENT_STATE_ACTIVE
) {
411 client
->server
->state
= SERVER_STATE_IDLE
;
412 if (client
->nrequests
) {
413 client_associate_server(client
);
414 client_dispatch_request(client
);
416 client_disassociate_server(client
);
418 if (!http_conn_is_persistent(client
->conn
)) {
419 client_close_on_flush(client
);
420 } else if (!http_conn_current_message_has_body(client
->conn
) &&
421 client
->nrequests
< max_pending_requests
) {
422 http_conn_start_reading(client
->conn
);
428 client_notice_server_failed(struct client
*client
, const char *msg
)
430 struct http_request
*req
;
431 struct server
*server
= client
->server
;
433 client
->server
= NULL
;
435 while ((req
= TAILQ_FIRST(&client
->requests
))) {
436 if (evutil_ascii_strcasecmp(req
->url
->host
, server
->host
) ||
437 req
->url
->port
!= server
->port
)
439 http_conn_send_error(client
->conn
, 502, "%s", msg
);
440 client_request_serviced(client
);
444 /* http event slots */
447 on_client_error(struct http_conn
*conn
, enum http_conn_error err
, void *arg
)
449 struct client
*client
= arg
;
452 case ERROR_CONNECT_FAILED
:
453 case ERROR_IDLE_CONN_TIMEDOUT
:
454 log_info("proxy: closing idle client connection.");
457 case ERROR_CLIENT_EXPECTATION_FAILED
:
458 client_discard_input(client
);
459 http_conn_send_error(conn
, 417, "Expectation failed");
461 case ERROR_TUNNEL_CONNECT_FAILED
:
462 // XXX need a better msg here
463 http_conn_send_error(conn
, 504,
464 "Connection failed");
465 client_request_serviced(client
);
467 case ERROR_HEADER_PARSE_FAILED
:
468 client_close_on_flush(client
);
469 http_conn_send_error(conn
, 400,
470 "Couldn't parse client request");
472 case ERROR_CLIENT_POST_WITHOUT_LENGTH
:
473 client_close_on_flush(client
);
474 http_conn_send_error(conn
, 400,
475 "POST or PUT request without length");
477 case ERROR_CHUNK_PARSE_FAILED
:
478 client_close_on_flush(client
);
479 http_conn_send_error(conn
, 400,
480 "Chunk parse failed");
482 case ERROR_TUNNEL_CLOSED
:
483 log_debug("proxy: tunnel closed.");
486 case ERROR_INCOMPLETE_HEADERS
:
487 case ERROR_INCOMPLETE_BODY
:
488 case ERROR_WRITE_FAILED
:
490 log_warn("proxy: client error: %s",
491 http_conn_error_to_string(err
));
498 on_client_request(struct http_conn
*conn
, struct http_request
*req
, void *arg
)
500 struct client
*client
= arg
;
502 assert(client
->state
== CLIENT_STATE_ACTIVE
);
504 if (client_scrub_request(client
, req
) < 0)
507 TAILQ_INSERT_TAIL(&client
->requests
, req
, next
);
508 if (++client
->nrequests
> max_pending_requests
||
509 http_conn_current_message_has_body(conn
))
510 http_conn_stop_reading(conn
);
512 log_debug("proxy: new %s request for %s:%d (pipeline %u)",
513 http_method_to_string(req
->meth
),
514 req
->url
->host
, req
->url
->port
,
515 (unsigned)client
->nrequests
);
517 if (req
->meth
== METH_CONNECT
)
518 client
->state
= CLIENT_STATE_TUNNEL
;
520 if (!client
->server
&& client_associate_server(client
) < 0)
523 client_dispatch_request(client
);
527 on_client_read_body(struct http_conn
*conn
, struct evbuffer
*buf
, void *arg
)
529 struct client
*client
= arg
;
531 if (client
->state
== CLIENT_STATE_DISCARD_INPUT
)
532 evbuffer_drain(buf
, -1);
533 else if (!http_conn_write_buf(client
->server
->conn
, buf
))
534 http_conn_stop_reading(conn
);
538 on_client_msg_complete(struct http_conn
*conn
, void *arg
)
540 struct client
*client
= arg
;
542 log_debug("proxy: finished reading client's message");
544 if (client
->state
== CLIENT_STATE_DISCARD_INPUT
)
545 client_close_on_flush(client
);
546 else if (http_conn_current_message_has_body(conn
))
547 http_conn_write_finished(client
->server
->conn
);
552 on_client_write_more(struct http_conn
*conn
, void *arg
)
554 struct client
*client
= arg
;
556 http_conn_start_reading(client
->server
->conn
);
560 on_client_flush(struct http_conn
*conn
, void *arg
)
562 struct client
*client
= arg
;
564 if (client
->state
== CLIENT_STATE_CLOSING
)
569 on_server_connected(struct http_conn
*conn
, void *arg
)
571 struct server
*server
= arg
;
573 assert(server
->state
== SERVER_STATE_CONNECTING
);
574 server
->state
= SERVER_STATE_CONNECTED
;
575 log_debug("proxy: server %p, %s:%d finished connecting",
576 server
, server
->host
, server
->port
);
577 client_dispatch_request(server
->client
);
581 on_server_error(struct http_conn
*conn
, enum http_conn_error err
, void *arg
)
583 struct server
*server
= arg
;
586 switch (server
->state
) {
587 case SERVER_STATE_CONNECTING
:
588 case SERVER_STATE_CONNECTED
:
589 case SERVER_STATE_REQUEST_SENT
:
590 // XXX if we haven't serviced any reqs on this server yet,
591 // we should try resending the first request
592 if (err
== ERROR_CONNECT_FAILED
) {
593 assert(server
->state
== SERVER_STATE_CONNECTING
);
594 msg
= conn_get_connect_error();
595 log_error("proxy: connection to %s:%d failed: %s",
596 log_scrub(server
->host
), server
->port
, msg
);
598 msg
= http_conn_error_to_string(err
);
599 log_error("proxy: error while communicating with "
600 "%s:%d: %s", log_scrub(server
->host
),
603 assert(server
->client
!= NULL
);
604 client_notice_server_failed(server
->client
, msg
);
606 case SERVER_STATE_IDLE
:
607 assert(server
->client
== NULL
);
608 TAILQ_REMOVE(&idle_servers
, server
, next
);
609 log_debug("proxy: idle server connection %p, %s:%d closed",
610 server
, server
->host
, server
->port
);
613 log_fatal("proxy: error cb called in invalid state");
620 on_server_continuation(struct http_conn
*conn
, void *arg
)
622 struct server
*server
= arg
;
624 log_debug("proxy: got 100 continue from server");
625 client_start_reading_request_body(server
->client
, 1);
629 on_server_response(struct http_conn
*conn
, struct http_response
*resp
,
632 struct server
*server
= arg
;
634 // XXX we should probably disable persistence on the server's
635 // connection if it sends an error response while we're sending
637 client_write_response(server
->client
, resp
);
639 if (http_conn_current_message_has_body(conn
))
640 log_debug("proxy: will copy body from server %p to client %p",
641 server
, server
->client
);
643 http_response_free(resp
);
647 on_server_read_body(struct http_conn
*conn
, struct evbuffer
*buf
, void *arg
)
649 struct server
*server
= arg
;
651 if (!http_conn_write_buf(server
->client
->conn
, buf
))
652 http_conn_stop_reading(conn
);
656 on_server_msg_complete(struct http_conn
*conn
, void *arg
)
658 struct server
*server
= arg
;
660 if (http_conn_current_message_has_body(conn
))
661 http_conn_write_finished(server
->client
->conn
);
662 client_request_serviced(server
->client
);
666 on_server_write_more(struct http_conn
*conn
, void *arg
)
668 struct server
*server
= arg
;
670 http_conn_start_reading(server
->client
->conn
);
674 on_server_flush(struct http_conn
*conn
, void *arg
)
679 client_accept(struct evconnlistener
*ecs
, evutil_socket_t s
,
680 struct sockaddr
*addr
, int len
, void *arg
)
682 struct client
*client
;
684 log_info("proxy: new client connection from %s",
687 client
= client_new(s
);
689 // XXX do we want to keep track of the client obj somehow?
695 proxy_client_set_max_pending_requests(size_t nreqs
)
697 max_pending_requests
= nreqs
;
701 proxy_client_get_max_pending_requests(void)
703 return max_pending_requests
;
707 proxy_init(struct event_base
*base
, struct evdns_base
*dns
,
708 const struct sockaddr
*listen_here
, int socklen
)
710 struct evconnlistener
*lcs
= NULL
;
712 TAILQ_INIT(&idle_servers
);
714 lcs
= evconnlistener_new_bind(base
, client_accept
, NULL
,
715 LEV_OPT_CLOSE_ON_FREE
|
717 -1, listen_here
, socklen
);
720 log_socket_error("proxy: couldn't listen on %s",
721 format_addr(listen_here
));
725 log_notice("proxy: listening on %s", format_addr(listen_here
));
728 proxy_event_base
= base
;
729 proxy_evdns_base
= dns
;