2 * Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 #define WIN32_LEAN_AND_MEAN
35 #undef WIN32_LEAN_AND_MEAN
38 #include <sys/types.h>
40 #include <sys/socket.h>
42 #ifdef HAVE_SYS_TIME_H
45 #include <sys/queue.h>
58 #include "evrpc-internal.h"
64 evrpc_init(struct evhttp
*http_server
)
66 struct evrpc_base
* base
= calloc(1, sizeof(struct evrpc_base
));
70 /* we rely on the tagging sub system */
73 TAILQ_INIT(&base
->registered_rpcs
);
74 TAILQ_INIT(&base
->input_hooks
);
75 TAILQ_INIT(&base
->output_hooks
);
76 base
->http_server
= http_server
;
82 evrpc_free(struct evrpc_base
*base
)
85 struct evrpc_hook
*hook
;
87 while ((rpc
= TAILQ_FIRST(&base
->registered_rpcs
)) != NULL
) {
88 assert(evrpc_unregister_rpc(base
, rpc
->uri
));
90 while ((hook
= TAILQ_FIRST(&base
->input_hooks
)) != NULL
) {
91 assert(evrpc_remove_hook(base
, EVRPC_INPUT
, hook
));
93 while ((hook
= TAILQ_FIRST(&base
->output_hooks
)) != NULL
) {
94 assert(evrpc_remove_hook(base
, EVRPC_OUTPUT
, hook
));
100 evrpc_add_hook(void *vbase
,
101 enum EVRPC_HOOK_TYPE hook_type
,
102 int (*cb
)(struct evhttp_request
*, struct evbuffer
*, void *),
105 struct _evrpc_hooks
*base
= vbase
;
106 struct evrpc_hook_list
*head
= NULL
;
107 struct evrpc_hook
*hook
= NULL
;
110 head
= &base
->in_hooks
;
113 head
= &base
->out_hooks
;
116 assert(hook_type
== EVRPC_INPUT
|| hook_type
== EVRPC_OUTPUT
);
119 hook
= calloc(1, sizeof(struct evrpc_hook
));
120 assert(hook
!= NULL
);
123 hook
->process_arg
= cb_arg
;
124 TAILQ_INSERT_TAIL(head
, hook
, next
);
130 evrpc_remove_hook_internal(struct evrpc_hook_list
*head
, void *handle
)
132 struct evrpc_hook
*hook
= NULL
;
133 TAILQ_FOREACH(hook
, head
, next
) {
134 if (hook
== handle
) {
135 TAILQ_REMOVE(head
, hook
, next
);
145 * remove the hook specified by the handle
149 evrpc_remove_hook(void *vbase
, enum EVRPC_HOOK_TYPE hook_type
, void *handle
)
151 struct _evrpc_hooks
*base
= vbase
;
152 struct evrpc_hook_list
*head
= NULL
;
155 head
= &base
->in_hooks
;
158 head
= &base
->out_hooks
;
161 assert(hook_type
== EVRPC_INPUT
|| hook_type
== EVRPC_OUTPUT
);
164 return (evrpc_remove_hook_internal(head
, handle
));
168 evrpc_process_hooks(struct evrpc_hook_list
*head
,
169 struct evhttp_request
*req
, struct evbuffer
*evbuf
)
171 struct evrpc_hook
*hook
;
172 TAILQ_FOREACH(hook
, head
, next
) {
173 if (hook
->process(req
, evbuf
, hook
->process_arg
) == -1)
180 static void evrpc_pool_schedule(struct evrpc_pool
*pool
);
181 static void evrpc_request_cb(struct evhttp_request
*, void *);
182 void evrpc_request_done(struct evrpc_req_generic
*);
185 * Registers a new RPC with the HTTP server. The evrpc object is expected
186 * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
187 * calls this function.
191 evrpc_construct_uri(const char *uri
)
193 char *constructed_uri
;
194 int constructed_uri_len
;
196 constructed_uri_len
= strlen(EVRPC_URI_PREFIX
) + strlen(uri
) + 1;
197 if ((constructed_uri
= malloc(constructed_uri_len
)) == NULL
)
198 event_err(1, "%s: failed to register rpc at %s",
200 memcpy(constructed_uri
, EVRPC_URI_PREFIX
, strlen(EVRPC_URI_PREFIX
));
201 memcpy(constructed_uri
+ strlen(EVRPC_URI_PREFIX
), uri
, strlen(uri
));
202 constructed_uri
[constructed_uri_len
- 1] = '\0';
204 return (constructed_uri
);
208 evrpc_register_rpc(struct evrpc_base
*base
, struct evrpc
*rpc
,
209 void (*cb
)(struct evrpc_req_generic
*, void *), void *cb_arg
)
211 char *constructed_uri
= evrpc_construct_uri(rpc
->uri
);
215 rpc
->cb_arg
= cb_arg
;
217 TAILQ_INSERT_TAIL(&base
->registered_rpcs
, rpc
, next
);
219 evhttp_set_cb(base
->http_server
,
224 free(constructed_uri
);
230 evrpc_unregister_rpc(struct evrpc_base
*base
, const char *name
)
232 char *registered_uri
= NULL
;
235 /* find the right rpc; linear search might be slow */
236 TAILQ_FOREACH(rpc
, &base
->registered_rpcs
, next
) {
237 if (strcmp(rpc
->uri
, name
) == 0)
241 /* We did not find an RPC with this name */
244 TAILQ_REMOVE(&base
->registered_rpcs
, rpc
, next
);
246 free((char *)rpc
->uri
);
249 registered_uri
= evrpc_construct_uri(name
);
251 /* remove the http server callback */
252 assert(evhttp_del_cb(base
->http_server
, registered_uri
) == 0);
254 free(registered_uri
);
259 evrpc_request_cb(struct evhttp_request
*req
, void *arg
)
261 struct evrpc
*rpc
= arg
;
262 struct evrpc_req_generic
*rpc_state
= NULL
;
264 /* let's verify the outside parameters */
265 if (req
->type
!= EVHTTP_REQ_POST
||
266 EVBUFFER_LENGTH(req
->input_buffer
) <= 0)
270 * we might want to allow hooks to suspend the processing,
271 * but at the moment, we assume that they just act as simple
274 if (evrpc_process_hooks(&rpc
->base
->input_hooks
,
275 req
, req
->input_buffer
) == -1)
278 rpc_state
= calloc(1, sizeof(struct evrpc_req_generic
));
279 if (rpc_state
== NULL
)
282 /* let's check that we can parse the request */
283 rpc_state
->request
= rpc
->request_new();
284 if (rpc_state
->request
== NULL
)
287 rpc_state
->rpc
= rpc
;
289 if (rpc
->request_unmarshal(
290 rpc_state
->request
, req
->input_buffer
) == -1) {
291 /* we failed to parse the request; that's a bummer */
295 /* at this point, we have a well formed request, prepare the reply */
297 rpc_state
->reply
= rpc
->reply_new();
298 if (rpc_state
->reply
== NULL
)
301 rpc_state
->http_req
= req
;
302 rpc_state
->done
= evrpc_request_done
;
304 /* give the rpc to the user; they can deal with it */
305 rpc
->cb(rpc_state
, rpc
->cb_arg
);
310 evrpc_reqstate_free(rpc_state
);
311 evhttp_send_error(req
, HTTP_SERVUNAVAIL
, "Service Error");
316 evrpc_reqstate_free(struct evrpc_req_generic
* rpc_state
)
318 /* clean up all memory */
319 if (rpc_state
!= NULL
) {
320 struct evrpc
*rpc
= rpc_state
->rpc
;
322 if (rpc_state
->request
!= NULL
)
323 rpc
->request_free(rpc_state
->request
);
324 if (rpc_state
->reply
!= NULL
)
325 rpc
->reply_free(rpc_state
->reply
);
331 evrpc_request_done(struct evrpc_req_generic
* rpc_state
)
333 struct evhttp_request
*req
= rpc_state
->http_req
;
334 struct evrpc
*rpc
= rpc_state
->rpc
;
335 struct evbuffer
* data
= NULL
;
337 if (rpc
->reply_complete(rpc_state
->reply
) == -1) {
338 /* the reply was not completely filled in. error out */
342 if ((data
= evbuffer_new()) == NULL
) {
347 /* serialize the reply */
348 rpc
->reply_marshal(data
, rpc_state
->reply
);
350 /* do hook based tweaks to the request */
351 if (evrpc_process_hooks(&rpc
->base
->output_hooks
,
355 /* on success, we are going to transmit marshaled binary data */
356 if (evhttp_find_header(req
->output_headers
, "Content-Type") == NULL
) {
357 evhttp_add_header(req
->output_headers
,
358 "Content-Type", "application/octet-stream");
361 evhttp_send_reply(req
, HTTP_OK
, "OK", data
);
365 evrpc_reqstate_free(rpc_state
);
372 evrpc_reqstate_free(rpc_state
);
373 evhttp_send_error(req
, HTTP_SERVUNAVAIL
, "Service Error");
377 /* Client implementation of RPC site */
379 static int evrpc_schedule_request(struct evhttp_connection
*connection
,
380 struct evrpc_request_wrapper
*ctx
);
383 evrpc_pool_new(struct event_base
*base
)
385 struct evrpc_pool
*pool
= calloc(1, sizeof(struct evrpc_pool
));
389 TAILQ_INIT(&pool
->connections
);
390 TAILQ_INIT(&pool
->requests
);
392 TAILQ_INIT(&pool
->input_hooks
);
393 TAILQ_INIT(&pool
->output_hooks
);
402 evrpc_request_wrapper_free(struct evrpc_request_wrapper
*request
)
409 evrpc_pool_free(struct evrpc_pool
*pool
)
411 struct evhttp_connection
*connection
;
412 struct evrpc_request_wrapper
*request
;
413 struct evrpc_hook
*hook
;
415 while ((request
= TAILQ_FIRST(&pool
->requests
)) != NULL
) {
416 TAILQ_REMOVE(&pool
->requests
, request
, next
);
417 /* if this gets more complicated we need our own function */
418 evrpc_request_wrapper_free(request
);
421 while ((connection
= TAILQ_FIRST(&pool
->connections
)) != NULL
) {
422 TAILQ_REMOVE(&pool
->connections
, connection
, next
);
423 evhttp_connection_free(connection
);
426 while ((hook
= TAILQ_FIRST(&pool
->input_hooks
)) != NULL
) {
427 assert(evrpc_remove_hook(pool
, EVRPC_INPUT
, hook
));
430 while ((hook
= TAILQ_FIRST(&pool
->output_hooks
)) != NULL
) {
431 assert(evrpc_remove_hook(pool
, EVRPC_OUTPUT
, hook
));
438 * Add a connection to the RPC pool. A request scheduled on the pool
439 * may use any available connection.
443 evrpc_pool_add_connection(struct evrpc_pool
*pool
,
444 struct evhttp_connection
*connection
) {
445 assert(connection
->http_server
== NULL
);
446 TAILQ_INSERT_TAIL(&pool
->connections
, connection
, next
);
449 * associate an event base with this connection
451 if (pool
->base
!= NULL
)
452 evhttp_connection_set_base(connection
, pool
->base
);
455 * unless a timeout was specifically set for a connection,
456 * the connection inherits the timeout from the pool.
458 if (connection
->timeout
== -1)
459 connection
->timeout
= pool
->timeout
;
462 * if we have any requests pending, schedule them with the new
466 if (TAILQ_FIRST(&pool
->requests
) != NULL
) {
467 struct evrpc_request_wrapper
*request
=
468 TAILQ_FIRST(&pool
->requests
);
469 TAILQ_REMOVE(&pool
->requests
, request
, next
);
470 evrpc_schedule_request(connection
, request
);
475 evrpc_pool_set_timeout(struct evrpc_pool
*pool
, int timeout_in_secs
)
477 struct evhttp_connection
*evcon
;
478 TAILQ_FOREACH(evcon
, &pool
->connections
, next
) {
479 evcon
->timeout
= timeout_in_secs
;
481 pool
->timeout
= timeout_in_secs
;
485 static void evrpc_reply_done(struct evhttp_request
*, void *);
486 static void evrpc_request_timeout(int, short, void *);
489 * Finds a connection object associated with the pool that is currently
490 * idle and can be used to make a request.
492 static struct evhttp_connection
*
493 evrpc_pool_find_connection(struct evrpc_pool
*pool
)
495 struct evhttp_connection
*connection
;
496 TAILQ_FOREACH(connection
, &pool
->connections
, next
) {
497 if (TAILQ_FIRST(&connection
->requests
) == NULL
)
505 * We assume that the ctx is no longer queued on the pool.
508 evrpc_schedule_request(struct evhttp_connection
*connection
,
509 struct evrpc_request_wrapper
*ctx
)
511 struct evhttp_request
*req
= NULL
;
512 struct evrpc_pool
*pool
= ctx
->pool
;
513 struct evrpc_status status
;
517 if ((req
= evhttp_request_new(evrpc_reply_done
, ctx
)) == NULL
)
520 /* serialize the request data into the output buffer */
521 ctx
->request_marshal(req
->output_buffer
, ctx
->request
);
523 uri
= evrpc_construct_uri(ctx
->name
);
527 /* we need to know the connection that we might have to abort */
528 ctx
->evcon
= connection
;
530 /* apply hooks to the outgoing request */
531 if (evrpc_process_hooks(&pool
->output_hooks
,
532 req
, req
->output_buffer
) == -1)
535 if (pool
->timeout
> 0) {
537 * a timeout after which the whole rpc is going to be aborted.
540 evutil_timerclear(&tv
);
541 tv
.tv_sec
= pool
->timeout
;
542 evtimer_add(&ctx
->ev_timeout
, &tv
);
545 /* start the request over the connection */
546 res
= evhttp_make_request(connection
, req
, EVHTTP_REQ_POST
, uri
);
555 memset(&status
, 0, sizeof(status
));
556 status
.error
= EVRPC_STATUS_ERR_UNSTARTED
;
557 (*ctx
->cb
)(&status
, ctx
->request
, ctx
->reply
, ctx
->cb_arg
);
558 evrpc_request_wrapper_free(ctx
);
563 evrpc_make_request(struct evrpc_request_wrapper
*ctx
)
565 struct evrpc_pool
*pool
= ctx
->pool
;
567 /* initialize the event structure for this rpc */
568 evtimer_set(&ctx
->ev_timeout
, evrpc_request_timeout
, ctx
);
569 if (pool
->base
!= NULL
)
570 event_base_set(pool
->base
, &ctx
->ev_timeout
);
572 /* we better have some available connections on the pool */
573 assert(TAILQ_FIRST(&pool
->connections
) != NULL
);
576 * if no connection is available, we queue the request on the pool,
577 * the next time a connection is empty, the rpc will be send on that.
579 TAILQ_INSERT_TAIL(&pool
->requests
, ctx
, next
);
581 evrpc_pool_schedule(pool
);
587 evrpc_reply_done(struct evhttp_request
*req
, void *arg
)
589 struct evrpc_request_wrapper
*ctx
= arg
;
590 struct evrpc_pool
*pool
= ctx
->pool
;
591 struct evrpc_status status
;
594 /* cancel any timeout we might have scheduled */
595 event_del(&ctx
->ev_timeout
);
597 memset(&status
, 0, sizeof(status
));
598 status
.http_req
= req
;
600 /* we need to get the reply now */
602 /* apply hooks to the incoming request */
603 if (evrpc_process_hooks(&pool
->input_hooks
,
604 req
, req
->input_buffer
) == -1) {
605 status
.error
= EVRPC_STATUS_ERR_HOOKABORTED
;
608 res
= ctx
->reply_unmarshal(ctx
->reply
,
611 status
.error
= EVRPC_STATUS_ERR_BADPAYLOAD
;
615 status
.error
= EVRPC_STATUS_ERR_TIMEOUT
;
619 /* clear everything that we might have written previously */
620 ctx
->reply_clear(ctx
->reply
);
623 (*ctx
->cb
)(&status
, ctx
->request
, ctx
->reply
, ctx
->cb_arg
);
625 evrpc_request_wrapper_free(ctx
);
627 /* the http layer owns the request structure */
629 /* see if we can schedule another request */
630 evrpc_pool_schedule(pool
);
634 evrpc_pool_schedule(struct evrpc_pool
*pool
)
636 struct evrpc_request_wrapper
*ctx
= TAILQ_FIRST(&pool
->requests
);
637 struct evhttp_connection
*evcon
;
639 /* if no requests are pending, we have no work */
643 if ((evcon
= evrpc_pool_find_connection(pool
)) != NULL
) {
644 TAILQ_REMOVE(&pool
->requests
, ctx
, next
);
645 evrpc_schedule_request(evcon
, ctx
);
650 evrpc_request_timeout(int fd
, short what
, void *arg
)
652 struct evrpc_request_wrapper
*ctx
= arg
;
653 struct evhttp_connection
*evcon
= ctx
->evcon
;
654 assert(evcon
!= NULL
);
656 evhttp_connection_fail(evcon
, EVCON_HTTP_TIMEOUT
);