Avoid recursion when processing residual inbuf data.
[shim.git] / proxy.c
blob6765d969a85fd74fae1d861509f603412aafabf5
1 #include <sys/queue.h>
2 #include <assert.h>
3 #include <string.h>
4 #include <event2/event.h>
5 #include <event2/listener.h>
6 #include <event2/buffer.h>
7 #include "proxy.h"
8 #include "conn.h"
9 #include "httpconn.h"
10 #include "util.h"
11 #include "headers.h"
12 #include "log.h"
/*
 * Lifecycle of an upstream server connection.  The declaration order is
 * significant: client_dispatch_request() treats every state ordered before
 * SERVER_STATE_CONNECTED as "not ready yet".
 */
enum server_state {
	SERVER_STATE_INITIAL,		/* value 0 */
	SERVER_STATE_CONNECTING,	/* set by server_connect() */
	SERVER_STATE_CONNECTED,		/* set by on_server_connected() */
	SERVER_STATE_REQUEST_SENT,	/* set once a request has been written */
	SERVER_STATE_IDLE		/* set when the current request is serviced */
};
22 struct server {
23 TAILQ_ENTRY(server) next;
24 enum server_state state;
25 size_t nserviced;
26 char *host;
27 int port;
28 struct http_conn *conn;
29 struct client *client;
31 TAILQ_HEAD(server_list, server);
/* What the proxy is currently doing with a client connection. */
enum client_state {
	CLIENT_STATE_ACTIVE,		/* normal request/response proxying */
	CLIENT_STATE_TUNNEL,		/* entered when a CONNECT request arrives */
	CLIENT_STATE_DISCARD_INPUT,	/* remaining request body is thrown away */
	CLIENT_STATE_CLOSING		/* client is freed once its output flushes */
};
40 struct client {
41 enum client_state state;
42 struct http_request_list requests;
43 size_t nrequests;
44 struct http_conn *conn;
45 struct server *server;
/* Callbacks installed on client-side connections (see client_methods). */
static void on_client_error(struct http_conn *conn, enum http_conn_error err,
	void *arg);
static void on_client_request(struct http_conn *conn, struct http_request *req,
	void *arg);
static void on_client_read_body(struct http_conn *conn, struct evbuffer *buf,
	void *arg);
static void on_client_msg_complete(struct http_conn *conn, void *arg);
static void on_client_write_more(struct http_conn *conn, void *arg);
static void on_client_flush(struct http_conn *conn, void *arg);

/* Callbacks installed on server-side connections (see server_methods). */
static void on_server_connected(struct http_conn *conn, void *arg);
static void on_server_error(struct http_conn *conn, enum http_conn_error err,
	void *arg);
static void on_server_continuation(struct http_conn *conn, void *arg);
static void on_server_response(struct http_conn *conn,
	struct http_response *resp, void *arg);
static void on_server_read_body(struct http_conn *conn, struct evbuffer *buf,
	void *arg);
static void on_server_msg_complete(struct http_conn *conn, void *arg);
static void on_server_write_more(struct http_conn *conn, void *arg);
static void on_server_flush(struct http_conn *conn, void *arg);
/*
 * Callback table for client-side connections; client_new() hands it to
 * http_conn_new().  The initializer is positional.
 * NOTE(review): original source lines 65, 68 and 69 are absent from this
 * view, so some slots of the initializer are not shown here -- confirm the
 * full list against struct http_cbs before editing.
 */
64 static const struct http_cbs client_methods = {
66 on_client_error,
67 on_client_request,
70 on_client_read_body,
71 on_client_msg_complete,
72 on_client_write_more,
73 on_client_flush
/*
 * Callback table for server-side connections; server_new() hands it to
 * http_conn_new().  The initializer is positional.
 * NOTE(review): original source line 79 is absent from this view, so one
 * slot of the initializer is not shown here -- confirm the full list against
 * struct http_cbs before editing.
 */
76 static const struct http_cbs server_methods = {
77 on_server_connected,
78 on_server_error,
80 on_server_continuation,
81 on_server_response,
82 on_server_read_body,
83 on_server_msg_complete,
84 on_server_write_more,
85 on_server_flush
88 static struct event_base *proxy_event_base;
89 static struct evdns_base *proxy_evdns_base;
90 static struct evconnlistener *listener = NULL;
91 static struct server_list idle_servers;
92 static size_t max_pending_requests = 8;
94 static struct server *
95 server_new(const char *host, int port, struct client *client)
97 struct server *server;
99 server = mem_calloc(1, sizeof(*server));
100 server->host = mem_strdup(host);
101 server->port = port;
102 server->client = client;
103 server->conn = http_conn_new(proxy_event_base, -1, HTTP_SERVER,
104 &server_methods, server);
105 log_debug("proxy: new server: %p, %s:%d",
106 server, server->host, server->port);
108 return server;
111 static inline int
112 server_match(const struct server *server, const char *host, int port)
114 return (!evutil_ascii_strcasecmp(server->host, host) &&
115 server->port == port);
118 static void
119 server_free(struct server *server)
121 struct server *tmp;
123 if (!server)
124 return;
126 log_debug("proxy: freeing server: %p, %s:%d",
127 server, server->host, server->port);
129 TAILQ_FOREACH(tmp, &idle_servers, next) {
130 if (tmp == server)
131 log_fatal("proxy: idle server %p still queued!",
132 server);
135 mem_free(server->host);
136 http_conn_free(server->conn);
137 mem_free(server);
140 static int
141 server_connect(struct server *server)
143 server->state = SERVER_STATE_CONNECTING;
144 log_debug("proxy: server %p, %s:%d connecting",
145 server, server->host, server->port);
146 // XXX AF_UNSPEC seems to cause crashes w/ IPv6 queries
147 return http_conn_connect(server->conn, proxy_evdns_base, AF_INET,
148 server->host, server->port);
151 static struct client *
152 client_new(evutil_socket_t sock)
154 struct client *client;
156 client = mem_calloc(1, sizeof(*client));
157 TAILQ_INIT(&client->requests);
158 client->conn = http_conn_new(proxy_event_base, sock, HTTP_CLIENT,
159 &client_methods, client);
161 log_debug("proxy: new client %p", client);
163 return client;
166 static void
167 client_free(struct client *client)
169 struct http_request *req;
171 if (!client)
172 return;
174 log_debug("proxy: freeing client: %p", client);
176 while ((req = TAILQ_FIRST(&client->requests))) {
177 TAILQ_REMOVE(&client->requests, req, next);
178 http_request_free(req);
181 server_free(client->server);
182 http_conn_free(client->conn);
183 mem_free(client);
186 static int
187 client_scrub_request(struct client *client, struct http_request *req)
189 if (req->meth == METH_CONNECT) {
190 assert(req->url->host && req->url->port >= 1);
191 // XXX we could filter host/port here
192 return 0;
195 if (!req->url->host) {
196 http_conn_send_error(client->conn, 403, "Forbidden");
197 goto fail;
199 if (evutil_ascii_strcasecmp(req->url->scheme, "http")) {
200 http_conn_send_error(client->conn, 400, "Invalid URL");
201 goto fail;
204 if (req->url->port < 0)
205 req->url->port = 80;
207 if (!headers_has_key(req->headers, "Host")) {
208 char *host;
209 size_t len = strlen(req->url->host) + 6;
210 host = mem_calloc(1, len);
211 evutil_snprintf(host, len, "%s:%d", req->url->host,
212 req->url->port);
213 headers_add_key_val(req->headers, "Host", host);
214 mem_free(host);
217 // XXX remove proxy auth msgs?
219 return 0;
221 fail:
222 http_request_free(req);
223 return -1;
226 static void
227 client_disassociate_server(struct client *client)
229 if (!client->server)
230 return;
232 if (http_conn_is_persistent(client->server->conn)) {
233 assert(client->server->state == SERVER_STATE_IDLE);
234 TAILQ_INSERT_TAIL(&idle_servers, client->server, next);
235 client->server->client = NULL;
236 client->server = NULL;
237 } else {
238 server_free(client->server);
239 client->server = NULL;
243 /* find a server to handle our current req */
244 static int
245 client_associate_server(struct client *client)
247 struct server *it;
248 struct url *url;
249 struct http_request *req;
251 req = TAILQ_FIRST(&client->requests);
252 if (!req)
253 return 0;
255 url = req->url;
256 assert(url && url->host != NULL && url->port > 0);
258 /* should we remove our current server? */
259 if (client->server &&
260 (!server_match(client->server, url->host, url->port) ||
261 req->meth == METH_CONNECT)) {
262 client_disassociate_server(client);
265 /* nothing more to do here... */
266 if (req->meth == METH_CONNECT)
267 return 0;
269 /* try to find an idle server */
270 TAILQ_FOREACH(it, &idle_servers, next) {
271 if (server_match(it, url->host, url->port)) {
272 TAILQ_REMOVE(&idle_servers, it, next);
273 assert(it->client == NULL);
274 client->server = it;
275 it->client = client;
276 log_debug("proxy: idle server %p, %s:%d associated to "
277 "client %p", it, it->host, it->port, client);
278 return 0;
282 /* we didn't find one. lets setup a new one. */
283 client->server = server_new(url->host, url->port, client);
285 return server_connect(client->server);
288 static void
289 client_start_reading_request_body(struct client *client, int on_continue)
291 assert(client->server != NULL);
293 /* should we wait for the server to send 100 continue? */
294 if (!on_continue && http_conn_expect_continue(client->conn))
295 return;
297 if (http_conn_current_message_has_body(client->conn) &&
298 client->nrequests == 1) {
299 http_conn_write_continue(client->conn);
300 http_conn_set_output_encoding(client->server->conn,
301 http_conn_get_current_message_body_encoding(client->conn));
302 http_conn_start_reading(client->conn);
306 /* returns 1 when there's a request we can dispatch with the associated
307 server. */
308 static int
309 client_dispatch_request(struct client *client)
311 struct http_request *req;
312 struct server *server = client->server;
314 req = TAILQ_FIRST(&client->requests);
315 if (!req)
316 return 0;
318 if (req->meth == METH_CONNECT) {
319 assert(server == NULL);
320 http_conn_start_tunnel(client->conn, proxy_evdns_base, AF_INET,
321 req->url->host, req->url->port);
322 return 0;
325 assert(server != NULL);
326 if (server->state == SERVER_STATE_REQUEST_SENT ||
327 server->state < SERVER_STATE_CONNECTED)
328 return 0;
330 /* it might be nice to support pipelining... */
331 if (server_match(server, req->url->host, req->url->port)) {
332 log_debug("proxy: writing %s request from client %p to "
333 "server %p, %s:%d",
334 http_method_to_string(req->meth),
335 server->client, server,
336 server->host, server->port);
337 http_conn_write_request(server->conn, req);
338 // XXX we may want to wait for 100-continue
339 client_start_reading_request_body(client, 0);
340 server->state = SERVER_STATE_REQUEST_SENT;
341 return 1;
344 return 0;
347 static void
348 client_discard_input(struct client *client)
350 if (http_conn_current_message_has_body(client->conn)) {
351 log_debug("proxy: will discard client msg body");
352 http_conn_disable_persistence(client->conn);
353 client->state = CLIENT_STATE_DISCARD_INPUT;
357 static void
358 client_close_on_flush(struct client *client)
360 log_debug("proxy: will close client on flush");
361 client->state = CLIENT_STATE_CLOSING;
362 http_conn_disable_persistence(client->conn);
363 http_conn_stop_reading(client->conn);
364 http_conn_flush(client->conn);
367 static void
368 client_write_response(struct client *client, struct http_response *resp)
370 struct http_request *req;
372 req = TAILQ_FIRST(&client->requests);
373 assert(req != NULL);
375 log_debug("proxy: got response for %s from %p, %s:%d: %s %d",
376 http_method_to_string(req->meth),
377 client->server, client->server->host, client->server->port,
378 http_version_to_string(resp->vers), resp->code);
380 if (HTTP_ERROR_RESPONSE(resp->code))
381 client_discard_input(client);
383 if (req->meth == METH_HEAD)
384 http_conn_set_current_message_bodyless(client->server->conn);
386 http_conn_set_output_encoding(client->conn, TE_IDENTITY);
387 if (http_conn_current_message_has_body(client->server->conn) &&
388 http_conn_is_persistent(client->conn) &&
389 http_conn_get_current_message_body_length(client->server->conn) < 0)
390 http_conn_set_output_encoding(client->conn, TE_CHUNKED);
392 http_conn_write_response(client->conn, resp);
395 static void
396 client_request_serviced(struct client *client)
398 struct http_request *req;
400 req = TAILQ_FIRST(&client->requests);
401 assert(req && client->nrequests > 0);
402 log_debug("proxy: request for client %p, %s %s serviced",
403 client, http_method_to_string(req->meth),
404 http_version_to_string(req->vers));
405 TAILQ_REMOVE(&client->requests, req, next);
406 http_request_free(req);
407 client->nrequests--;
409 if (client->state == CLIENT_STATE_ACTIVE) {
410 if (client->server)
411 client->server->state = SERVER_STATE_IDLE;
412 if (client->nrequests) {
413 client_associate_server(client);
414 client_dispatch_request(client);
415 } else
416 client_disassociate_server(client);
418 if (!http_conn_is_persistent(client->conn)) {
419 client_close_on_flush(client);
420 } else if (!http_conn_current_message_has_body(client->conn) &&
421 client->nrequests < max_pending_requests) {
422 http_conn_start_reading(client->conn);
427 static void
428 client_notice_server_failed(struct client *client, const char *msg)
430 struct http_request *req;
431 struct server *server = client->server;
433 client->server = NULL;
435 while ((req = TAILQ_FIRST(&client->requests))) {
436 if (evutil_ascii_strcasecmp(req->url->host, server->host) ||
437 req->url->port != server->port)
438 break;
439 http_conn_send_error(client->conn, 502, "%s", msg);
440 client_request_serviced(client);
444 /* http event slots */
446 static void
447 on_client_error(struct http_conn *conn, enum http_conn_error err, void *arg)
449 struct client *client = arg;
451 switch (err) {
452 case ERROR_CONNECT_FAILED:
453 case ERROR_IDLE_CONN_TIMEDOUT:
454 log_info("proxy: closing idle client connection.");
455 client_free(client);
456 break;
457 case ERROR_CLIENT_EXPECTATION_FAILED:
458 client_discard_input(client);
459 http_conn_send_error(conn, 417, "Expectation failed");
460 break;
461 case ERROR_TUNNEL_CONNECT_FAILED:
462 // XXX need a better msg here
463 http_conn_send_error(conn, 504,
464 "Connection failed");
465 client_request_serviced(client);
466 break;
467 case ERROR_HEADER_PARSE_FAILED:
468 client_close_on_flush(client);
469 http_conn_send_error(conn, 400,
470 "Couldn't parse client request");
471 break;
472 case ERROR_CLIENT_POST_WITHOUT_LENGTH:
473 client_close_on_flush(client);
474 http_conn_send_error(conn, 400,
475 "POST or PUT request without length");
476 break;
477 case ERROR_CHUNK_PARSE_FAILED:
478 client_close_on_flush(client);
479 http_conn_send_error(conn, 400,
480 "Chunk parse failed");
481 break;
482 case ERROR_TUNNEL_CLOSED:
483 log_debug("proxy: tunnel closed.");
484 client_free(client);
485 break;
486 case ERROR_INCOMPLETE_HEADERS:
487 case ERROR_INCOMPLETE_BODY:
488 case ERROR_WRITE_FAILED:
489 default:
490 log_warn("proxy: client error: %s",
491 http_conn_error_to_string(err));
492 client_free(client);
493 break;
497 static void
498 on_client_request(struct http_conn *conn, struct http_request *req, void *arg)
500 struct client *client = arg;
502 assert(client->state == CLIENT_STATE_ACTIVE);
504 if (client_scrub_request(client, req) < 0)
505 return;
507 TAILQ_INSERT_TAIL(&client->requests, req, next);
508 if (++client->nrequests > max_pending_requests ||
509 http_conn_current_message_has_body(conn))
510 http_conn_stop_reading(conn);
512 log_debug("proxy: new %s request for %s:%d (pipeline %u)",
513 http_method_to_string(req->meth),
514 req->url->host, req->url->port,
515 (unsigned)client->nrequests);
517 if (req->meth == METH_CONNECT)
518 client->state = CLIENT_STATE_TUNNEL;
520 if (!client->server && client_associate_server(client) < 0)
521 return;
523 client_dispatch_request(client);
526 static void
527 on_client_read_body(struct http_conn *conn, struct evbuffer *buf, void *arg)
529 struct client *client = arg;
531 if (client->state == CLIENT_STATE_DISCARD_INPUT)
532 evbuffer_drain(buf, -1);
533 else if (!http_conn_write_buf(client->server->conn, buf))
534 http_conn_stop_reading(conn);
537 static void
538 on_client_msg_complete(struct http_conn *conn, void *arg)
540 struct client *client = arg;
542 log_debug("proxy: finished reading client's message");
544 if (client->state == CLIENT_STATE_DISCARD_INPUT)
545 client_close_on_flush(client);
546 else if (http_conn_current_message_has_body(conn))
547 http_conn_write_finished(client->server->conn);
551 static void
552 on_client_write_more(struct http_conn *conn, void *arg)
554 struct client *client = arg;
556 http_conn_start_reading(client->server->conn);
559 static void
560 on_client_flush(struct http_conn *conn, void *arg)
562 struct client *client = arg;
564 if (client->state == CLIENT_STATE_CLOSING)
565 client_free(client);
568 static void
569 on_server_connected(struct http_conn *conn, void *arg)
571 struct server *server = arg;
573 assert(server->state == SERVER_STATE_CONNECTING);
574 server->state = SERVER_STATE_CONNECTED;
575 log_debug("proxy: server %p, %s:%d finished connecting",
576 server, server->host, server->port);
577 client_dispatch_request(server->client);
580 static void
581 on_server_error(struct http_conn *conn, enum http_conn_error err, void *arg)
583 struct server *server = arg;
584 const char *msg;
586 switch (server->state) {
587 case SERVER_STATE_CONNECTING:
588 case SERVER_STATE_CONNECTED:
589 case SERVER_STATE_REQUEST_SENT:
590 // XXX if we haven't serviced any reqs on this server yet,
591 // we should try resending the first request
592 if (err == ERROR_CONNECT_FAILED) {
593 assert(server->state == SERVER_STATE_CONNECTING);
594 msg = conn_get_connect_error();
595 log_error("proxy: connection to %s:%d failed: %s",
596 log_scrub(server->host), server->port, msg);
597 } else {
598 msg = http_conn_error_to_string(err);
599 log_error("proxy: error while communicating with "
600 "%s:%d: %s", log_scrub(server->host),
601 server->port, msg);
603 assert(server->client != NULL);
604 client_notice_server_failed(server->client, msg);
605 break;
606 case SERVER_STATE_IDLE:
607 assert(server->client == NULL);
608 TAILQ_REMOVE(&idle_servers, server, next);
609 log_debug("proxy: idle server connection %p, %s:%d closed",
610 server, server->host, server->port);
611 break;
612 default:
613 log_fatal("proxy: error cb called in invalid state");
616 server_free(server);
619 static void
620 on_server_continuation(struct http_conn *conn, void *arg)
622 struct server *server = arg;
624 log_debug("proxy: got 100 continue from server");
625 client_start_reading_request_body(server->client, 1);
628 static void
629 on_server_response(struct http_conn *conn, struct http_response *resp,
630 void *arg)
632 struct server *server = arg;
634 // XXX we should probably disable persistence on the server's
635 // connection if it sends an error response while we're sending
636 // client POST/PUT
637 client_write_response(server->client, resp);
639 if (http_conn_current_message_has_body(conn))
640 log_debug("proxy: will copy body from server %p to client %p",
641 server, server->client);
643 http_response_free(resp);
646 static void
647 on_server_read_body(struct http_conn *conn, struct evbuffer *buf, void *arg)
649 struct server *server = arg;
651 if (!http_conn_write_buf(server->client->conn, buf))
652 http_conn_stop_reading(conn);
655 static void
656 on_server_msg_complete(struct http_conn *conn, void *arg)
658 struct server *server = arg;
660 if (http_conn_current_message_has_body(conn))
661 http_conn_write_finished(server->client->conn);
662 client_request_serviced(server->client);
665 static void
666 on_server_write_more(struct http_conn *conn, void *arg)
668 struct server *server = arg;
670 http_conn_start_reading(server->client->conn);
/* Flush callback for server connections: intentionally does nothing. */
static void
on_server_flush(struct http_conn *conn, void *arg)
{
	/* nothing to do */
}
678 static void
679 client_accept(struct evconnlistener *ecs, evutil_socket_t s,
680 struct sockaddr *addr, int len, void *arg)
682 struct client *client;
684 log_info("proxy: new client connection from %s",
685 format_addr(addr));
687 client = client_new(s);
689 // XXX do we want to keep track of the client obj somehow?
692 /* public API */
694 void
695 proxy_client_set_max_pending_requests(size_t nreqs)
697 max_pending_requests = nreqs;
700 size_t
701 proxy_client_get_max_pending_requests(void)
703 return max_pending_requests;
707 proxy_init(struct event_base *base, struct evdns_base *dns,
708 const struct sockaddr *listen_here, int socklen)
710 struct evconnlistener *lcs = NULL;
712 TAILQ_INIT(&idle_servers);
714 lcs = evconnlistener_new_bind(base, client_accept, NULL,
715 LEV_OPT_CLOSE_ON_FREE |
716 LEV_OPT_REUSEABLE,
717 -1, listen_here, socklen);
719 if (!lcs) {
720 log_socket_error("proxy: couldn't listen on %s",
721 format_addr(listen_here));
722 return -1;
725 log_notice("proxy: listening on %s", format_addr(listen_here));
727 listener = lcs;
728 proxy_event_base = base;
729 proxy_evdns_base = dns;
731 return 0;
735 void
736 proxy_cleanup(void)
738 // TODO