2 * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu>
3 * Copyright (c) 2007-2010 Niels Provos and Nick Mathewson
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 #include "event2/event-config.h"
30 #define WIN32_LEAN_AND_MEAN
33 #undef WIN32_LEAN_AND_MEAN
36 #include <sys/types.h>
38 #include <sys/socket.h>
40 #ifdef _EVENT_HAVE_SYS_TIME_H
43 #include <sys/queue.h>
53 #include <sys/queue.h>
55 #include "event2/event.h"
56 #include "event2/event_struct.h"
57 #include "event2/rpc.h"
58 #include "event2/rpc_struct.h"
59 #include "evrpc-internal.h"
60 #include "event2/http.h"
61 #include "event2/buffer.h"
62 #include "event2/tag.h"
63 #include "event2/http_struct.h"
64 #include "event2/http_compat.h"
65 #include "event2/util.h"
66 #include "util-internal.h"
67 #include "log-internal.h"
68 #include "mm-internal.h"
71 evrpc_init(struct evhttp
*http_server
)
73 struct evrpc_base
* base
= mm_calloc(1, sizeof(struct evrpc_base
));
77 /* we rely on the tagging sub system */
80 TAILQ_INIT(&base
->registered_rpcs
);
81 TAILQ_INIT(&base
->input_hooks
);
82 TAILQ_INIT(&base
->output_hooks
);
84 TAILQ_INIT(&base
->paused_requests
);
86 base
->http_server
= http_server
;
92 evrpc_free(struct evrpc_base
*base
)
95 struct evrpc_hook
*hook
;
96 struct evrpc_hook_ctx
*pause
;
99 while ((rpc
= TAILQ_FIRST(&base
->registered_rpcs
)) != NULL
) {
100 r
= evrpc_unregister_rpc(base
, rpc
->uri
);
101 EVUTIL_ASSERT(r
== 0);
103 while ((pause
= TAILQ_FIRST(&base
->paused_requests
)) != NULL
) {
104 TAILQ_REMOVE(&base
->paused_requests
, pause
, next
);
107 while ((hook
= TAILQ_FIRST(&base
->input_hooks
)) != NULL
) {
108 r
= evrpc_remove_hook(base
, EVRPC_INPUT
, hook
);
111 while ((hook
= TAILQ_FIRST(&base
->output_hooks
)) != NULL
) {
112 r
= evrpc_remove_hook(base
, EVRPC_OUTPUT
, hook
);
119 evrpc_add_hook(void *vbase
,
120 enum EVRPC_HOOK_TYPE hook_type
,
121 int (*cb
)(void *, struct evhttp_request
*, struct evbuffer
*, void *),
124 struct _evrpc_hooks
*base
= vbase
;
125 struct evrpc_hook_list
*head
= NULL
;
126 struct evrpc_hook
*hook
= NULL
;
129 head
= &base
->in_hooks
;
132 head
= &base
->out_hooks
;
135 EVUTIL_ASSERT(hook_type
== EVRPC_INPUT
|| hook_type
== EVRPC_OUTPUT
);
138 hook
= mm_calloc(1, sizeof(struct evrpc_hook
));
139 EVUTIL_ASSERT(hook
!= NULL
);
142 hook
->process_arg
= cb_arg
;
143 TAILQ_INSERT_TAIL(head
, hook
, next
);
149 evrpc_remove_hook_internal(struct evrpc_hook_list
*head
, void *handle
)
151 struct evrpc_hook
*hook
= NULL
;
152 TAILQ_FOREACH(hook
, head
, next
) {
153 if (hook
== handle
) {
154 TAILQ_REMOVE(head
, hook
, next
);
164 * remove the hook specified by the handle
168 evrpc_remove_hook(void *vbase
, enum EVRPC_HOOK_TYPE hook_type
, void *handle
)
170 struct _evrpc_hooks
*base
= vbase
;
171 struct evrpc_hook_list
*head
= NULL
;
174 head
= &base
->in_hooks
;
177 head
= &base
->out_hooks
;
180 EVUTIL_ASSERT(hook_type
== EVRPC_INPUT
|| hook_type
== EVRPC_OUTPUT
);
183 return (evrpc_remove_hook_internal(head
, handle
));
187 evrpc_process_hooks(struct evrpc_hook_list
*head
, void *ctx
,
188 struct evhttp_request
*req
, struct evbuffer
*evbuf
)
190 struct evrpc_hook
*hook
;
191 TAILQ_FOREACH(hook
, head
, next
) {
192 int res
= hook
->process(ctx
, req
, evbuf
, hook
->process_arg
);
193 if (res
!= EVRPC_CONTINUE
)
197 return (EVRPC_CONTINUE
);
200 static void evrpc_pool_schedule(struct evrpc_pool
*pool
);
201 static void evrpc_request_cb(struct evhttp_request
*, void *);
204 * Registers a new RPC with the HTTP server. The evrpc object is expected
205 * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
206 * calls this function.
210 evrpc_construct_uri(const char *uri
)
212 char *constructed_uri
;
213 size_t constructed_uri_len
;
215 constructed_uri_len
= strlen(EVRPC_URI_PREFIX
) + strlen(uri
) + 1;
216 if ((constructed_uri
= mm_malloc(constructed_uri_len
)) == NULL
)
217 event_err(1, "%s: failed to register rpc at %s",
219 memcpy(constructed_uri
, EVRPC_URI_PREFIX
, strlen(EVRPC_URI_PREFIX
));
220 memcpy(constructed_uri
+ strlen(EVRPC_URI_PREFIX
), uri
, strlen(uri
));
221 constructed_uri
[constructed_uri_len
- 1] = '\0';
223 return (constructed_uri
);
227 evrpc_register_rpc(struct evrpc_base
*base
, struct evrpc
*rpc
,
228 void (*cb
)(struct evrpc_req_generic
*, void *), void *cb_arg
)
230 char *constructed_uri
= evrpc_construct_uri(rpc
->uri
);
234 rpc
->cb_arg
= cb_arg
;
236 TAILQ_INSERT_TAIL(&base
->registered_rpcs
, rpc
, next
);
238 evhttp_set_cb(base
->http_server
,
243 mm_free(constructed_uri
);
249 evrpc_unregister_rpc(struct evrpc_base
*base
, const char *name
)
251 char *registered_uri
= NULL
;
255 /* find the right rpc; linear search might be slow */
256 TAILQ_FOREACH(rpc
, &base
->registered_rpcs
, next
) {
257 if (strcmp(rpc
->uri
, name
) == 0)
261 /* We did not find an RPC with this name */
264 TAILQ_REMOVE(&base
->registered_rpcs
, rpc
, next
);
266 registered_uri
= evrpc_construct_uri(name
);
268 /* remove the http server callback */
269 r
= evhttp_del_cb(base
->http_server
, registered_uri
);
270 EVUTIL_ASSERT(r
== 0);
272 mm_free(registered_uri
);
274 mm_free((char *)rpc
->uri
);
279 static int evrpc_pause_request(void *vbase
, void *ctx
,
280 void (*cb
)(void *, enum EVRPC_HOOK_RESULT
));
281 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT
);
284 evrpc_request_cb(struct evhttp_request
*req
, void *arg
)
286 struct evrpc
*rpc
= arg
;
287 struct evrpc_req_generic
*rpc_state
= NULL
;
289 /* let's verify the outside parameters */
290 if (req
->type
!= EVHTTP_REQ_POST
||
291 evbuffer_get_length(req
->input_buffer
) <= 0)
294 rpc_state
= mm_calloc(1, sizeof(struct evrpc_req_generic
));
295 if (rpc_state
== NULL
)
297 rpc_state
->rpc
= rpc
;
298 rpc_state
->http_req
= req
;
299 rpc_state
->rpc_data
= NULL
;
301 if (TAILQ_FIRST(&rpc
->base
->input_hooks
) != NULL
) {
304 evrpc_hook_associate_meta(&rpc_state
->hook_meta
, req
->evcon
);
307 * allow hooks to modify the outgoing request
309 hook_res
= evrpc_process_hooks(&rpc
->base
->input_hooks
,
310 rpc_state
, req
, req
->input_buffer
);
312 case EVRPC_TERMINATE
:
315 evrpc_pause_request(rpc
->base
, rpc_state
,
316 evrpc_request_cb_closure
);
321 EVUTIL_ASSERT(hook_res
== EVRPC_TERMINATE
||
322 hook_res
== EVRPC_CONTINUE
||
323 hook_res
== EVRPC_PAUSE
);
327 evrpc_request_cb_closure(rpc_state
, EVRPC_CONTINUE
);
331 if (rpc_state
!= NULL
)
332 evrpc_reqstate_free(rpc_state
);
333 evhttp_send_error(req
, HTTP_SERVUNAVAIL
, NULL
);
338 evrpc_request_cb_closure(void *arg
, enum EVRPC_HOOK_RESULT hook_res
)
340 struct evrpc_req_generic
*rpc_state
= arg
;
341 struct evrpc
*rpc
= rpc_state
->rpc
;
342 struct evhttp_request
*req
= rpc_state
->http_req
;
344 if (hook_res
== EVRPC_TERMINATE
)
347 /* let's check that we can parse the request */
348 rpc_state
->request
= rpc
->request_new(rpc
->request_new_arg
);
349 if (rpc_state
->request
== NULL
)
352 if (rpc
->request_unmarshal(
353 rpc_state
->request
, req
->input_buffer
) == -1) {
354 /* we failed to parse the request; that's a bummer */
358 /* at this point, we have a well formed request, prepare the reply */
360 rpc_state
->reply
= rpc
->reply_new(rpc
->reply_new_arg
);
361 if (rpc_state
->reply
== NULL
)
364 /* give the rpc to the user; they can deal with it */
365 rpc
->cb(rpc_state
, rpc
->cb_arg
);
370 if (rpc_state
!= NULL
)
371 evrpc_reqstate_free(rpc_state
);
372 evhttp_send_error(req
, HTTP_SERVUNAVAIL
, NULL
);
378 evrpc_reqstate_free(struct evrpc_req_generic
* rpc_state
)
381 EVUTIL_ASSERT(rpc_state
!= NULL
);
382 rpc
= rpc_state
->rpc
;
384 /* clean up all memory */
385 if (rpc_state
->hook_meta
!= NULL
)
386 evrpc_hook_context_free(rpc_state
->hook_meta
);
387 if (rpc_state
->request
!= NULL
)
388 rpc
->request_free(rpc_state
->request
);
389 if (rpc_state
->reply
!= NULL
)
390 rpc
->reply_free(rpc_state
->reply
);
391 if (rpc_state
->rpc_data
!= NULL
)
392 evbuffer_free(rpc_state
->rpc_data
);
397 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT
);
400 evrpc_request_done(struct evrpc_req_generic
*rpc_state
)
402 struct evhttp_request
*req
= rpc_state
->http_req
;
403 struct evrpc
*rpc
= rpc_state
->rpc
;
405 if (rpc
->reply_complete(rpc_state
->reply
) == -1) {
406 /* the reply was not completely filled in. error out */
410 if ((rpc_state
->rpc_data
= evbuffer_new()) == NULL
) {
415 /* serialize the reply */
416 rpc
->reply_marshal(rpc_state
->rpc_data
, rpc_state
->reply
);
418 if (TAILQ_FIRST(&rpc
->base
->output_hooks
) != NULL
) {
421 evrpc_hook_associate_meta(&rpc_state
->hook_meta
, req
->evcon
);
423 /* do hook based tweaks to the request */
424 hook_res
= evrpc_process_hooks(&rpc
->base
->output_hooks
,
425 rpc_state
, req
, rpc_state
->rpc_data
);
427 case EVRPC_TERMINATE
:
430 if (evrpc_pause_request(rpc
->base
, rpc_state
,
431 evrpc_request_done_closure
) == -1)
437 EVUTIL_ASSERT(hook_res
== EVRPC_TERMINATE
||
438 hook_res
== EVRPC_CONTINUE
||
439 hook_res
== EVRPC_PAUSE
);
443 evrpc_request_done_closure(rpc_state
, EVRPC_CONTINUE
);
447 if (rpc_state
!= NULL
)
448 evrpc_reqstate_free(rpc_state
);
449 evhttp_send_error(req
, HTTP_SERVUNAVAIL
, NULL
);
454 evrpc_get_request(struct evrpc_req_generic
*req
)
460 evrpc_get_reply(struct evrpc_req_generic
*req
)
466 evrpc_request_done_closure(void *arg
, enum EVRPC_HOOK_RESULT hook_res
)
468 struct evrpc_req_generic
*rpc_state
= arg
;
469 struct evhttp_request
*req
= rpc_state
->http_req
;
471 if (hook_res
== EVRPC_TERMINATE
)
474 /* on success, we are going to transmit marshaled binary data */
475 if (evhttp_find_header(req
->output_headers
, "Content-Type") == NULL
) {
476 evhttp_add_header(req
->output_headers
,
477 "Content-Type", "application/octet-stream");
479 evhttp_send_reply(req
, HTTP_OK
, "OK", rpc_state
->rpc_data
);
481 evrpc_reqstate_free(rpc_state
);
486 if (rpc_state
!= NULL
)
487 evrpc_reqstate_free(rpc_state
);
488 evhttp_send_error(req
, HTTP_SERVUNAVAIL
, NULL
);
493 /* Client implementation of RPC site */
495 static int evrpc_schedule_request(struct evhttp_connection
*connection
,
496 struct evrpc_request_wrapper
*ctx
);
499 evrpc_pool_new(struct event_base
*base
)
501 struct evrpc_pool
*pool
= mm_calloc(1, sizeof(struct evrpc_pool
));
505 TAILQ_INIT(&pool
->connections
);
506 TAILQ_INIT(&pool
->requests
);
508 TAILQ_INIT(&pool
->paused_requests
);
510 TAILQ_INIT(&pool
->input_hooks
);
511 TAILQ_INIT(&pool
->output_hooks
);
520 evrpc_request_wrapper_free(struct evrpc_request_wrapper
*request
)
522 if (request
->hook_meta
!= NULL
)
523 evrpc_hook_context_free(request
->hook_meta
);
524 mm_free(request
->name
);
529 evrpc_pool_free(struct evrpc_pool
*pool
)
531 struct evhttp_connection
*connection
;
532 struct evrpc_request_wrapper
*request
;
533 struct evrpc_hook_ctx
*pause
;
534 struct evrpc_hook
*hook
;
537 while ((request
= TAILQ_FIRST(&pool
->requests
)) != NULL
) {
538 TAILQ_REMOVE(&pool
->requests
, request
, next
);
539 evrpc_request_wrapper_free(request
);
542 while ((pause
= TAILQ_FIRST(&pool
->paused_requests
)) != NULL
) {
543 TAILQ_REMOVE(&pool
->paused_requests
, pause
, next
);
547 while ((connection
= TAILQ_FIRST(&pool
->connections
)) != NULL
) {
548 TAILQ_REMOVE(&pool
->connections
, connection
, next
);
549 evhttp_connection_free(connection
);
552 while ((hook
= TAILQ_FIRST(&pool
->input_hooks
)) != NULL
) {
553 r
= evrpc_remove_hook(pool
, EVRPC_INPUT
, hook
);
557 while ((hook
= TAILQ_FIRST(&pool
->output_hooks
)) != NULL
) {
558 r
= evrpc_remove_hook(pool
, EVRPC_OUTPUT
, hook
);
566 * Add a connection to the RPC pool. A request scheduled on the pool
567 * may use any available connection.
571 evrpc_pool_add_connection(struct evrpc_pool
*pool
,
572 struct evhttp_connection
*connection
)
574 EVUTIL_ASSERT(connection
->http_server
== NULL
);
575 TAILQ_INSERT_TAIL(&pool
->connections
, connection
, next
);
578 * associate an event base with this connection
580 if (pool
->base
!= NULL
)
581 evhttp_connection_set_base(connection
, pool
->base
);
584 * unless a timeout was specifically set for a connection,
585 * the connection inherits the timeout from the pool.
587 if (connection
->timeout
== -1)
588 connection
->timeout
= pool
->timeout
;
591 * if we have any requests pending, schedule them with the new
595 if (TAILQ_FIRST(&pool
->requests
) != NULL
) {
596 struct evrpc_request_wrapper
*request
=
597 TAILQ_FIRST(&pool
->requests
);
598 TAILQ_REMOVE(&pool
->requests
, request
, next
);
599 evrpc_schedule_request(connection
, request
);
604 evrpc_pool_remove_connection(struct evrpc_pool
*pool
,
605 struct evhttp_connection
*connection
)
607 TAILQ_REMOVE(&pool
->connections
, connection
, next
);
611 evrpc_pool_set_timeout(struct evrpc_pool
*pool
, int timeout_in_secs
)
613 struct evhttp_connection
*evcon
;
614 TAILQ_FOREACH(evcon
, &pool
->connections
, next
) {
615 evcon
->timeout
= timeout_in_secs
;
617 pool
->timeout
= timeout_in_secs
;
621 static void evrpc_reply_done(struct evhttp_request
*, void *);
622 static void evrpc_request_timeout(evutil_socket_t
, short, void *);
625 * Finds a connection object associated with the pool that is currently
626 * idle and can be used to make a request.
628 static struct evhttp_connection
*
629 evrpc_pool_find_connection(struct evrpc_pool
*pool
)
631 struct evhttp_connection
*connection
;
632 TAILQ_FOREACH(connection
, &pool
->connections
, next
) {
633 if (TAILQ_FIRST(&connection
->requests
) == NULL
)
641 * Prototypes responsible for evrpc scheduling and hooking
644 static void evrpc_schedule_request_closure(void *ctx
, enum EVRPC_HOOK_RESULT
);
647 * We assume that the ctx is no longer queued on the pool.
650 evrpc_schedule_request(struct evhttp_connection
*connection
,
651 struct evrpc_request_wrapper
*ctx
)
653 struct evhttp_request
*req
= NULL
;
654 struct evrpc_pool
*pool
= ctx
->pool
;
655 struct evrpc_status status
;
657 if ((req
= evhttp_request_new(evrpc_reply_done
, ctx
)) == NULL
)
660 /* serialize the request data into the output buffer */
661 ctx
->request_marshal(req
->output_buffer
, ctx
->request
);
663 /* we need to know the connection that we might have to abort */
664 ctx
->evcon
= connection
;
666 /* if we get paused we also need to know the request */
669 if (TAILQ_FIRST(&pool
->output_hooks
) != NULL
) {
672 evrpc_hook_associate_meta(&ctx
->hook_meta
, connection
);
674 /* apply hooks to the outgoing request */
675 hook_res
= evrpc_process_hooks(&pool
->output_hooks
,
676 ctx
, req
, req
->output_buffer
);
679 case EVRPC_TERMINATE
:
682 /* we need to be explicitly resumed */
683 if (evrpc_pause_request(pool
, ctx
,
684 evrpc_schedule_request_closure
) == -1)
688 /* we can just continue */
691 EVUTIL_ASSERT(hook_res
== EVRPC_TERMINATE
||
692 hook_res
== EVRPC_CONTINUE
||
693 hook_res
== EVRPC_PAUSE
);
697 evrpc_schedule_request_closure(ctx
, EVRPC_CONTINUE
);
701 memset(&status
, 0, sizeof(status
));
702 status
.error
= EVRPC_STATUS_ERR_UNSTARTED
;
703 (*ctx
->cb
)(&status
, ctx
->request
, ctx
->reply
, ctx
->cb_arg
);
704 evrpc_request_wrapper_free(ctx
);
709 evrpc_schedule_request_closure(void *arg
, enum EVRPC_HOOK_RESULT hook_res
)
711 struct evrpc_request_wrapper
*ctx
= arg
;
712 struct evhttp_connection
*connection
= ctx
->evcon
;
713 struct evhttp_request
*req
= ctx
->req
;
714 struct evrpc_pool
*pool
= ctx
->pool
;
715 struct evrpc_status status
;
719 if (hook_res
== EVRPC_TERMINATE
)
722 uri
= evrpc_construct_uri(ctx
->name
);
726 if (pool
->timeout
> 0) {
728 * a timeout after which the whole rpc is going to be aborted.
731 evutil_timerclear(&tv
);
732 tv
.tv_sec
= pool
->timeout
;
733 evtimer_add(&ctx
->ev_timeout
, &tv
);
736 /* start the request over the connection */
737 res
= evhttp_make_request(connection
, req
, EVHTTP_REQ_POST
, uri
);
746 memset(&status
, 0, sizeof(status
));
747 status
.error
= EVRPC_STATUS_ERR_UNSTARTED
;
748 (*ctx
->cb
)(&status
, ctx
->request
, ctx
->reply
, ctx
->cb_arg
);
749 evrpc_request_wrapper_free(ctx
);
752 /* we just queue the paused request on the pool under the req object */
754 evrpc_pause_request(void *vbase
, void *ctx
,
755 void (*cb
)(void *, enum EVRPC_HOOK_RESULT
))
757 struct _evrpc_hooks
*base
= vbase
;
758 struct evrpc_hook_ctx
*pause
= mm_malloc(sizeof(*pause
));
765 TAILQ_INSERT_TAIL(&base
->pause_requests
, pause
, next
);
770 evrpc_resume_request(void *vbase
, void *ctx
, enum EVRPC_HOOK_RESULT res
)
772 struct _evrpc_hooks
*base
= vbase
;
773 struct evrpc_pause_list
*head
= &base
->pause_requests
;
774 struct evrpc_hook_ctx
*pause
;
776 TAILQ_FOREACH(pause
, head
, next
) {
777 if (pause
->ctx
== ctx
)
784 (*pause
->cb
)(pause
->ctx
, res
);
785 TAILQ_REMOVE(head
, pause
, next
);
791 evrpc_make_request(struct evrpc_request_wrapper
*ctx
)
793 struct evrpc_pool
*pool
= ctx
->pool
;
795 /* initialize the event structure for this rpc */
796 evtimer_assign(&ctx
->ev_timeout
, pool
->base
, evrpc_request_timeout
, ctx
);
798 /* we better have some available connections on the pool */
799 EVUTIL_ASSERT(TAILQ_FIRST(&pool
->connections
) != NULL
);
802 * if no connection is available, we queue the request on the pool,
803 * the next time a connection is empty, the rpc will be send on that.
805 TAILQ_INSERT_TAIL(&pool
->requests
, ctx
, next
);
807 evrpc_pool_schedule(pool
);
813 struct evrpc_request_wrapper
*
814 evrpc_make_request_ctx(
815 struct evrpc_pool
*pool
, void *request
, void *reply
,
817 void (*req_marshal
)(struct evbuffer
*, void *),
818 void (*rpl_clear
)(void *),
819 int (*rpl_unmarshal
)(void *, struct evbuffer
*),
820 void (*cb
)(struct evrpc_status
*, void *, void *, void *),
823 struct evrpc_request_wrapper
*ctx
= (struct evrpc_request_wrapper
*)
824 mm_malloc(sizeof(struct evrpc_request_wrapper
));
829 ctx
->hook_meta
= NULL
;
831 ctx
->name
= mm_strdup(rpcname
);
832 if (ctx
->name
== NULL
) {
838 ctx
->request
= request
;
840 ctx
->request_marshal
= req_marshal
;
841 ctx
->reply_clear
= rpl_clear
;
842 ctx
->reply_unmarshal
= rpl_unmarshal
;
848 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT
);
851 evrpc_reply_done(struct evhttp_request
*req
, void *arg
)
853 struct evrpc_request_wrapper
*ctx
= arg
;
854 struct evrpc_pool
*pool
= ctx
->pool
;
855 int hook_res
= EVRPC_CONTINUE
;
857 /* cancel any timeout we might have scheduled */
858 event_del(&ctx
->ev_timeout
);
862 /* we need to get the reply now */
864 evrpc_reply_done_closure(ctx
, EVRPC_CONTINUE
);
868 if (TAILQ_FIRST(&pool
->input_hooks
) != NULL
) {
869 evrpc_hook_associate_meta(&ctx
->hook_meta
, ctx
->evcon
);
871 /* apply hooks to the incoming request */
872 hook_res
= evrpc_process_hooks(&pool
->input_hooks
,
873 ctx
, req
, req
->input_buffer
);
876 case EVRPC_TERMINATE
:
881 * if we get paused we also need to know the
882 * request. unfortunately, the underlying
883 * layer is going to free it. we need to
884 * request ownership explicitly
887 evhttp_request_own(req
);
889 evrpc_pause_request(pool
, ctx
,
890 evrpc_reply_done_closure
);
893 EVUTIL_ASSERT(hook_res
== EVRPC_TERMINATE
||
894 hook_res
== EVRPC_CONTINUE
||
895 hook_res
== EVRPC_PAUSE
);
899 evrpc_reply_done_closure(ctx
, hook_res
);
901 /* http request is being freed by underlying layer */
905 evrpc_reply_done_closure(void *arg
, enum EVRPC_HOOK_RESULT hook_res
)
907 struct evrpc_request_wrapper
*ctx
= arg
;
908 struct evhttp_request
*req
= ctx
->req
;
909 struct evrpc_pool
*pool
= ctx
->pool
;
910 struct evrpc_status status
;
913 memset(&status
, 0, sizeof(status
));
914 status
.http_req
= req
;
916 /* we need to get the reply now */
918 status
.error
= EVRPC_STATUS_ERR_TIMEOUT
;
919 } else if (hook_res
== EVRPC_TERMINATE
) {
920 status
.error
= EVRPC_STATUS_ERR_HOOKABORTED
;
922 res
= ctx
->reply_unmarshal(ctx
->reply
, req
->input_buffer
);
924 status
.error
= EVRPC_STATUS_ERR_BADPAYLOAD
;
928 /* clear everything that we might have written previously */
929 ctx
->reply_clear(ctx
->reply
);
932 (*ctx
->cb
)(&status
, ctx
->request
, ctx
->reply
, ctx
->cb_arg
);
934 evrpc_request_wrapper_free(ctx
);
936 /* the http layer owned the original request structure, but if we
937 * got paused, we asked for ownership and need to free it here. */
938 if (req
!= NULL
&& evhttp_request_is_owned(req
))
939 evhttp_request_free(req
);
941 /* see if we can schedule another request */
942 evrpc_pool_schedule(pool
);
946 evrpc_pool_schedule(struct evrpc_pool
*pool
)
948 struct evrpc_request_wrapper
*ctx
= TAILQ_FIRST(&pool
->requests
);
949 struct evhttp_connection
*evcon
;
951 /* if no requests are pending, we have no work */
955 if ((evcon
= evrpc_pool_find_connection(pool
)) != NULL
) {
956 TAILQ_REMOVE(&pool
->requests
, ctx
, next
);
957 evrpc_schedule_request(evcon
, ctx
);
962 evrpc_request_timeout(evutil_socket_t fd
, short what
, void *arg
)
964 struct evrpc_request_wrapper
*ctx
= arg
;
965 struct evhttp_connection
*evcon
= ctx
->evcon
;
966 EVUTIL_ASSERT(evcon
!= NULL
);
968 evhttp_connection_fail(evcon
, EVCON_HTTP_TIMEOUT
);
972 * frees potential meta data associated with a request.
976 evrpc_meta_data_free(struct evrpc_meta_list
*meta_data
)
978 struct evrpc_meta
*entry
;
979 EVUTIL_ASSERT(meta_data
!= NULL
);
981 while ((entry
= TAILQ_FIRST(meta_data
)) != NULL
) {
982 TAILQ_REMOVE(meta_data
, entry
, next
);
984 mm_free(entry
->data
);
989 static struct evrpc_hook_meta
*
990 evrpc_hook_meta_new(void)
992 struct evrpc_hook_meta
*ctx
;
993 ctx
= mm_malloc(sizeof(struct evrpc_hook_meta
));
994 EVUTIL_ASSERT(ctx
!= NULL
);
996 TAILQ_INIT(&ctx
->meta_data
);
1003 evrpc_hook_associate_meta(struct evrpc_hook_meta
**pctx
,
1004 struct evhttp_connection
*evcon
)
1006 struct evrpc_hook_meta
*ctx
= *pctx
;
1008 *pctx
= ctx
= evrpc_hook_meta_new();
1013 evrpc_hook_context_free(struct evrpc_hook_meta
*ctx
)
1015 evrpc_meta_data_free(&ctx
->meta_data
);
1019 /* Adds meta data */
1021 evrpc_hook_add_meta(void *ctx
, const char *key
,
1022 const void *data
, size_t data_size
)
1024 struct evrpc_request_wrapper
*req
= ctx
;
1025 struct evrpc_hook_meta
*store
= NULL
;
1026 struct evrpc_meta
*meta
= NULL
;
1028 if ((store
= req
->hook_meta
) == NULL
)
1029 store
= req
->hook_meta
= evrpc_hook_meta_new();
1031 meta
= mm_malloc(sizeof(struct evrpc_meta
));
1032 EVUTIL_ASSERT(meta
!= NULL
);
1033 meta
->key
= mm_strdup(key
);
1034 EVUTIL_ASSERT(meta
->key
!= NULL
);
1035 meta
->data_size
= data_size
;
1036 meta
->data
= mm_malloc(data_size
);
1037 EVUTIL_ASSERT(meta
->data
!= NULL
);
1038 memcpy(meta
->data
, data
, data_size
);
1040 TAILQ_INSERT_TAIL(&store
->meta_data
, meta
, next
);
1044 evrpc_hook_find_meta(void *ctx
, const char *key
, void **data
, size_t *data_size
)
1046 struct evrpc_request_wrapper
*req
= ctx
;
1047 struct evrpc_meta
*meta
= NULL
;
1049 if (req
->hook_meta
== NULL
)
1052 TAILQ_FOREACH(meta
, &req
->hook_meta
->meta_data
, next
) {
1053 if (strcmp(meta
->key
, key
) == 0) {
1055 *data_size
= meta
->data_size
;
1063 struct evhttp_connection
*
1064 evrpc_hook_get_connection(void *ctx
)
1066 struct evrpc_request_wrapper
*req
= ctx
;
1067 return (req
->hook_meta
!= NULL
? req
->hook_meta
->evcon
: NULL
);
1071 evrpc_send_request_generic(struct evrpc_pool
*pool
,
1072 void *request
, void *reply
,
1073 void (*cb
)(struct evrpc_status
*, void *, void *, void *),
1075 const char *rpcname
,
1076 void (*req_marshal
)(struct evbuffer
*, void *),
1077 void (*rpl_clear
)(void *),
1078 int (*rpl_unmarshal
)(void *, struct evbuffer
*))
1080 struct evrpc_status status
;
1081 struct evrpc_request_wrapper
*ctx
;
1082 ctx
= evrpc_make_request_ctx(pool
, request
, reply
,
1083 rpcname
, req_marshal
, rpl_clear
, rpl_unmarshal
, cb
, cb_arg
);
1086 return (evrpc_make_request(ctx
));
1088 memset(&status
, 0, sizeof(status
));
1089 status
.error
= EVRPC_STATUS_ERR_UNSTARTED
;
1090 (*(cb
))(&status
, request
, reply
, cb_arg
);
1094 /** Takes a request object and fills it in with the right magic */
1095 static struct evrpc
*
1096 evrpc_register_object(const char *name
,
1097 void *(*req_new
)(void*), void *req_new_arg
, void (*req_free
)(void *),
1098 int (*req_unmarshal
)(void *, struct evbuffer
*),
1099 void *(*rpl_new
)(void*), void *rpl_new_arg
, void (*rpl_free
)(void *),
1100 int (*rpl_complete
)(void *),
1101 void (*rpl_marshal
)(struct evbuffer
*, void *))
1103 struct evrpc
* rpc
= (struct evrpc
*)mm_calloc(1, sizeof(struct evrpc
));
1106 rpc
->uri
= mm_strdup(name
);
1107 if (rpc
->uri
== NULL
) {
1111 rpc
->request_new
= req_new
;
1112 rpc
->request_new_arg
= req_new_arg
;
1113 rpc
->request_free
= req_free
;
1114 rpc
->request_unmarshal
= req_unmarshal
;
1115 rpc
->reply_new
= rpl_new
;
1116 rpc
->reply_new_arg
= rpl_new_arg
;
1117 rpc
->reply_free
= rpl_free
;
1118 rpc
->reply_complete
= rpl_complete
;
1119 rpc
->reply_marshal
= rpl_marshal
;
1124 evrpc_register_generic(struct evrpc_base
*base
, const char *name
,
1125 void (*callback
)(struct evrpc_req_generic
*, void *), void *cbarg
,
1126 void *(*req_new
)(void *), void *req_new_arg
, void (*req_free
)(void *),
1127 int (*req_unmarshal
)(void *, struct evbuffer
*),
1128 void *(*rpl_new
)(void *), void *rpl_new_arg
, void (*rpl_free
)(void *),
1129 int (*rpl_complete
)(void *),
1130 void (*rpl_marshal
)(struct evbuffer
*, void *))
1133 evrpc_register_object(name
, req_new
, req_new_arg
, req_free
, req_unmarshal
,
1134 rpl_new
, rpl_new_arg
, rpl_free
, rpl_complete
, rpl_marshal
);
1137 evrpc_register_rpc(base
, rpc
,
1138 (void (*)(struct evrpc_req_generic
*, void *))callback
, cbarg
);
1142 /** accessors for obscure and undocumented functionality */
1144 evrpc_request_get_pool(struct evrpc_request_wrapper
*ctx
)
1150 evrpc_request_set_pool(struct evrpc_request_wrapper
*ctx
,
1151 struct evrpc_pool
*pool
)
1157 evrpc_request_set_cb(struct evrpc_request_wrapper
*ctx
,
1158 void (*cb
)(struct evrpc_status
*, void *request
, void *reply
, void *arg
),
1162 ctx
->cb_arg
= cb_arg
;