1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 #include <semaphore.h>
33 #include <microhttpd.h>
34 #include <hstor.h> /* only for ARRAY_SIZE at this point */
35 #include <curl/curl.h>
44 #include "state_defs.h"
46 /* Define-away for now. Eventually, define to gettext. */
47 #define _(msgid) (msgid)
50 #define MY_MHD_FLAGS MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG
51 //#define MY_MHD_FLAGS MHD_USE_SELECT_INTERNALLY | MHD_USE_DEBUG
53 #define MY_MHD_FLAGS MHD_USE_THREAD_PER_CONNECTION
56 extern backend_func_tbl bad_func_tbl
;
57 extern backend_func_tbl s3_func_tbl
;
58 extern backend_func_tbl curl_func_tbl
;
59 extern backend_func_tbl fs_func_tbl
;
62 URL_ROOT
=0, URL_BUCKET
, URL_OBJECT
, URL_ATTR
, URL_INVAL
,
63 URL_QUERY
, URL_PROVLIST
69 MHD_AccessHandlerCallback handler
;
72 static int fs_mode
= 0;
73 static unsigned short my_port
= MY_PORT
;
74 const char *program_name
;
76 static char *(reserved_name
[]) = { "_default", "_query", "_new", NULL
};
77 static char *(reserved_attr
[]) = { "bucket", "key", "date", "etag", "loc", NULL
};
79 static backend_func_tbl
*main_func_tbl
= &bad_func_tbl
;
82 free_ms (my_state
*ms
)
84 if (ms
->cleanup
& CLEANUP_CURL
) {
85 curl_easy_cleanup(ms
->curl
);
88 if (ms
->cleanup
& CLEANUP_BUF_PTR
) {
89 free(ms
->pipe
.data_ptr
);
92 if (ms
->cleanup
& CLEANUP_POST
) {
93 MHD_destroy_post_processor(ms
->post
);
96 if (ms
->cleanup
& CLEANUP_DICT
) {
97 g_hash_table_destroy(ms
->dict
);
100 if (ms
->cleanup
& CLEANUP_QUERY
) {
101 meta_query_stop(ms
->query
);
104 if (ms
->cleanup
& CLEANUP_TMPL
) {
108 if (ms
->cleanup
& CLEANUP_URL
) {
116 validate_put (struct MHD_Connection
*conn
)
121 /* We're not a slave, so we don't care. */
125 mhdr
= MHD_lookup_connection_value(conn
,MHD_HEADER_KIND
,
127 return (mhdr
&& !strcmp(mhdr
,"master"));
131 is_reserved (char *cand
, char **resv_list
)
135 for (i
= 0; resv_list
[i
]; ++i
) {
136 if (!strcmp(cand
,resv_list
[i
])) {
145 validate_url (const char *url
)
147 char *slash
= rindex(url
,'/');
150 /* There should be at least one betwixt bucket and key. */
154 return !is_reserved(slash
+1,reserved_name
);
158 * The proxy has MHD on one side and CURL on the other. The CURL side is
159 * always run in a child thread. Yes, there are both context switches
160 * and copies between the threads. Get over it. The focus here is on
161 * supporting multi-way replication on PUT, with minimal complexity. These
162 * were the two easiest libraries to use, and they both want to allocate
163 * their own buffers so we're kind of stuck with the copies unless we want
164 * to buffer whole files in memory (which would have to be virtual since
165 * they're potentialy bigger than physical) or explicitly ping them through
166 * a local filesystem. We could potentially take over scheduling from one
167 * or both to avoid some of the context switching, but those interfaces are
168 * significantly more error-prone and (at least in CURL's case) buggy.
170 * For a GET, the CURL child acts as a producer while the MHD parent acts
171 * as consumer. For a PUT, the MHD parent is the producer and the CURL
172 * child is the consumer. For GET the MHD component is invoked via a
173 * callback set up in the access handler; for PUT it's invoked via repeated
174 * calls to the access handler itself. Either way, the producer's job is
175 * to post its pointer+length to the my_state structure and then wait for
176 * all consumers to check back in saying they've finished it. This might
177 * involve multiple passes through each consumer for one pass through the
178 * single producer. When the producer is done, it does a similar handshake
179 * with the consumers. Each consumer has its own pipe_private structure,
180 * containing a pointer to the shared my_state plus a per-consumer offset
181 * into the current chunk.
183 * Attribute functions don't use CURL, so they do much simpler in-memory
184 * buffering. Queries also don't use CURL, but the MHD POST interface
185 * introduces some of its own complexity so see below for that.
189 simple_closer (void *ctx
)
193 DPRINTF("%s: cleaning up\n",__func__
);
198 child_closer (void * ctx
)
200 pipe_private
*pp
= ctx
;
202 DPRINTF("in %s\n",__func__
);
207 /* Invoked from MHD. */
209 proxy_get_cons (void *ctx
, uint64_t pos
, char *buf
, int max
)
211 pipe_private
*pp
= ctx
;
212 pipe_shared
*ps
= pp
->shared
;
213 my_state
*ms
= ps
->owner
;
219 DPRINTF("consumer asked to read %d\n",max
);
221 if (pipe_cons_wait(pp
)) {
222 DPRINTF("consumer offset %zu into %zu\n",
223 pp
->offset
, ps
->data_len
);
224 done
= ps
->data_len
- pp
->offset
;
228 memcpy(buf
,ps
->data_ptr
+pp
->offset
,done
);
230 DPRINTF("consumer copied %d, new offset %zu\n",
232 if (pp
->offset
== ps
->data_len
) {
233 DPRINTF("consumer finished chunk\n");
234 pipe_cons_signal(pp
, 0);
243 pthread_join(ms
->backend_th
,&child_res
);
244 if (child_res
== THREAD_FAILED
) {
245 ms
->rc
= MHD_HTTP_INTERNAL_SERVER_ERROR
;
247 if (ms
->from_master
) {
248 pthread_join(ms
->cache_th
,NULL
);
249 /* TBD: do something about cache failure? */
258 proxy_get_data (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
259 const char *method
, const char *version
, const char *data
,
260 size_t *data_size
, void **rctx
)
262 struct MHD_Response
*resp
;
263 my_state
*ms
= *rctx
;
267 const char *user_etag
;
275 DPRINTF("PROXY GET DATA %s\n",url
);
277 ms
->url
= strdup(url
);
281 ms
->cleanup
|= CLEANUP_URL
;
283 my_etag
= meta_has_copy(ms
->bucket
,ms
->key
,me
);
285 user_etag
= MHD_lookup_connection_value(
286 conn
, MHD_HEADER_KIND
, "If-None-Match");
287 if (user_etag
&& !strcmp(user_etag
,my_etag
)) {
288 DPRINTF("ETag match!\n");
290 resp
= MHD_create_response_from_data(0,NULL
,
292 MHD_queue_response(conn
,MHD_HTTP_NOT_MODIFIED
,resp
);
293 MHD_destroy_response(resp
);
300 DPRINTF("%s/%s not found locally\n",ms
->bucket
,ms
->key
);
302 DPRINTF(" that means it doesn't exist\n");
303 resp
= MHD_create_response_from_data(0,NULL
,
305 MHD_queue_response(conn
,MHD_HTTP_NOT_FOUND
,resp
);
306 MHD_destroy_response(resp
);
310 DPRINTF(" will fetch from %s:%u\n", master_host
,master_port
);
314 pipe_init_shared(&ms
->pipe
,ms
,ms
->from_master
+1);
315 pp
= pipe_init_private(&ms
->pipe
);
319 /* Master is always assumed to be CURL (i.e. our own protocol) */
320 if (ms
->from_master
) {
321 pthread_create(&ms
->backend_th
,NULL
,
322 curl_func_tbl
.get_child_func
,ms
);
325 pthread_create(&ms
->backend_th
,NULL
,
326 main_func_tbl
->get_child_func
,ms
);
328 /* TBD: check return value */
330 if (ms
->from_master
) {
331 pp2
= pipe_init_private(&ms
->pipe
);
335 pthread_create(&ms
->cache_th
,NULL
,
336 main_func_tbl
->cache_child_func
,pp2
);
337 /* TBD: check return value */
343 resp
= MHD_create_response_from_callback(
344 MHD_SIZE_UNKNOWN
, 65536, proxy_get_cons
, pp
, child_closer
);
346 fprintf(stderr
,"MHD_crfc failed\n");
348 /* TBD: terminate thread */
354 MHD_queue_response(conn
,ms
->rc
,resp
);
355 MHD_destroy_response(resp
);
361 recheck_replication (my_state
* ms
, char *policy
)
365 char fixed
[MAX_FIELD_LEN
];
367 if (is_reserved(ms
->key
,reserved_name
)) {
368 DPRINTF("declining to replicate reserved object %s\n",ms
->key
);
372 if (!policy
&& ms
->dict
) {
373 DPRINTF("using new policy for %s/%s\n",ms
->bucket
,ms
->key
);
374 policy
= g_hash_table_lookup(ms
->dict
,"_policy");
378 /* If we get a policy here or below, we have to free it. */
380 DPRINTF("fetching policy for %s/%s\n",ms
->bucket
,ms
->key
);
381 rc
= meta_get_value(ms
->bucket
,ms
->key
, "_policy", &policy
);
385 DPRINTF(" inheriting policy from %s\n",ms
->bucket
);
386 rc
= meta_get_value(ms
->bucket
,
387 "_default", "_policy", &policy
);
391 DPRINTF(" implementing policy %s\n",policy
);
393 * Can't use ms->url here because it might be a bucket POST
394 * and in that case ms->url points to the bucket.
396 snprintf(fixed
,sizeof(fixed
),"%s/%s",ms
->bucket
,ms
->key
);
397 replicate(fixed
,0,policy
);
403 DPRINTF(" could not find a policy anywhere!\n");
408 proxy_put_data (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
409 const char *method
, const char *version
, const char *data
,
410 size_t *data_size
, void **rctx
)
412 struct MHD_Response
*resp
;
413 my_state
*ms
= *rctx
;
423 DPRINTF("PROXY PUT DATA %s (%zu)\n",url
,*data_size
);
425 if (ms
->state
== MS_NEW
) {
426 if (!validate_put(conn
) || !validate_url(url
)) {
427 DPRINTF("rejecting %s\n",url
);
428 resp
= MHD_create_response_from_data(0,NULL
,
433 MHD_queue_response(conn
,MHD_HTTP_FORBIDDEN
,resp
);
434 MHD_destroy_response(resp
);
437 ms
->state
= MS_NORMAL
;
438 ms
->url
= strdup(url
);
442 ms
->cleanup
|= CLEANUP_URL
;
444 pipe_init_shared(&ms
->pipe
,ms
,1);
445 pp
= pipe_init_private(&ms
->pipe
);
449 pthread_create(&ms
->backend_th
,NULL
,
450 main_func_tbl
->put_child_func
,pp
);
451 /* TBD: check return value */
454 * Do the initial handshake with children. If we return from
455 * this callback without an error response, Microhttpd posts
456 * the "100 Continue" header and the client starts sending
457 * the data. We must report errors here or forever keep
460 rc
= pipe_prod_wait_init(&ms
->pipe
);
462 DPRINTF("producer wait failed\n");
463 resp
= MHD_create_response_from_data(0,NULL
,
468 MHD_queue_response(conn
,MHD_HTTP_INTERNAL_SERVER_ERROR
,
470 MHD_destroy_response(resp
);
473 * Note that we fail here even if 1 of N replicas fail.
474 * Might want to fix this when we start looping over
475 * pipe_init_private() above.
477 DPRINTF("producer replicas failed (%u of %u)\n",
478 rc
, ms
->pipe
.cons_total
);
479 resp
= MHD_create_response_from_data(0,NULL
,
484 MHD_queue_response(conn
,MHD_HTTP_INTERNAL_SERVER_ERROR
,
486 MHD_destroy_response(resp
);
488 DPRINTF("producer proceeding\n");
491 else if (*data_size
) {
492 pipe_prod_signal(&ms
->pipe
,(void *)data
,*data_size
);
493 ms
->size
+= *data_size
;
494 DPRINTF("producer chunk finished\n");
498 pipe_prod_finish(&ms
->pipe
);
499 pthread_join(ms
->backend_th
,&child_res
);
500 if (child_res
== THREAD_FAILED
) {
501 DPRINTF("thread failed\n");
502 rc
= MHD_HTTP_INTERNAL_SERVER_ERROR
;
504 else if (ms
->pipe
.cons_error
== ms
->pipe
.cons_total
) {
505 DPRINTF("all %u consumers failed\n",
506 ms
->pipe
.cons_error
);
507 rc
= MHD_HTTP_INTERNAL_SERVER_ERROR
;
511 meta_got_copy(ms
->bucket
,ms
->key
,me
);
515 etag
= meta_did_put(ms
->bucket
,ms
->key
,me
,
518 DPRINTF("rereplicate (obj PUT)\n");
519 recheck_replication(ms
,NULL
);
523 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
529 MHD_add_response_header(resp
,"ETag",etag
);
532 MHD_queue_response(conn
,rc
,resp
);
533 MHD_destroy_response(resp
);
540 proxy_get_attr (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
541 const char *method
, const char *version
, const char *data
,
542 size_t *data_size
, void **rctx
)
544 struct MHD_Response
*resp
;
546 my_state
*ms
= *rctx
;
547 int rc
= MHD_HTTP_NOT_FOUND
;
555 DPRINTF("PROXY GET ATTR %s\n",url
);
557 if (meta_get_value(ms
->bucket
,ms
->key
,ms
->attr
,&fixed
) == 0) {
558 resp
= MHD_create_response_from_data(strlen(fixed
),fixed
,
563 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
568 MHD_queue_response(conn
,rc
,resp
);
569 MHD_destroy_response(resp
);
576 proxy_put_attr (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
577 const char *method
, const char *version
, const char *data
,
578 size_t *data_size
, void **rctx
)
580 struct MHD_Response
*resp
;
581 my_state
*ms
= *rctx
;
589 DPRINTF("PROXY PUT ATTR %s (%zu)\n",url
,*data_size
);
591 if (ms
->state
== MS_NEW
) {
592 ms
->state
= MS_NORMAL
;
593 ms
->url
= strdup(url
);
597 ms
->cleanup
|= CLEANUP_URL
;
598 attrval
= MHD_lookup_connection_value(conn
,MHD_HEADER_KIND
,
601 meta_set_value(ms
->bucket
,ms
->key
,ms
->attr
,
606 else if (*data_size
) {
607 if (ms
->pipe
.data_len
) {
608 ms
->pipe
.data_len
+= *data_size
;
609 char *p
= realloc(ms
->pipe
.data_ptr
,ms
->pipe
.data_len
);
613 ms
->pipe
.data_ptr
= p
;
616 ms
->pipe
.data_len
= *data_size
+ 1;
617 ms
->pipe
.data_ptr
= malloc(ms
->pipe
.data_len
);
618 if (!ms
->pipe
.data_ptr
) {
621 ((char *)ms
->pipe
.data_ptr
)[0] = '\0';
622 ms
->cleanup
|= CLEANUP_BUF_PTR
;
624 (void)strncat(ms
->pipe
.data_ptr
,data
,*data_size
);
625 /* TBD: check return value */
629 if (!ms
->pipe
.data_ptr
) {
632 if (is_reserved(ms
->attr
,reserved_attr
)) {
633 resp
= MHD_create_response_from_data(
634 0,NULL
,MHD_NO
,MHD_NO
);
638 MHD_queue_response(conn
,MHD_HTTP_BAD_REQUEST
,
640 MHD_destroy_response(resp
);
644 meta_set_value(ms
->bucket
,ms
->key
,ms
->attr
,ms
->pipe
.data_ptr
);
646 * We should always re-replicate, because the replication
647 * policy might refer to this attr.
649 DPRINTF("rereplicate (attr PUT)\n");
650 recheck_replication(ms
,NULL
);
656 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
660 MHD_queue_response(conn
,MHD_HTTP_CREATED
,resp
);
661 MHD_destroy_response(resp
);
663 * TBD: check if the attribute was a replication policy, and
664 * start/stop replication activities as appropriate.
672 * For queries, we have to deal with MHD's post-iterator interface (not
673 * quite the same as the data-iteration even though we use it that way) on
674 * one side, and a query-iterator interface on the other. Data on both
675 * sides could be quite large, so we can't just stick them in header lines.
676 * We do still buffer the query in memory, though. Once that's done, we do
677 * very simple parsing - it will be more complicated later - and create the
678 * query iterator. That's also driven by MHD, this time though the
679 * content-callback interface, and repeatedly calls in to the metadata
680 * module to fetch one object name at a time.
684 query_iterator (void *ctx
, enum MHD_ValueKind kind
, const char *key
,
685 const char *filename
, const char *content_type
,
686 const char *transfer_encoding
, const char *data
,
687 uint64_t off
, size_t size
)
694 (void)transfer_encoding
;
699 /* We actually accumulate the data in proxy_query. */
703 /* MHD reader function during queries. Return -1 for EOF. */
705 proxy_query_func (void *ctx
, uint64_t pos
, char *buf
, int max
)
709 const char *accept_hdr
;
715 accept_hdr
= MHD_lookup_connection_value(ms
->conn
,MHD_HEADER_KIND
,
719 ms
->gen_ctx
= tmpl_get_ctx(accept_hdr
);
723 ms
->cleanup
|= CLEANUP_TMPL
;
724 len
= tmpl_obj_header(ms
->gen_ctx
);
731 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
735 if (ms
->gen_ctx
== TMPL_CTX_DONE
) {
740 if (!meta_query_next(ms
->query
,&bucket
,&key
)) {
743 if (is_reserved(key
,reserved_name
)) {
746 len
= tmpl_obj_entry(ms
->gen_ctx
,bucket
,key
);
753 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
757 len
= tmpl_obj_footer(ms
->gen_ctx
);
764 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
766 ms
->cleanup
&= ~CLEANUP_TMPL
;
767 ms
->gen_ctx
= TMPL_CTX_DONE
;
772 proxy_query (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
773 const char *method
, const char *version
, const char *data
,
774 size_t *data_size
, void **rctx
)
776 struct MHD_Response
*resp
;
777 my_state
*ms
= *rctx
;
783 DPRINTF("PROXY QUERY %s (%zu)\n",url
,*data_size
);
785 if (ms
->state
== MS_NEW
) {
786 ms
->state
= MS_NORMAL
;
787 ms
->post
= MHD_create_post_processor(conn
,4096,
789 ms
->cleanup
|= CLEANUP_POST
;
791 else if (*data_size
) {
792 MHD_post_process(ms
->post
,data
,*data_size
);
793 if (ms
->pipe
.data_len
) {
794 ms
->pipe
.data_len
+= *data_size
;
795 char *p
= realloc(ms
->pipe
.data_ptr
,ms
->pipe
.data_len
);
799 ms
->pipe
.data_ptr
= p
;
802 ms
->pipe
.data_len
= *data_size
+ 1;
803 ms
->pipe
.data_ptr
= malloc(ms
->pipe
.data_len
);
804 if (!ms
->pipe
.data_ptr
) {
807 ((char *)ms
->pipe
.data_ptr
)[0] = '\0';
808 ms
->cleanup
|= CLEANUP_BUF_PTR
;
810 (void)strncat(ms
->pipe
.data_ptr
,data
,*data_size
);
811 /* TBD: check return value */
815 if (!ms
->pipe
.data_ptr
) {
818 ms
->query
= meta_query_new(ms
->bucket
,NULL
,ms
->pipe
.data_ptr
);
819 ms
->cleanup
|= CLEANUP_QUERY
;
820 resp
= MHD_create_response_from_callback(MHD_SIZE_UNKNOWN
,
821 65536, proxy_query_func
, ms
, simple_closer
);
823 fprintf(stderr
,"MHD_crfc failed\n");
827 MHD_queue_response(conn
,MHD_HTTP_OK
,resp
);
828 MHD_destroy_response(resp
);
836 proxy_list_objs (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
837 const char *method
, const char *version
, const char *data
,
838 size_t *data_size
, void **rctx
)
840 my_state
*ms
= *rctx
;
841 struct MHD_Response
*resp
;
850 ms
->query
= meta_query_new((char *)ms
->bucket
,NULL
,NULL
);
851 ms
->cleanup
|= CLEANUP_QUERY
;
853 resp
= MHD_create_response_from_callback(MHD_SIZE_UNKNOWN
,
854 65536, proxy_query_func
, ms
, simple_closer
);
856 fprintf(stderr
,"MHD_crfc failed\n");
861 MHD_queue_response(conn
,MHD_HTTP_OK
,resp
);
862 MHD_destroy_response(resp
);
867 proxy_delete (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
868 const char *method
, const char *version
, const char *data
,
869 size_t *data_size
, void **rctx
)
871 my_state
*ms
= *rctx
;
872 struct MHD_Response
*resp
;
885 DPRINTF("PROXY DELETE %s\n",url
);
887 rc
= main_func_tbl
->delete_func(ms
->bucket
,ms
->key
,url
);
888 if (rc
== MHD_HTTP_OK
) {
889 copied_url
= strdup(url
);
891 bucket
= strtok_r(copied_url
,"/",&stctx
);
892 key
= strtok_r(NULL
,"/",&stctx
);
893 meta_delete(bucket
,key
);
895 replicate_delete(url
);
898 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
902 MHD_queue_response(conn
,rc
,resp
);
903 MHD_destroy_response(resp
);
909 /* TBD: get actual bucket list */
915 static const fake_bucket_t fake_bucket_list
[] = {
916 { "bucket_factory", "_new" },
917 { "provider_list", "_providers" },
921 root_blob_generator (void *ctx
, uint64_t pos
, char *buf
, int max
)
924 const fake_bucket_t
*fb
;
926 const char *accept_hdr
;
933 accept_hdr
= MHD_lookup_connection_value(ms
->conn
,MHD_HEADER_KIND
,
935 host
= MHD_lookup_connection_value(ms
->conn
,MHD_HEADER_KIND
,"Host");
938 ms
->gen_ctx
= tmpl_get_ctx(accept_hdr
);
942 ms
->cleanup
|= CLEANUP_TMPL
;
943 ms
->gen_ctx
->base
= host
;
944 len
= tmpl_root_header(ms
->gen_ctx
,"image_warehouse","1.0");
951 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
955 if (ms
->gen_ctx
== TMPL_CTX_DONE
) {
959 if (ms
->gen_ctx
->index
< ARRAY_SIZE(fake_bucket_list
)) {
960 fb
= fake_bucket_list
+ ms
->gen_ctx
->index
;
961 len
= tmpl_root_entry(ms
->gen_ctx
,fb
->rel
,fb
->link
);
968 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
972 if (meta_query_next(ms
->query
,&bucket
,&key
)) {
973 len
= tmpl_root_entry(ms
->gen_ctx
,"bucket",bucket
);
980 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
984 len
= tmpl_root_footer(ms
->gen_ctx
);
991 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
993 ms
->cleanup
&= ~CLEANUP_TMPL
;
994 ms
->gen_ctx
= TMPL_CTX_DONE
;
999 proxy_api_root (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1000 const char *method
, const char *version
, const char *data
,
1001 size_t *data_size
, void **rctx
)
1003 struct MHD_Response
*resp
= NULL
;
1004 unsigned int rc
= MHD_HTTP_OK
;
1005 my_state
*ms
= *rctx
;
1012 DPRINTF("PROXY API ROOT (%s, %zu)\n",url
,*data_size
);
1014 ms
->query
= meta_query_new(NULL
,"_default",NULL
);
1019 ms
->cleanup
|= CLEANUP_QUERY
;
1021 resp
= MHD_create_response_from_callback(MHD_SIZE_UNKNOWN
,
1022 65536, root_blob_generator
, ms
, simple_closer
);
1026 MHD_queue_response(conn
,rc
,resp
);
1027 MHD_destroy_response(resp
);
1034 post_iterator (void *ctx
, enum MHD_ValueKind kind
, const char *key
,
1035 const char *filename
, const char *content_type
,
1036 const char *transfer_encoding
, const char *data
,
1037 uint64_t off
, size_t size
)
1046 (void)transfer_encoding
;
1049 printf("adding %s, size=%zu\n",key
,size
);
1051 // TBD: don't assume that values are null-terminated strings
1052 old_val
= g_hash_table_lookup(ctx
,key
);
1054 old_len
= strlen(old_val
);
1055 new_val
= malloc(old_len
+size
+1);
1059 memcpy(new_val
,old_val
,old_len
);
1060 memcpy(new_val
+old_len
,data
,size
);
1061 new_val
[old_len
+size
] = '\0';
1064 new_val
= malloc(size
+1);
1068 memcpy(new_val
,data
,size
);
1069 new_val
[size
] = '\0';
1072 g_hash_table_insert(ctx
,strdup(key
),new_val
);
1073 /* TBD: check return value for strdups (none avail for insert) */
1077 /* Returns TRUE if we found an *invalid* key. */
1079 post_find (gpointer key
, gpointer value
, gpointer ctx
)
1084 if (!is_reserved(key
,reserved_attr
)) {
1088 DPRINTF("bad attr %s\n", (char *)key
);
1093 post_foreach (gpointer key
, gpointer value
, gpointer ctx
)
1097 DPRINTF("setting %s = %s for %s/%s\n",(char *)key
, (char *)value
,
1098 ms
->bucket
,ms
->key
);
1099 meta_set_value(ms
->bucket
,ms
->key
,key
,value
);
1103 create_bucket (char *name
)
1107 if (is_reserved(name
,reserved_name
)) {
1108 return MHD_HTTP_BAD_REQUEST
;
1111 rc
= main_func_tbl
->bcreate_func(name
);
1112 if (rc
== MHD_HTTP_OK
) {
1113 if (meta_set_value(name
,"_default", "_policy","0") != 0) {
1114 DPRINTF("default-policy " "create failed\n");
1117 DPRINTF("created bucket %s\n",name
);
1119 * There's not a whole lot to do about bucket-creation
1120 * failures on replicas, other than to report them, unless
1121 * we adopt an "all or nothing" approach and unwind the
1122 * create on the primary as well. Then what if that fails?
1123 * It's just one example of the general "fewer replicas
1124 * than desired" distributed-system problem, not worth a
1125 * point solution here/now. Revisit when we have a more
1126 * general replica-repair policy/system in place.
1128 replicate_bcreate(name
);
1135 proxy_bucket_post (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1136 const char *method
, const char *version
, const char *data
,
1137 size_t *data_size
, void **rctx
)
1139 struct MHD_Response
*resp
;
1140 my_state
*ms
= *rctx
;
1148 DPRINTF("PROXY POST (%s, %zu)\n",url
,*data_size
);
1150 if (ms
->state
== MS_NEW
) {
1151 ms
->state
= MS_NORMAL
;
1152 ms
->url
= (char *)url
;
1153 ms
->dict
= g_hash_table_new_full(
1154 g_str_hash
,g_str_equal
,free
,free
);
1155 ms
->cleanup
|= CLEANUP_DICT
;
1156 ms
->post
= MHD_create_post_processor(conn
,4096,
1157 post_iterator
,ms
->dict
);
1158 ms
->cleanup
|= CLEANUP_POST
;
1160 else if (*data_size
) {
1161 MHD_post_process(ms
->post
,data
,*data_size
);
1165 rc
= MHD_HTTP_BAD_REQUEST
;
1166 key
= g_hash_table_lookup(ms
->dict
,"key");
1168 strncpy(ms
->key
,key
,MAX_FIELD_LEN
-1);
1169 g_hash_table_remove(ms
->dict
,"key");
1170 if (!g_hash_table_find(ms
->dict
,post_find
,ms
)) {
1171 g_hash_table_foreach(ms
->dict
,post_foreach
,ms
);
1172 DPRINTF("rereplicate (bucket POST)\n");
1173 recheck_replication(ms
,NULL
);
1177 else if (!strcmp(ms
->bucket
,"_new")) {
1178 key
= g_hash_table_lookup(ms
->dict
,"name");
1180 rc
= create_bucket(key
);
1184 DPRINTF("A parameter is MISSING (fail)\n");
1186 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1188 fprintf(stderr
,"MHD_crfd failed\n");
1191 MHD_queue_response(conn
,rc
,resp
);
1192 MHD_destroy_response(resp
);
1200 check_location (my_state
*ms
)
1202 char *loc
= g_hash_table_lookup(ms
->dict
,"depot");
1205 DPRINTF("missing loc on check for %s/%s\n",ms
->bucket
,ms
->key
);
1206 return MHD_HTTP_BAD_REQUEST
;
1209 if (!meta_has_copy(ms
->bucket
,ms
->key
,loc
)) {
1210 DPRINTF("did not find %s/%s at %s\n",ms
->bucket
,ms
->key
,loc
);
1211 return MHD_HTTP_NOT_FOUND
;
1214 /* TBD: meta_has_copy returns an etag which we should check */
1215 DPRINTF("found %s/%s at %s\n",ms
->bucket
,ms
->key
,loc
);
1220 register_image (my_state
*ms
)
1227 site
= g_hash_table_lookup(ms
->dict
,"site");
1229 printf("site MISSING\n");
1230 return MHD_HTTP_BAD_REQUEST
;
1233 next
= index(site
,':');
1238 for (i
= 0; get_provider(i
,&prov
); ++i
) {
1239 if (strcmp(prov
.name
,site
)) {
1242 return prov
.func_tbl
->register_func(ms
,&prov
,next
,ms
->dict
);
1245 DPRINTF("site %s not found\n",site
);
1246 return MHD_HTTP_BAD_REQUEST
;
1250 proxy_object_post (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1251 const char *method
, const char *version
, const char *data
,
1252 size_t *data_size
, void **rctx
)
1254 struct MHD_Response
*resp
;
1255 my_state
*ms
= *rctx
;
1263 DPRINTF("PROXY POST (%s, %zu)\n",url
,*data_size
);
1265 if (ms
->state
== MS_NEW
) {
1266 ms
->state
= MS_NORMAL
;
1267 ms
->url
= (char *)url
;
1268 ms
->dict
= g_hash_table_new_full(
1269 g_str_hash
,g_str_equal
,free
,free
);
1270 ms
->cleanup
|= CLEANUP_DICT
;
1271 ms
->post
= MHD_create_post_processor(conn
,4096,
1272 post_iterator
,ms
->dict
);
1273 ms
->cleanup
|= CLEANUP_POST
;
1275 else if (*data_size
) {
1276 MHD_post_process(ms
->post
,data
,*data_size
);
1280 rc
= MHD_HTTP_BAD_REQUEST
;
1281 if (!g_hash_table_find(ms
->dict
,post_find
,ms
)) {
1282 op
= g_hash_table_lookup(ms
->dict
,"op");
1284 if (!strcmp(op
,"push")) {
1285 DPRINTF("rereplicate (obj POST)\n");
1286 recheck_replication(ms
,NULL
);
1289 else if (!strcmp(op
,"check")) {
1290 rc
= check_location(ms
);
1292 else if (!strcmp(op
,"register")) {
1293 rc
= register_image(ms
);
1296 DPRINTF("unknown op %s for %s/%s\n",
1297 op
, ms
->bucket
, ms
->key
);
1301 DPRINTF("op is MISSING (fail)\n");
1304 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1306 fprintf(stderr
,"MHD_crfd failed\n");
1309 MHD_queue_response(conn
,rc
,resp
);
1310 MHD_destroy_response(resp
);
1320 prov_list_generator (void *ctx
, uint64_t pos
, char *buf
, int max
)
1325 const char *accept_hdr
;
1329 accept_hdr
= MHD_lookup_connection_value(ms
->conn
,MHD_HEADER_KIND
,
1333 ms
->gen_ctx
= tmpl_get_ctx(accept_hdr
);
1337 ms
->cleanup
|= CLEANUP_TMPL
;
1338 len
= tmpl_prov_header(ms
->gen_ctx
);
1345 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
1349 if (ms
->gen_ctx
== TMPL_CTX_DONE
) {
1353 if (get_provider(ms
->gen_ctx
->index
,&prov
)) {
1354 len
= tmpl_prov_entry(ms
->gen_ctx
,prov
.name
,prov
.type
,
1355 prov
.host
, prov
.port
, prov
.username
, prov
.password
);
1362 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
1366 len
= tmpl_prov_footer(ms
->gen_ctx
);
1373 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
1375 ms
->cleanup
&= ~CLEANUP_TMPL
;
1376 ms
->gen_ctx
= TMPL_CTX_DONE
;
1381 proxy_list_provs (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1382 const char *method
, const char *version
, const char *data
,
1383 size_t *data_size
, void **rctx
)
1385 struct MHD_Response
*resp
;
1386 my_state
*ms
= *rctx
;
1395 resp
= MHD_create_response_from_callback(MHD_SIZE_UNKNOWN
,
1396 65536, prov_list_generator
, ms
, simple_closer
);
1398 fprintf(stderr
,"MHD_crfd failed\n");
1402 MHD_queue_response(conn
,MHD_HTTP_OK
,resp
);
1403 MHD_destroy_response(resp
);
1409 prov_iterator (void *ctx
, enum MHD_ValueKind kind
, const char *key
,
1410 const char *filename
, const char *content_type
,
1411 const char *transfer_encoding
, const char *data
,
1412 uint64_t off
, size_t size
)
1417 (void)transfer_encoding
;
1420 g_hash_table_insert(ctx
,strdup(key
),strndup(data
,size
));
1421 /* TBD: check return value for strdups (none avail for insert) */
1427 proxy_update_prov (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1428 const char *method
, const char *version
, const char *data
,
1429 size_t *data_size
, void **rctx
)
1431 struct MHD_Response
*resp
;
1432 my_state
*ms
= *rctx
;
1442 if (ms
->state
== MS_NEW
) {
1443 ms
->state
= MS_NORMAL
;
1444 ms
->url
= (char *)url
;
1445 ms
->dict
= g_hash_table_new_full(
1446 g_str_hash
,g_str_equal
,free
,free
);
1447 ms
->cleanup
|= CLEANUP_DICT
;
1448 ms
->post
= MHD_create_post_processor(conn
,4096,
1449 prov_iterator
,ms
->dict
);
1450 ms
->cleanup
|= CLEANUP_POST
;
1452 else if (*data_size
) {
1453 MHD_post_process(ms
->post
,data
,*data_size
);
1457 rc
= MHD_HTTP_BAD_REQUEST
;
1458 provider
= g_hash_table_lookup(ms
->dict
,"provider");
1459 username
= g_hash_table_lookup(ms
->dict
,"username");
1460 password
= g_hash_table_lookup(ms
->dict
,"password");
1461 if (provider
&& username
&& password
) {
1462 update_provider(provider
,username
,password
);
1466 DPRINTF("provider/username/password MISSING\n");
1468 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1470 fprintf(stderr
,"MHD_crfd failed\n");
1473 MHD_queue_response(conn
,rc
,resp
);
1474 MHD_destroy_response(resp
);
1482 proxy_create_bucket (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1483 const char *method
, const char *version
, const char *data
,
1484 size_t *data_size
, void **rctx
)
1486 struct MHD_Response
*resp
;
1487 my_state
*ms
= *rctx
;
1497 /* curl -T moo.empty http://localhost:9090/_new by accident */
1498 rc
= create_bucket(ms
->bucket
);
1500 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1502 fprintf(stderr
,"MHD_crfd failed\n");
1505 MHD_queue_response(conn
,rc
,resp
);
1506 MHD_destroy_response(resp
);
1512 static const rule my_rules
[] = {
1513 { /* get bucket list */
1514 "GET", URL_ROOT
, proxy_api_root
},
1515 { /* get object list */
1516 "GET", URL_BUCKET
, proxy_list_objs
},
1517 { /* create bucket */
1518 "PUT", URL_BUCKET
, proxy_create_bucket
},
1519 { /* get object data */
1520 "GET", URL_OBJECT
, proxy_get_data
},
1521 { /* get attribute data */
1522 "GET", URL_ATTR
, proxy_get_attr
},
1523 { /* put object data */
1524 "PUT", URL_OBJECT
, proxy_put_data
},
1525 { /* put attribute data */
1526 "PUT", URL_ATTR
, proxy_put_attr
},
1527 { /* create object and/or modify attributes */
1528 "POST", URL_BUCKET
, proxy_bucket_post
},
1529 { /* perform control operations on an object */
1530 "POST", URL_OBJECT
, proxy_object_post
},
1532 "POST", URL_QUERY
, proxy_query
},
1533 { /* delete object */
1534 "DELETE", URL_OBJECT
, proxy_delete
},
1535 { /* delete attribute (TBD) */
1536 "DELETE", URL_ATTR
, NULL
},
1537 { /* get provider list */
1538 "GET", URL_PROVLIST
, proxy_list_provs
},
1539 { /* update a provider */
1540 "POST", URL_PROVLIST
, proxy_update_prov
},
1545 parse_url (const char *url
, my_state
*ms
)
1547 unsigned short esize
;
1548 unsigned short eindex
;
1549 char *parts
[URL_INVAL
];
1551 if (strstr(url
,"../")) {
1552 /* Naughty, naughty. Never a good reason to allow this. */
1553 DPRINTF("Rejecting ../ in path.\n");
1558 parts
[URL_BUCKET
] = ms
->bucket
;
1559 parts
[URL_OBJECT
] = ms
->key
;
1560 parts
[URL_ATTR
] = ms
->attr
;
1563 while (*url
== '/') {
1568 if (eindex
== URL_BUCKET
) {
1569 if (!strcmp(ms
->bucket
,"_providers")) {
1570 eindex
= URL_PROVLIST
;
1573 else if (eindex
== URL_OBJECT
) {
1574 if (!strcmp(ms
->key
,"_query")) {
1581 if (++eindex
>= URL_INVAL
) {
1586 while (*url
&& (*url
!= '/')) {
1587 parts
[eindex
][esize
++] = *(url
++);
1588 if (esize
>= MAX_FIELD_LEN
) {
1598 access_handler (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1599 const char *method
, const char *version
, const char *data
,
1600 size_t *data_size
, void **rctx
)
1604 struct MHD_Response
*resp
;
1605 my_state
*ms
= *rctx
;
1608 return ms
->handler(cctx
,conn
,url
,method
,version
,
1609 data
,data_size
,rctx
);
1612 ms
= calloc(sizeof(*ms
), 1);
1617 utype
= parse_url(url
,ms
);
1619 for (i
= 0; my_rules
[i
].method
; ++i
) {
1620 if (utype
!= my_rules
[i
].utype
) {
1623 if (strcmp(method
,my_rules
[i
].method
)) {
1626 if (!my_rules
[i
].handler
) {
1629 ms
->handler
= my_rules
[i
].handler
;
1636 return ms
->handler(cctx
,conn
,url
,method
,version
,
1637 data
,data_size
,rctx
);
1640 if (!strcmp(method
,"QUIT")) {
1641 (void)sem_post((sem_t
*)cctx
);
1645 fprintf(stderr
,"bad request m=%s u=%s\n",method
,url
);
1648 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1652 MHD_queue_response(conn
,MHD_HTTP_NOT_FOUND
,resp
);
1653 MHD_destroy_response(resp
);
1657 /* These enum values cannot possibly conflict with the option values
1658 ordinarily used by commands, including CHAR_MAX + 1, etc. Avoid
1659 CHAR_MIN - 1, as it may equal -1, the getopt end-of-options value. */
1662 GETOPT_HELP_CHAR
= (CHAR_MIN
- 2),
1663 GETOPT_VERSION_CHAR
= (CHAR_MIN
- 3)
1666 static const struct option my_options
[] = {
1667 { "config", required_argument
, NULL
, 'c' },
1668 { "db", required_argument
, NULL
, 'd' },
1669 { "master", required_argument
, NULL
, 'm' },
1670 { "port", required_argument
, NULL
, 'p' },
1671 { "verbose", no_argument
, NULL
, 'v' },
1672 { "version", no_argument
, NULL
, GETOPT_VERSION_CHAR
},
1673 { "help", no_argument
, NULL
, GETOPT_HELP_CHAR
},
1674 { NULL
, 0, NULL
, '\0' }
1680 if (status
!= EXIT_SUCCESS
)
1681 fprintf (stderr
, _("Try `%s --help' for more information.\n"),
1686 Usage: %s [OPTION]\n\
1690 Deltacloud image-warehouse daemon.\n\
1691 A configuration file must be specified.\n\
1693 -c, --config=FILE config file [required]\n\
1694 -d, --db=HOST_PORT database server as ip[:port]\n\
1695 -m, --master=HOST_PORT master (upstream) server as ip[:port]\n\
1696 -p, --port=PORT alternate listen port (default 9090)\n\
1697 -v, --verbose verbose/debug output\n\
1699 --help display this help and exit\n\
1700 --version output version information and exit\n\
1704 Report %s bugs to %s.\n\
1706 program_name
, PACKAGE_BUGREPORT
);
1712 main (int argc
, char **argv
)
1714 struct MHD_Daemon
*the_daemon
;
1719 program_name
= argv
[0];
1721 for (;;) switch (getopt_long(argc
,argv
,"c:d:m:p:v",my_options
,NULL
)) {
1727 db_host
= strtok_r(optarg
,":",&stctx
);
1728 port_tmp
= strtok_r(NULL
,":",&stctx
);
1730 db_port
= (unsigned short)strtoul(port_tmp
,NULL
,10);
1735 master_host
= strtok_r(optarg
,":",&stctx
);
1736 port_tmp
= strtok_r(NULL
,":",&stctx
);
1738 master_port
= (unsigned short)strtoul(port_tmp
,NULL
,10);
1742 my_port
= (unsigned short)strtoul(optarg
,NULL
,10);
1747 case GETOPT_HELP_CHAR
:
1748 usage(EXIT_SUCCESS
);
1750 case GETOPT_VERSION_CHAR
:
1751 printf ("%s version %s\n", program_name
, PACKAGE_VERSION
);
1752 exit (EXIT_SUCCESS
);
1758 usage(EXIT_FAILURE
);
1764 error (0, 0, "no configuration file specified");
1765 usage (EXIT_FAILURE
);
1768 me
= parse_config();
1770 fprintf(stderr
,"could not parse %s\n",cfg_file
);
1774 sem_init(&the_sem
,0,0);
1777 main_func_tbl
= &s3_func_tbl
;
1780 main_func_tbl
= &curl_func_tbl
;
1784 main_func_tbl
= &fs_func_tbl
;
1788 printf("primary store type is %s\n",main_func_tbl
->name
);
1790 printf("operating as slave to %s:%u\n",
1791 master_host
, master_port
);
1793 printf("db is at %s:%u\n",db_host
,db_port
);
1794 printf("will listen on port %u\n",my_port
);
1795 printf("my location is \"%s\"\n",me
);
1796 if (fflush(stdout
) || ferror(stdout
))
1797 error(EXIT_FAILURE
, 0, "write failed");
1801 main_func_tbl
->init_func();
1806 * Gotcha: if we don't set the connection memory limit explicitly,
1807 * the per-connection buffer for MHD will be smaller than that used
1808 * by CURL, so proxy_writefunc will never be able to do its job.
1810 the_daemon
= MHD_start_daemon(MY_MHD_FLAGS
,
1811 my_port
, NULL
, NULL
, &access_handler
, &the_sem
,
1812 MHD_OPTION_CONNECTION_MEMORY_LIMIT
, (size_t)1048576,
1815 fprintf(stderr
,"Could not create daemon.\n");