1 /* Copyright (C) 2010 Free Software Foundation, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 #include <semaphore.h>
33 #include <microhttpd.h>
34 #include <hstor.h> /* only for ARRAY_SIZE at this point */
35 #include <curl/curl.h>
44 #include "state_defs.h"
/*
 * Local convenience macros.
 *
 * _() currently expands to its argument unchanged; it marks the strings
 * that are meant to be routed through gettext later.
 */
46 /* Define-away for now. Eventually, define to gettext. */
47 #define _(msgid) (msgid)
/*
 * MY_MHD_FLAGS is the flag word passed to MHD_start_daemon().  Two
 * definitions are visible, with and without MHD_USE_DEBUG.
 * NOTE(review): presumably a preprocessor conditional that is not visible
 * here selects between them -- confirm.
 * NOTE(review): the debug expansion is an unparenthesized `|` expression,
 * so it is only safe where it stands alone as a complete argument.
 */
50 #define MY_MHD_FLAGS MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG
51 //#define MY_MHD_FLAGS MHD_USE_SELECT_INTERNALLY | MHD_USE_DEBUG
53 #define MY_MHD_FLAGS MHD_USE_THREAD_PER_CONNECTION
/*
 * Back-end function tables defined outside this file.  Within this file,
 * bad_func_tbl is the initial value of main_func_tbl, curl_func_tbl supplies
 * the GET child used when fetching from the master, and the s3/curl/fs
 * tables are the candidates later assigned to main_func_tbl.
 */
56 extern backend_func_tbl bad_func_tbl
;
57 extern backend_func_tbl s3_func_tbl
;
58 extern backend_func_tbl curl_func_tbl
;
59 extern backend_func_tbl fs_func_tbl
;
/*
 * URL classification codes (the line that opens this enumerator list is not
 * visible here).  URL_BUCKET, URL_OBJECT and URL_ATTR are used as indices
 * into the parts[] array of parse_url(), which is sized by URL_INVAL.
 * URL_QUERY and URL_PROVLIST come after URL_INVAL and so have no parts[]
 * slot.  access_handler() compares these codes with the utype member of
 * each my_rules[] entry.
 */
62 URL_ROOT
=0, URL_BUCKET
, URL_OBJECT
, URL_ATTR
, URL_INVAL
,
63 URL_QUERY
, URL_PROVLIST
/*
 * Request-handler callback member.  The enclosing declaration is not
 * visible here; presumably it is the element type of my_rules[], whose
 * entries are read through .method, .utype and .handler -- confirm.
 */
69 MHD_AccessHandlerCallback handler
;
/* Listening port; starts at MY_PORT and is overwritten from the command line. */
73 unsigned short my_port
= MY_PORT
;
/* Set from argv[0]; used in the version and usage output. */
74 const char *program_name
;
/*
 * NULL-terminated lists scanned by is_reserved().  reserved_name holds
 * object/bucket names with special meaning to the proxy; reserved_attr holds
 * attribute names that proxy_put_attr() refuses to set.
 */
76 char *(reserved_name
[]) = { "_default", "_query", "_new", NULL
};
77 char *(reserved_attr
[]) = { "bucket", "key", "date", "etag", "loc", NULL
};
/*
 * Function table of the primary store.  It points at bad_func_tbl until a
 * real back end (s3, curl or fs) is assigned.
 */
79 backend_func_tbl
*main_func_tbl
= &bad_func_tbl
;
/*
 * Release the resources attached to a per-request state structure.
 *
 * ms->cleanup is a bit mask; each CLEANUP_* bit records that the matching
 * resource was acquired by a handler and must be released here.  The bodies
 * of the CLEANUP_TMPL and CLEANUP_URL branches, and whatever happens to ms
 * itself afterwards, are on lines not visible here.
 */
82 free_ms (my_state
*ms
)
/* CURL easy handle. */
84 if (ms
->cleanup
& CLEANUP_CURL
) {
85 curl_easy_cleanup(ms
->curl
);
/* Heap buffer accumulated in ms->pipe.data_ptr by the PUT-attr/query paths. */
88 if (ms
->cleanup
& CLEANUP_BUF_PTR
) {
89 free(ms
->pipe
.data_ptr
);
/* MHD POST processor. */
92 if (ms
->cleanup
& CLEANUP_POST
) {
93 MHD_destroy_post_processor(ms
->post
);
/* GLib hash table of POSTed form fields. */
96 if (ms
->cleanup
& CLEANUP_DICT
) {
97 g_hash_table_destroy(ms
->dict
);
/* Metadata query iterator. */
100 if (ms
->cleanup
& CLEANUP_QUERY
) {
101 meta_query_stop(ms
->query
);
/* Template-generator context (release call not visible here). */
104 if (ms
->cleanup
& CLEANUP_TMPL
) {
/* Duplicated URL string (release call not visible here). */
108 if (ms
->cleanup
& CLEANUP_URL
) {
/*
 * Decide whether a PUT arriving on conn may be accepted.
 *
 * The visible path looks up a request header (its name is on a line not
 * shown) and returns non-zero only when that header is present and equal to
 * "master".  According to the comment below there is an earlier path for a
 * server that is not a slave; its code is not visible here.
 */
116 validate_put (struct MHD_Connection
*conn
)
121 /* We're not a slave, so we don't care. */
125 mhdr
= MHD_lookup_connection_value(conn
,MHD_HEADER_KIND
,
127 return (mhdr
&& !strcmp(mhdr
,"master"));
/*
 * Look for cand in resv_list.
 *
 * cand:      string to test; compared with strcmp(), so it must not be NULL.
 * resv_list: array of strings terminated by a NULL entry.
 *
 * The return statements are not visible here; callers treat a non-zero
 * result as "cand is reserved".
 */
131 is_reserved (char *cand
, char **resv_list
)
135 for (i
= 0; resv_list
[i
]; ++i
) {
136 if (!strcmp(cand
,resv_list
[i
])) {
/*
 * Check that url names an object that may be written.
 *
 * The last '/' in url is located with rindex(); the visible return accepts
 * the URL only when the component after that slash is not listed in
 * reserved_name.  The handling of a missing slash is on lines not visible
 * here.
 */
145 validate_url (const char *url
)
147 char *slash
= rindex(url
,'/');
150 /* There should be at least one betwixt bucket and key. */
154 return !is_reserved(slash
+1,reserved_name
);
158 * The proxy has MHD on one side and CURL on the other. The CURL side is
159 * always run in a child thread. Yes, there are both context switches
160 * and copies between the threads. Get over it. The focus here is on
161 * supporting multi-way replication on PUT, with minimal complexity. These
162 * were the two easiest libraries to use, and they both want to allocate
163 * their own buffers so we're kind of stuck with the copies unless we want
164 * to buffer whole files in memory (which would have to be virtual since
165 * they're potentially bigger than physical) or explicitly ping them through
166 * a local filesystem. We could potentially take over scheduling from one
167 * or both to avoid some of the context switching, but those interfaces are
168 * significantly more error-prone and (at least in CURL's case) buggy.
170 * For a GET, the CURL child acts as a producer while the MHD parent acts
171 * as consumer. For a PUT, the MHD parent is the producer and the CURL
172 * child is the consumer. For GET the MHD component is invoked via a
173 * callback set up in the access handler; for PUT it's invoked via repeated
174 * calls to the access handler itself. Either way, the producer's job is
175 * to post its pointer+length to the my_state structure and then wait for
176 * all consumers to check back in saying they've finished it. This might
177 * involve multiple passes through each consumer for one pass through the
178 * single producer. When the producer is done, it does a similar handshake
179 * with the consumers. Each consumer has its own pipe_private structure,
180 * containing a pointer to the shared my_state plus a per-consumer offset
181 * into the current chunk.
183 * Attribute functions don't use CURL, so they do much simpler in-memory
184 * buffering. Queries also don't use CURL, but the MHD POST interface
185 * introduces some of its own complexity so see below for that.
/*
 * Cleanup callback passed to MHD_create_response_from_callback() by the
 * query, listing, root and provider handlers, which hand it their my_state
 * as ctx.  Only the debug trace is visible here.
 * NOTE(review): presumably releases ctx via free_ms() -- confirm.
 */
189 simple_closer (void *ctx
)
193 DPRINTF("%s: cleaning up\n",__func__
);
/*
 * Cleanup callback passed to MHD_create_response_from_callback() by
 * proxy_get_data().  ctx is the consumer's pipe_private.  The actual
 * cleanup statements are on lines not visible here.
 */
198 child_closer (void * ctx
)
200 pipe_private
*pp
= ctx
;
202 DPRINTF("in %s\n",__func__
);
/*
 * MHD content-reader callback for object GETs (consumer side of the pipe).
 *
 * ctx: the pipe_private created in proxy_get_data().
 * pos: stream position supplied by MHD (not used in the visible lines).
 * buf: destination buffer owned by MHD.
 * max: capacity of buf.
 *
 * While pipe_cons_wait() reports a chunk, bytes are copied from the shared
 * chunk at this consumer's offset into buf; once the offset reaches the end
 * of the chunk the producer is signalled.  When the wait reports no more
 * data, the back-end thread (and the cache thread when fetching from the
 * master) is joined and a failed back end sets ms->rc to 500.  The clamping
 * of the copy length to max and the return statements are on lines not
 * visible here.
 */
207 /* Invoked from MHD. */
209 proxy_get_cons (void *ctx
, uint64_t pos
, char *buf
, int max
)
211 pipe_private
*pp
= ctx
;
212 pipe_shared
*ps
= pp
->shared
;
213 my_state
*ms
= ps
->owner
;
219 DPRINTF("consumer asked to read %d\n",max
);
221 if (pipe_cons_wait(pp
)) {
222 DPRINTF("consumer offset %zu into %zu\n",
223 pp
->offset
, ps
->data_len
);
/* Bytes of the current chunk this consumer has not copied yet. */
224 done
= ps
->data_len
- pp
->offset
;
228 memcpy(buf
,ps
->data_ptr
+pp
->offset
,done
);
230 DPRINTF("consumer copied %d, new offset %zu\n",
/* Whole chunk consumed: tell the producer it may post the next one. */
232 if (pp
->offset
== ps
->data_len
) {
233 DPRINTF("consumer finished chunk\n");
234 pipe_cons_signal(pp
, 0);
/* No more data: collect the child threads and record a back-end failure. */
243 pthread_join(ms
->backend_th
,&child_res
);
244 if (child_res
== THREAD_FAILED
) {
245 ms
->rc
= MHD_HTTP_INTERNAL_SERVER_ERROR
;
247 if (ms
->from_master
) {
248 pthread_join(ms
->cache_th
,NULL
);
249 /* TBD: do something about cache failure? */
/*
 * MHD access handler for "GET object".
 *
 * Parameters follow the MHD_AccessHandlerCallback convention; *rctx holds
 * the my_state prepared by access_handler().
 *
 * Visible steps:
 *  - duplicate url into ms->url and mark it for cleanup;
 *  - ask the metadata store for the local copy's ETag; when the client's
 *    If-None-Match header equals it, answer 304 with an empty body;
 *  - when the object is not available locally, either answer 404 or fetch
 *    from master_host:master_port (the condition choosing between the two
 *    is on lines not visible here);
 *  - set up the shared pipe with one consumer, plus one more when fetching
 *    from the master, and start the back-end producer thread;
 *  - when fetching from the master, start a second consumer thread that
 *    runs the primary store's cache_child_func;
 *  - queue a callback response fed by proxy_get_cons() with status ms->rc.
 *
 * Return values of pthread_create() are not checked (see the TBD notes).
 */
258 proxy_get_data (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
259 const char *method
, const char *version
, const char *data
,
260 size_t *data_size
, void **rctx
)
262 struct MHD_Response
*resp
;
263 my_state
*ms
= *rctx
;
267 const char *user_etag
;
275 DPRINTF("PROXY GET DATA %s\n",url
);
277 ms
->url
= strdup(url
);
281 ms
->cleanup
|= CLEANUP_URL
;
283 my_etag
= meta_has_copy(ms
->bucket
,ms
->key
,me
);
/* Conditional GET: an equal ETag means the client's copy is current. */
285 user_etag
= MHD_lookup_connection_value(
286 conn
, MHD_HEADER_KIND
, "If-None-Match");
287 if (user_etag
&& !strcmp(user_etag
,my_etag
)) {
288 DPRINTF("ETag match!\n");
290 resp
= MHD_create_response_from_data(0,NULL
,
292 MHD_queue_response(conn
,MHD_HTTP_NOT_MODIFIED
,resp
);
293 MHD_destroy_response(resp
);
300 DPRINTF("%s/%s not found locally\n",ms
->bucket
,ms
->key
);
302 DPRINTF(" that means it doesn't exist\n");
303 resp
= MHD_create_response_from_data(0,NULL
,
305 MHD_queue_response(conn
,MHD_HTTP_NOT_FOUND
,resp
);
306 MHD_destroy_response(resp
);
309 DPRINTF(" will fetch from %s:%u\n", master_host
,master_port
);
/* Consumer count: the MHD reader, plus the cache writer when from_master is set. */
313 pipe_init_shared(&ms
->pipe
,ms
,ms
->from_master
+1);
314 pp
= pipe_init_private(&ms
->pipe
);
318 /* Master is always assumed to be CURL (i.e. our own protocol) */
319 if (ms
->from_master
) {
320 pthread_create(&ms
->backend_th
,NULL
,
321 curl_func_tbl
.get_child_func
,ms
);
324 pthread_create(&ms
->backend_th
,NULL
,
325 main_func_tbl
->get_child_func
,ms
);
327 /* TBD: check return value */
329 if (ms
->from_master
) {
330 pp2
= pipe_init_private(&ms
->pipe
);
334 pthread_create(&ms
->cache_th
,NULL
,
335 main_func_tbl
->cache_child_func
,pp2
);
336 /* TBD: check return value */
342 resp
= MHD_create_response_from_callback(
343 MHD_SIZE_UNKNOWN
, 65536, proxy_get_cons
, pp
, child_closer
);
345 fprintf(stderr
,"MHD_crfc failed\n");
347 /* TBD: terminate thread */
353 MHD_queue_response(conn
,ms
->rc
,resp
);
354 MHD_destroy_response(resp
);
/*
 * Re-evaluate the replication policy for ms->bucket/ms->key and replicate.
 *
 * ms:     request state; bucket, key and (optionally) dict are read.
 * policy: policy string to apply, or NULL to look one up.
 *
 * Objects whose key is a reserved name are never replicated.  With no
 * policy supplied, the lookup order visible here is: the "_policy" entry of
 * the POSTed field dictionary (when ms->dict exists), then the object's own
 * "_policy" attribute, then the "_policy" attribute of the bucket's
 * "_default" object.  When a policy is found, replicate() is called with
 * the path "bucket/key" built in a local buffer of MAX_FIELD_LEN bytes
 * (snprintf truncates silently if it does not fit).  Per the comment below,
 * a policy obtained from the metadata store must be freed; the freeing code
 * is on lines not visible here.
 */
360 recheck_replication (my_state
* ms
, char *policy
)
364 char fixed
[MAX_FIELD_LEN
];
366 if (is_reserved(ms
->key
,reserved_name
)) {
367 DPRINTF("declining to replicate reserved object %s\n",ms
->key
);
371 if (!policy
&& ms
->dict
) {
372 DPRINTF("using new policy for %s/%s\n",ms
->bucket
,ms
->key
);
373 policy
= g_hash_table_lookup(ms
->dict
,"_policy");
377 /* If we get a policy here or below, we have to free it. */
379 DPRINTF("fetching policy for %s/%s\n",ms
->bucket
,ms
->key
);
380 rc
= meta_get_value(ms
->bucket
,ms
->key
, "_policy", &policy
);
384 DPRINTF(" inheriting policy from %s\n",ms
->bucket
);
385 rc
= meta_get_value(ms
->bucket
,
386 "_default", "_policy", &policy
);
390 DPRINTF(" implementing policy %s\n",policy
);
392 * Can't use ms->url here because it might be a bucket POST
393 * and in that case ms->url points to the bucket.
395 snprintf(fixed
,sizeof(fixed
),"%s/%s",ms
->bucket
,ms
->key
);
396 replicate(fixed
,0,policy
);
402 DPRINTF(" could not find a policy anywhere!\n");
/*
 * MHD access handler for "PUT object" (producer side of the pipe).
 *
 * MHD calls this repeatedly for one request; ms->state and *data_size tell
 * the calls apart:
 *  - first call (MS_NEW): reject with 403 unless validate_put() and
 *    validate_url() both accept; otherwise duplicate the URL, set up the
 *    pipe with one consumer, start the primary store's put_child_func
 *    thread and wait for the initial handshake.  A failed handshake is
 *    answered with 500.
 *  - calls with *data_size != 0: post the chunk to the consumer with
 *    pipe_prod_signal() and add its size to ms->size.
 *  - final call: finish the pipe and join the child.  A failed thread, or
 *    all consumers reporting errors, gives 500.  Otherwise the copy and the
 *    PUT are recorded in the metadata store, replication is re-checked and
 *    the response carries the new ETag header.
 *
 * The return value of pthread_create() is not checked (see the TBD note).
 */
407 proxy_put_data (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
408 const char *method
, const char *version
, const char *data
,
409 size_t *data_size
, void **rctx
)
411 struct MHD_Response
*resp
;
412 my_state
*ms
= *rctx
;
422 DPRINTF("PROXY PUT DATA %s (%zu)\n",url
,*data_size
);
424 if (ms
->state
== MS_NEW
) {
425 if (!validate_put(conn
) || !validate_url(url
)) {
426 DPRINTF("rejecting %s\n",url
);
427 resp
= MHD_create_response_from_data(0,NULL
,
432 MHD_queue_response(conn
,MHD_HTTP_FORBIDDEN
,resp
);
433 MHD_destroy_response(resp
);
436 ms
->state
= MS_NORMAL
;
437 ms
->url
= strdup(url
);
441 ms
->cleanup
|= CLEANUP_URL
;
443 pipe_init_shared(&ms
->pipe
,ms
,1);
444 pp
= pipe_init_private(&ms
->pipe
);
448 pthread_create(&ms
->backend_th
,NULL
,
449 main_func_tbl
->put_child_func
,pp
);
450 /* TBD: check return value */
453 * Do the initial handshake with children. If we return from
454 * this callback without an error response, Microhttpd posts
455 * the "100 Continue" header and the client starts sending
456 * the data. We must report errors here or forever keep
459 rc
= pipe_prod_wait_init(&ms
->pipe
);
461 DPRINTF("producer wait failed\n");
462 resp
= MHD_create_response_from_data(0,NULL
,
467 MHD_queue_response(conn
,MHD_HTTP_INTERNAL_SERVER_ERROR
,
469 MHD_destroy_response(resp
);
472 * Note that we fail here even if 1 of N replicas fail.
473 * Might want to fix this when we start looping over
474 * pipe_init_private() above.
476 DPRINTF("producer replicas failed (%u of %u)\n",
477 rc
, ms
->pipe
.cons_total
);
478 resp
= MHD_create_response_from_data(0,NULL
,
483 MHD_queue_response(conn
,MHD_HTTP_INTERNAL_SERVER_ERROR
,
485 MHD_destroy_response(resp
);
487 DPRINTF("producer proceeding\n");
490 else if (*data_size
) {
491 pipe_prod_signal(&ms
->pipe
,(void *)data
,*data_size
);
492 ms
->size
+= *data_size
;
493 DPRINTF("producer chunk finished\n");
497 pipe_prod_finish(&ms
->pipe
);
498 pthread_join(ms
->backend_th
,&child_res
);
499 if (child_res
== THREAD_FAILED
) {
500 DPRINTF("thread failed\n");
501 rc
= MHD_HTTP_INTERNAL_SERVER_ERROR
;
503 else if (ms
->pipe
.cons_error
== ms
->pipe
.cons_total
) {
504 DPRINTF("all %u consumers failed\n",
505 ms
->pipe
.cons_error
);
506 rc
= MHD_HTTP_INTERNAL_SERVER_ERROR
;
510 meta_got_copy(ms
->bucket
,ms
->key
,me
);
514 etag
= meta_did_put(ms
->bucket
,ms
->key
,me
,
517 DPRINTF("rereplicate (obj PUT)\n");
518 recheck_replication(ms
,NULL
);
522 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
528 MHD_add_response_header(resp
,"ETag",etag
);
531 MHD_queue_response(conn
,rc
,resp
);
532 MHD_destroy_response(resp
);
/*
 * MHD access handler for "GET attribute".
 *
 * Reads attribute ms->attr of ms->bucket/ms->key from the metadata store.
 * When meta_get_value() returns 0 the value becomes the response body;
 * otherwise an empty response is created.  rc starts as 404; the line that
 * changes it on success is not visible here.
 */
539 proxy_get_attr (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
540 const char *method
, const char *version
, const char *data
,
541 size_t *data_size
, void **rctx
)
543 struct MHD_Response
*resp
;
545 my_state
*ms
= *rctx
;
546 int rc
= MHD_HTTP_NOT_FOUND
;
554 DPRINTF("PROXY GET ATTR %s\n",url
);
556 if (meta_get_value(ms
->bucket
,ms
->key
,ms
->attr
,&fixed
) == 0) {
557 resp
= MHD_create_response_from_data(strlen(fixed
),fixed
,
562 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
567 MHD_queue_response(conn
,rc
,resp
);
568 MHD_destroy_response(resp
);
/*
 * MHD access handler for "PUT attribute".
 *
 * MHD calls this repeatedly for one request:
 *  - first call (MS_NEW): duplicate the URL and look up a request header
 *    (its name is on a line not shown); a meta_set_value() call follows,
 *    presumably storing that header's value -- confirm.
 *  - calls with *data_size != 0: append the chunk to a NUL-terminated heap
 *    buffer in ms->pipe.data_ptr.  The first chunk allocates size+1 bytes
 *    and flags the buffer for cleanup; later chunks grow it with realloc()
 *    through a temporary pointer.
 *  - final call: a reserved attribute name is answered with 400; otherwise
 *    the accumulated value is stored, replication is re-checked and 201 is
 *    returned.
 */
574 proxy_put_attr (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
575 const char *method
, const char *version
, const char *data
,
576 size_t *data_size
, void **rctx
)
578 struct MHD_Response
*resp
;
579 my_state
*ms
= *rctx
;
587 DPRINTF("PROXY PUT ATTR %s (%zu)\n",url
,*data_size
);
589 if (ms
->state
== MS_NEW
) {
590 ms
->state
= MS_NORMAL
;
591 ms
->url
= strdup(url
);
595 ms
->cleanup
|= CLEANUP_URL
;
596 attrval
= MHD_lookup_connection_value(conn
,MHD_HEADER_KIND
,
599 meta_set_value(ms
->bucket
,ms
->key
,ms
->attr
,
604 else if (*data_size
) {
/* data_len already counts the terminating NUL, so only the new bytes are added. */
605 if (ms
->pipe
.data_len
) {
606 ms
->pipe
.data_len
+= *data_size
;
607 char *p
= realloc(ms
->pipe
.data_ptr
,ms
->pipe
.data_len
);
611 ms
->pipe
.data_ptr
= p
;
/* First chunk: reserve room for the data plus the terminating NUL. */
614 ms
->pipe
.data_len
= *data_size
+ 1;
615 ms
->pipe
.data_ptr
= malloc(ms
->pipe
.data_len
);
616 if (!ms
->pipe
.data_ptr
) {
619 ((char *)ms
->pipe
.data_ptr
)[0] = '\0';
620 ms
->cleanup
|= CLEANUP_BUF_PTR
;
622 (void)strncat(ms
->pipe
.data_ptr
,data
,*data_size
);
623 /* TBD: check return value */
627 if (!ms
->pipe
.data_ptr
) {
630 if (is_reserved(ms
->attr
,reserved_attr
)) {
631 resp
= MHD_create_response_from_data(
632 0,NULL
,MHD_NO
,MHD_NO
);
636 MHD_queue_response(conn
,MHD_HTTP_BAD_REQUEST
,
638 MHD_destroy_response(resp
);
642 meta_set_value(ms
->bucket
,ms
->key
,ms
->attr
,ms
->pipe
.data_ptr
);
644 * We should always re-replicate, because the replication
645 * policy might refer to this attr.
647 DPRINTF("rereplicate (attr PUT)\n");
648 recheck_replication(ms
,NULL
);
654 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
658 MHD_queue_response(conn
,MHD_HTTP_CREATED
,resp
);
659 MHD_destroy_response(resp
);
661 * TBD: check if the attribute was a replication policy, and
662 * start/stop replication activities as appropriate.
670 * For queries, we have to deal with MHD's post-iterator interface (not
671 * quite the same as the data-iteration even though we use it that way) on
672 * one side, and a query-iterator interface on the other. Data on both
673 * sides could be quite large, so we can't just stick them in header lines.
674 * We do still buffer the query in memory, though. Once that's done, we do
675 * very simple parsing - it will be more complicated later - and create the
676 * query iterator. That's also driven by MHD, this time through the
677 * content-callback interface, and repeatedly calls in to the metadata
678 * module to fetch one object name at a time.
/*
 * MHD post-iterator callback.  Per the comment below, the query text itself
 * is accumulated in proxy_query(), so this callback does not collect it.
 * Only one parameter-silencing cast is visible here.
 * NOTE(review): presumably this is the iterator passed to
 * MHD_create_post_processor() in proxy_query() -- that argument is on a
 * line not shown; confirm.
 */
682 query_iterator (void *ctx
, enum MHD_ValueKind kind
, const char *key
,
683 const char *filename
, const char *content_type
,
684 const char *transfer_encoding
, const char *data
,
685 uint64_t off
, size_t size
)
692 (void)transfer_encoding
;
697 /* We actually accumulate the data in proxy_query. */
/*
 * MHD content-reader callback that renders a query result or object list.
 *
 * ctx is the request's my_state (it is what the callers pass to
 * MHD_create_response_from_callback).  Each call emits one piece into buf:
 *  - first, a template context is obtained from tmpl_get_ctx() using a
 *    request header value (the header name is on a line not shown) and the
 *    list header is produced;
 *  - then one entry per object returned by meta_query_next(), with a check
 *    against reserved_name for each key;
 *  - finally the footer, after which gen_ctx is set to TMPL_CTX_DONE and
 *    the CLEANUP_TMPL flag is cleared.
 * A call that finds gen_ctx == TMPL_CTX_DONE ends the stream.  Per the
 * comment below, -1 signals EOF to MHD.  The checks of len against max and
 * the return statements are on lines not visible here.
 */
701 /* MHD reader function during queries. Return -1 for EOF. */
703 proxy_query_func (void *ctx
, uint64_t pos
, char *buf
, int max
)
707 const char *accept_hdr
;
713 accept_hdr
= MHD_lookup_connection_value(ms
->conn
,MHD_HEADER_KIND
,
717 ms
->gen_ctx
= tmpl_get_ctx(accept_hdr
);
721 ms
->cleanup
|= CLEANUP_TMPL
;
722 len
= tmpl_obj_header(ms
->gen_ctx
);
729 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
733 if (ms
->gen_ctx
== TMPL_CTX_DONE
) {
738 if (!meta_query_next(ms
->query
,&bucket
,&key
)) {
741 if (is_reserved(key
,reserved_name
)) {
744 len
= tmpl_obj_entry(ms
->gen_ctx
,bucket
,key
);
751 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
755 len
= tmpl_obj_footer(ms
->gen_ctx
);
762 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
/* Footer sent: drop the cleanup flag and mark the stream finished. */
764 ms
->cleanup
&= ~CLEANUP_TMPL
;
765 ms
->gen_ctx
= TMPL_CTX_DONE
;
/*
 * MHD access handler for "POST bucket/_query".
 *
 * MHD calls this repeatedly for one request:
 *  - first call (MS_NEW): create the POST processor and flag it for
 *    cleanup;
 *  - calls with *data_size != 0: feed the chunk to the POST processor and
 *    also append it to a NUL-terminated heap buffer in ms->pipe.data_ptr
 *    (the same accumulation scheme as proxy_put_attr());
 *  - final call: start a metadata query over ms->bucket using the
 *    accumulated text and stream the result through proxy_query_func()
 *    with Content-Type text/xml and status 200.
 */
770 proxy_query (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
771 const char *method
, const char *version
, const char *data
,
772 size_t *data_size
, void **rctx
)
774 struct MHD_Response
*resp
;
775 my_state
*ms
= *rctx
;
781 DPRINTF("PROXY QUERY %s (%zu)\n",url
,*data_size
);
783 if (ms
->state
== MS_NEW
) {
784 ms
->state
= MS_NORMAL
;
785 ms
->post
= MHD_create_post_processor(conn
,4096,
787 ms
->cleanup
|= CLEANUP_POST
;
789 else if (*data_size
) {
790 MHD_post_process(ms
->post
,data
,*data_size
);
/* data_len already counts the terminating NUL, so only the new bytes are added. */
791 if (ms
->pipe
.data_len
) {
792 ms
->pipe
.data_len
+= *data_size
;
793 char *p
= realloc(ms
->pipe
.data_ptr
,ms
->pipe
.data_len
);
797 ms
->pipe
.data_ptr
= p
;
/* First chunk: reserve room for the data plus the terminating NUL. */
800 ms
->pipe
.data_len
= *data_size
+ 1;
801 ms
->pipe
.data_ptr
= malloc(ms
->pipe
.data_len
);
802 if (!ms
->pipe
.data_ptr
) {
805 ((char *)ms
->pipe
.data_ptr
)[0] = '\0';
806 ms
->cleanup
|= CLEANUP_BUF_PTR
;
808 (void)strncat(ms
->pipe
.data_ptr
,data
,*data_size
);
809 /* TBD: check return value */
813 if (!ms
->pipe
.data_ptr
) {
816 ms
->query
= meta_query_new(ms
->bucket
,NULL
,ms
->pipe
.data_ptr
);
817 ms
->cleanup
|= CLEANUP_QUERY
;
818 resp
= MHD_create_response_from_callback(MHD_SIZE_UNKNOWN
,
819 65536, proxy_query_func
, ms
, simple_closer
);
821 fprintf(stderr
,"MHD_crfc failed\n");
825 MHD_add_response_header(resp
,"Content-Type","text/xml");
826 MHD_queue_response(conn
,MHD_HTTP_OK
,resp
);
827 MHD_destroy_response(resp
);
/*
 * MHD access handler for "GET bucket": list the bucket's objects.
 *
 * Starts a metadata query over ms->bucket with no key and no query
 * expression, then streams the result through proxy_query_func() with
 * status 200.  The query is flagged for cleanup in ms->cleanup.
 */
834 proxy_list_objs (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
835 const char *method
, const char *version
, const char *data
,
836 size_t *data_size
, void **rctx
)
838 my_state
*ms
= *rctx
;
839 struct MHD_Response
*resp
;
848 ms
->query
= meta_query_new((char *)ms
->bucket
,NULL
,NULL
);
849 ms
->cleanup
|= CLEANUP_QUERY
;
851 resp
= MHD_create_response_from_callback(MHD_SIZE_UNKNOWN
,
852 65536, proxy_query_func
, ms
, simple_closer
);
854 fprintf(stderr
,"MHD_crfc failed\n");
859 MHD_queue_response(conn
,MHD_HTTP_OK
,resp
);
860 MHD_destroy_response(resp
);
/*
 * MHD access handler for "DELETE object".
 *
 * Asks the primary store to delete ms->bucket/ms->key, then removes the
 * metadata.  For the metadata call, bucket and key are re-derived by
 * splitting a private copy of url on '/' (strtok_r writes into its input,
 * hence the copy).  An empty 200 response with Content-Type text/xml is
 * queued, and the deletion is forwarded to the replicas.
 *
 * NOTE(review): no free() of copied_url and no NULL check on the strdup()
 * result are visible in this excerpt -- confirm both exist on the lines not
 * shown.
 */
865 proxy_delete (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
866 const char *method
, const char *version
, const char *data
,
867 size_t *data_size
, void **rctx
)
869 my_state
*ms
= *rctx
;
870 struct MHD_Response
*resp
;
883 DPRINTF("PROXY DELETE %s\n",url
);
885 rc
= main_func_tbl
->delete_func(ms
->bucket
,ms
->key
,(char *)url
);
890 copied_url
= strdup(url
);
892 bucket
= strtok_r(copied_url
,"/",&stctx
);
893 key
= strtok_r(NULL
,"/",&stctx
);
894 meta_delete(bucket
,key
);
897 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
901 MHD_add_response_header(resp
,"Content-Type","text/xml");
902 MHD_queue_response(conn
,MHD_HTTP_OK
,resp
);
903 MHD_destroy_response(resp
);
905 replicate_delete((char *)url
);
/*
 * Fixed entries that root_blob_generator() emits ahead of the real buckets.
 * The generator reads each element through its rel and link members, in
 * that order.
 */
909 /* TBD: get actual bucket list */
915 fake_bucket_t fake_bucket_list
[] = {
916 { "bucket_factory", "_new" },
917 { "provider_list", "_providers" },
/*
 * MHD content-reader callback that renders the API root document.
 *
 * ctx is the request's my_state (passed by proxy_api_root()).  Each call
 * emits one piece into buf:
 *  - first, a template context is obtained from tmpl_get_ctx() using a
 *    request header value (the header name is on a line not shown), its
 *    base is set to the request's Host header, and the root header is
 *    produced for service "image_warehouse" version "1.0";
 *  - then one entry per element of fake_bucket_list, selected by
 *    gen_ctx->index;
 *  - then one "bucket" entry per result of meta_query_next();
 *  - finally the footer, after which gen_ctx is set to TMPL_CTX_DONE and
 *    the CLEANUP_TMPL flag is cleared.
 * The checks of len against max and the return statements are on lines not
 * visible here.
 */
921 root_blob_generator (void *ctx
, uint64_t pos
, char *buf
, int max
)
926 const char *accept_hdr
;
933 accept_hdr
= MHD_lookup_connection_value(ms
->conn
,MHD_HEADER_KIND
,
935 host
= MHD_lookup_connection_value(ms
->conn
,MHD_HEADER_KIND
,"Host");
938 ms
->gen_ctx
= tmpl_get_ctx(accept_hdr
);
942 ms
->cleanup
|= CLEANUP_TMPL
;
943 ms
->gen_ctx
->base
= host
;
944 len
= tmpl_root_header(ms
->gen_ctx
,"image_warehouse","1.0");
951 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
955 if (ms
->gen_ctx
== TMPL_CTX_DONE
) {
/* Built-in entries first. */
959 if (ms
->gen_ctx
->index
< ARRAY_SIZE(fake_bucket_list
)) {
960 fb
= fake_bucket_list
+ ms
->gen_ctx
->index
;
961 len
= tmpl_root_entry(ms
->gen_ctx
,fb
->rel
,fb
->link
);
968 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
/* Then one entry per bucket reported by the metadata query. */
972 if (meta_query_next(ms
->query
,&bucket
,&key
)) {
973 len
= tmpl_root_entry(ms
->gen_ctx
,"bucket",bucket
);
980 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
984 len
= tmpl_root_footer(ms
->gen_ctx
);
991 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
993 ms
->cleanup
&= ~CLEANUP_TMPL
;
994 ms
->gen_ctx
= TMPL_CTX_DONE
;
/*
 * MHD access handler for "GET /": the API root document.
 *
 * Starts a metadata query with no bucket and key "_default", then streams
 * the document through root_blob_generator() with Content-Type text/xml.
 * rc starts as 200; any line that changes it is not visible here.
 * NOTE(review): presumably the "_default" key is used because every bucket
 * has such an object (create_bucket() sets an attribute on it) -- confirm.
 */
999 proxy_api_root (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1000 const char *method
, const char *version
, const char *data
,
1001 size_t *data_size
, void **rctx
)
1003 struct MHD_Response
*resp
= NULL
;
1004 unsigned int rc
= MHD_HTTP_OK
;
1005 my_state
*ms
= *rctx
;
1012 DPRINTF("PROXY API ROOT (%s, %zu)\n",url
,*data_size
);
1014 ms
->query
= meta_query_new(NULL
,"_default",NULL
);
1019 ms
->cleanup
|= CLEANUP_QUERY
;
1021 resp
= MHD_create_response_from_callback(MHD_SIZE_UNKNOWN
,
1022 65536, root_blob_generator
, ms
, simple_closer
);
1026 MHD_add_response_header(resp
,"Content-Type","text/xml");
1027 MHD_queue_response(conn
,rc
,resp
);
1028 MHD_destroy_response(resp
);
/*
 * MHD post-iterator callback that collects form fields into a hash table.
 *
 * ctx is the GHashTable created by the POST handlers, with free() as the
 * destroy function for both keys and values.  When key already has a value,
 * the new data is appended to a copy of the old value; otherwise the data
 * is copied as-is.  The result is always NUL-terminated and inserted under
 * a strdup()ed key, so the table owns both strings.  The existing value is
 * measured with strlen() (see the TBD about NUL-terminated values).
 */
1035 post_iterator (void *ctx
, enum MHD_ValueKind kind
, const char *key
,
1036 const char *filename
, const char *content_type
,
1037 const char *transfer_encoding
, const char *data
,
1038 uint64_t off
, size_t size
)
1047 (void)transfer_encoding
;
1050 printf("adding %s, size=%zu\n",key
,size
);
1052 // TBD: don't assume that values are null-terminated strings
1053 old_val
= g_hash_table_lookup(ctx
,key
);
/* Continuation of a field seen before: concatenate old and new data. */
1055 old_len
= strlen(old_val
);
1056 new_val
= malloc(old_len
+size
+1);
1060 memcpy(new_val
,old_val
,old_len
);
1061 memcpy(new_val
+old_len
,data
,size
);
1062 new_val
[old_len
+size
] = '\0';
/* First piece of this field. */
1065 new_val
= malloc(size
+1);
1069 memcpy(new_val
,data
,size
);
1070 new_val
[size
] = '\0';
1073 g_hash_table_insert(ctx
,strdup(key
),new_val
);
1074 /* TBD: check return value for strdups (none avail for insert) */
/*
 * Predicate for g_hash_table_find() over the POSTed fields.  A key counts
 * as invalid when it appears in reserved_attr; such a key is logged as a
 * "bad attr".  The return statements are on lines not visible here.  value
 * and ctx are not used in the visible lines.
 */
1078 /* Returns TRUE if we found an *invalid* key. */
1080 post_find (gpointer key
, gpointer value
, gpointer ctx
)
1085 if (!is_reserved(key
,reserved_attr
)) {
1089 DPRINTF("bad attr %s\n", (char *)key
);
/*
 * Callback for g_hash_table_foreach() over the POSTed fields: store each
 * key/value pair as an attribute of ms->bucket/ms->key in the metadata
 * store.  The callers pass the request's my_state as ctx; the local `ms` is
 * derived from it on a line not visible here.
 */
1094 post_foreach (gpointer key
, gpointer value
, gpointer ctx
)
1098 DPRINTF("setting %s = %s for %s/%s\n",(char *)key
, (char *)value
,
1099 ms
->bucket
,ms
->key
);
1100 meta_set_value(ms
->bucket
,ms
->key
,key
,value
);
/*
 * Create bucket `name` in the primary store and propagate it.
 *
 * A reserved name is refused with MHD_HTTP_BAD_REQUEST.  Otherwise the back
 * end's bcreate_func is called; when it reports MHD_HTTP_OK the bucket's
 * "_default" object gets a "_policy" attribute of "0" (failure to set it is
 * only logged) and the creation is forwarded to the replicas with
 * replicate_bcreate().  The final return statement is on a line not visible
 * here.
 */
1104 create_bucket (char *name
)
1108 if (is_reserved(name
,reserved_name
)) {
1109 return MHD_HTTP_BAD_REQUEST
;
1112 rc
= main_func_tbl
->bcreate_func(name
);
1113 if (rc
== MHD_HTTP_OK
) {
1114 if (meta_set_value(name
,"_default", "_policy","0") != 0) {
1115 DPRINTF("default-policy " "create failed\n");
1118 DPRINTF("created bucket %s\n",name
);
1120 * There's not a whole lot to do about bucket-creation
1121 * failures on replicas, other than to report them, unless
1122 * we adopt an "all or nothing" approach and unwind the
1123 * create on the primary as well. Then what if that fails?
1124 * It's just one example of the general "fewer replicas
1125 * than desired" distributed-system problem, not worth a
1126 * point solution here/now. Revisit when we have a more
1127 * general replica-repair policy/system in place.
1129 replicate_bcreate(name
);
/*
 * MHD access handler for "POST bucket".
 *
 * MHD calls this repeatedly for one request:
 *  - first call (MS_NEW): remember url (the pointer is stored, not a copy),
 *    create the field dictionary and a POST processor that fills it via
 *    post_iterator();
 *  - calls with *data_size != 0: feed the chunk to the POST processor;
 *  - final call: rc starts as 400.  When a "key" field is present it is
 *    copied into ms->key and removed from the dictionary; if no remaining
 *    field is rejected by post_find(), every field is stored as an
 *    attribute and replication is re-checked.  Otherwise, when the bucket
 *    is "_new", the "name" field is used to create a bucket.  A missing
 *    parameter is logged.
 *
 * The strncpy() copies at most MAX_FIELD_LEN-1 bytes and adds no
 * terminator of its own; ms was zero-filled in access_handler().
 */
1136 proxy_bucket_post (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1137 const char *method
, const char *version
, const char *data
,
1138 size_t *data_size
, void **rctx
)
1140 struct MHD_Response
*resp
;
1141 my_state
*ms
= *rctx
;
1149 DPRINTF("PROXY POST (%s, %zu)\n",url
,*data_size
);
1151 if (ms
->state
== MS_NEW
) {
1152 ms
->state
= MS_NORMAL
;
1153 ms
->url
= (char *)url
;
1154 ms
->dict
= g_hash_table_new_full(
1155 g_str_hash
,g_str_equal
,free
,free
);
1156 ms
->cleanup
|= CLEANUP_DICT
;
1157 ms
->post
= MHD_create_post_processor(conn
,4096,
1158 post_iterator
,ms
->dict
);
1159 ms
->cleanup
|= CLEANUP_POST
;
1161 else if (*data_size
) {
1162 MHD_post_process(ms
->post
,data
,*data_size
);
1166 rc
= MHD_HTTP_BAD_REQUEST
;
1167 key
= g_hash_table_lookup(ms
->dict
,"key");
1169 strncpy(ms
->key
,key
,MAX_FIELD_LEN
-1);
/* "key" names the object; it must not be stored as an attribute itself. */
1170 g_hash_table_remove(ms
->dict
,"key");
1171 if (!g_hash_table_find(ms
->dict
,post_find
,ms
)) {
1172 g_hash_table_foreach(ms
->dict
,post_foreach
,ms
);
1173 DPRINTF("rereplicate (bucket POST)\n");
1174 recheck_replication(ms
,NULL
);
1178 else if (!strcmp(ms
->bucket
,"_new")) {
1179 key
= g_hash_table_lookup(ms
->dict
,"name");
1181 rc
= create_bucket(key
);
1185 DPRINTF("A parameter is MISSING (fail)\n");
1187 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1189 fprintf(stderr
,"MHD_crfd failed\n");
1192 MHD_queue_response(conn
,rc
,resp
);
1193 MHD_destroy_response(resp
);
/*
 * Handle the "check" operation of an object POST: report whether the
 * location named by the "depot" field holds a copy of ms->bucket/ms->key.
 *
 * Returns MHD_HTTP_BAD_REQUEST when the field is missing and
 * MHD_HTTP_NOT_FOUND when meta_has_copy() finds nothing.  The return on the
 * success path is on a line not visible here.
 */
1200 check_location (my_state
*ms
)
1202 char *loc
= g_hash_table_lookup(ms
->dict
,"depot");
1205 DPRINTF("missing loc on check for %s/%s\n",ms
->bucket
,ms
->key
);
1206 return MHD_HTTP_BAD_REQUEST
;
1209 if (!meta_has_copy(ms
->bucket
,ms
->key
,loc
)) {
1210 DPRINTF("did not find %s/%s at %s\n",ms
->bucket
,ms
->key
,loc
);
1211 return MHD_HTTP_NOT_FOUND
;
1214 /* TBD: meta_has_copy returns an etag which we should check */
1215 DPRINTF("found %s/%s at %s\n",ms
->bucket
,ms
->key
,loc
);
/*
 * Handle the "register" operation of an object POST.
 *
 * The "site" field selects a provider.  The first ':' in it is located with
 * index(); its handling is on lines not visible here, and the resulting
 * `next` pointer is passed on to the provider.  Providers are enumerated
 * with get_provider() and compared by name; for a match, the provider's
 * register_func result is returned.  A missing field or unknown site
 * returns MHD_HTTP_BAD_REQUEST.
 */
1220 register_image (my_state
*ms
)
1227 site
= g_hash_table_lookup(ms
->dict
,"site");
1229 printf("site MISSING\n");
1230 return MHD_HTTP_BAD_REQUEST
;
1233 next
= index(site
,':');
1238 for (i
= 0; get_provider(i
,&prov
); ++i
) {
1239 if (strcmp(prov
.name
,site
)) {
1242 return prov
.func_tbl
->register_func(ms
,&prov
,next
,ms
->dict
);
1245 DPRINTF("site %s not found\n",site
);
1246 return MHD_HTTP_BAD_REQUEST
;
/*
 * MHD access handler for "POST object": control operations on an object.
 *
 * MHD calls this repeatedly for one request:
 *  - first call (MS_NEW): remember url (the pointer is stored, not a copy),
 *    create the field dictionary and a POST processor that fills it via
 *    post_iterator();
 *  - calls with *data_size != 0: feed the chunk to the POST processor;
 *  - final call: rc starts as 400.  If no field is rejected by post_find(),
 *    the "op" field selects the action: "push" re-checks replication,
 *    "check" calls check_location(), "register" calls register_image().
 *    An unknown or missing op is logged.
 */
1250 proxy_object_post (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1251 const char *method
, const char *version
, const char *data
,
1252 size_t *data_size
, void **rctx
)
1254 struct MHD_Response
*resp
;
1255 my_state
*ms
= *rctx
;
1263 DPRINTF("PROXY POST (%s, %zu)\n",url
,*data_size
);
1265 if (ms
->state
== MS_NEW
) {
1266 ms
->state
= MS_NORMAL
;
1267 ms
->url
= (char *)url
;
1268 ms
->dict
= g_hash_table_new_full(
1269 g_str_hash
,g_str_equal
,free
,free
);
1270 ms
->cleanup
|= CLEANUP_DICT
;
1271 ms
->post
= MHD_create_post_processor(conn
,4096,
1272 post_iterator
,ms
->dict
);
1273 ms
->cleanup
|= CLEANUP_POST
;
1275 else if (*data_size
) {
1276 MHD_post_process(ms
->post
,data
,*data_size
);
1280 rc
= MHD_HTTP_BAD_REQUEST
;
1281 if (!g_hash_table_find(ms
->dict
,post_find
,ms
)) {
1282 op
= g_hash_table_lookup(ms
->dict
,"op");
1284 if (!strcmp(op
,"push")) {
1285 DPRINTF("rereplicate (obj POST)\n");
1286 recheck_replication(ms
,NULL
);
1289 else if (!strcmp(op
,"check")) {
1290 rc
= check_location(ms
);
1292 else if (!strcmp(op
,"register")) {
1293 rc
= register_image(ms
);
1296 DPRINTF("unknown op %s for %s/%s\n",
1297 op
, ms
->bucket
, ms
->key
);
1301 DPRINTF("op is MISSING (fail)\n");
1304 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1306 fprintf(stderr
,"MHD_crfd failed\n");
1309 MHD_queue_response(conn
,rc
,resp
);
1310 MHD_destroy_response(resp
);
/*
 * MHD content-reader callback that renders the provider list.
 *
 * ctx is the request's my_state (passed by proxy_list_provs()).  Each call
 * emits one piece into buf:
 *  - first, a template context is obtained from tmpl_get_ctx() using a
 *    request header value (the header name is on a line not shown) and the
 *    list header is produced;
 *  - then one entry per provider returned by get_provider() for
 *    gen_ctx->index, carrying name, type, host, port, username and
 *    password;
 *  - finally the footer, after which gen_ctx is set to TMPL_CTX_DONE and
 *    the CLEANUP_TMPL flag is cleared.
 * The checks of len against max and the return statements are on lines not
 * visible here.
 *
 * NOTE(review): the provider password is included in the generated entry --
 * confirm this exposure is intended.
 */
1319 prov_list_generator (void *ctx
, uint64_t pos
, char *buf
, int max
)
1324 const char *accept_hdr
;
1328 accept_hdr
= MHD_lookup_connection_value(ms
->conn
,MHD_HEADER_KIND
,
1332 ms
->gen_ctx
= tmpl_get_ctx(accept_hdr
);
1336 ms
->cleanup
|= CLEANUP_TMPL
;
1337 len
= tmpl_prov_header(ms
->gen_ctx
);
1344 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
1348 if (ms
->gen_ctx
== TMPL_CTX_DONE
) {
1352 if (get_provider(ms
->gen_ctx
->index
,&prov
)) {
1353 len
= tmpl_prov_entry(ms
->gen_ctx
,prov
.name
,prov
.type
,
1354 prov
.host
, prov
.port
, prov
.username
, prov
.password
);
1361 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
1365 len
= tmpl_prov_footer(ms
->gen_ctx
);
1372 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
1374 ms
->cleanup
&= ~CLEANUP_TMPL
;
1375 ms
->gen_ctx
= TMPL_CTX_DONE
;
/*
 * MHD access handler for "GET /_providers": stream the provider list
 * through prov_list_generator() with status 200.
 *
 * NOTE(review): the failure message says "MHD_crfd" although the call is
 * MHD_create_response_from_callback(); the other callback-based handlers
 * log "MHD_crfc".
 */
1380 proxy_list_provs (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1381 const char *method
, const char *version
, const char *data
,
1382 size_t *data_size
, void **rctx
)
1384 struct MHD_Response
*resp
;
1385 my_state
*ms
= *rctx
;
1394 resp
= MHD_create_response_from_callback(MHD_SIZE_UNKNOWN
,
1395 65536, prov_list_generator
, ms
, simple_closer
);
1397 fprintf(stderr
,"MHD_crfd failed\n");
1401 MHD_queue_response(conn
,MHD_HTTP_OK
,resp
);
1402 MHD_destroy_response(resp
);
/*
 * MHD post-iterator callback for provider updates.  Each field is stored in
 * the GHashTable passed as ctx under a strdup()ed key with a strndup()ed
 * copy of the first `size` bytes of data.  Unlike post_iterator(), a
 * repeated key replaces the earlier value instead of being appended to it.
 */
1408 prov_iterator (void *ctx
, enum MHD_ValueKind kind
, const char *key
,
1409 const char *filename
, const char *content_type
,
1410 const char *transfer_encoding
, const char *data
,
1411 uint64_t off
, size_t size
)
1416 (void)transfer_encoding
;
1419 g_hash_table_insert(ctx
,strdup(key
),strndup(data
,size
));
1420 /* TBD: check return value for strdups (none avail for insert) */
/*
 * MHD access handler for "POST /_providers": update a provider's
 * credentials.
 *
 * MHD calls this repeatedly for one request:
 *  - first call (MS_NEW): remember url (the pointer is stored, not a copy),
 *    create the field dictionary and a POST processor that fills it via
 *    prov_iterator();
 *  - calls with *data_size != 0: feed the chunk to the POST processor;
 *  - final call: rc starts as 400.  When the "provider", "username" and
 *    "password" fields are all present, update_provider() is called;
 *    otherwise the omission is logged.
 */
1426 proxy_update_prov (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1427 const char *method
, const char *version
, const char *data
,
1428 size_t *data_size
, void **rctx
)
1430 struct MHD_Response
*resp
;
1431 my_state
*ms
= *rctx
;
1441 if (ms
->state
== MS_NEW
) {
1442 ms
->state
= MS_NORMAL
;
1443 ms
->url
= (char *)url
;
1444 ms
->dict
= g_hash_table_new_full(
1445 g_str_hash
,g_str_equal
,free
,free
);
1446 ms
->cleanup
|= CLEANUP_DICT
;
1447 ms
->post
= MHD_create_post_processor(conn
,4096,
1448 prov_iterator
,ms
->dict
);
1449 ms
->cleanup
|= CLEANUP_POST
;
1451 else if (*data_size
) {
1452 MHD_post_process(ms
->post
,data
,*data_size
);
1456 rc
= MHD_HTTP_BAD_REQUEST
;
1457 provider
= g_hash_table_lookup(ms
->dict
,"provider");
1458 username
= g_hash_table_lookup(ms
->dict
,"username");
1459 password
= g_hash_table_lookup(ms
->dict
,"password");
1460 if (provider
&& username
&& password
) {
1461 update_provider(provider
,username
,password
);
1465 DPRINTF("provider/username/password MISSING\n");
1467 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1469 fprintf(stderr
,"MHD_crfd failed\n");
1472 MHD_queue_response(conn
,rc
,resp
);
1473 MHD_destroy_response(resp
);
/*
 * MHD access handler for "PUT bucket": create the bucket named in the URL
 * and answer with the status returned by create_bucket() and an empty body.
 * create_bucket() itself rejects reserved names such as "_new", which is
 * the accidental case described in the comment below.
 */
1480 proxy_create_bucket (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1481 const char *method
, const char *version
, const char *data
,
1482 size_t *data_size
, void **rctx
)
1484 struct MHD_Response
*resp
;
1485 my_state
*ms
= *rctx
;
1495 /* curl -T moo.empty http://localhost:9090/_new by accident */
1496 rc
= create_bucket(ms
->bucket
);
1498 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1500 fprintf(stderr
,"MHD_crfd failed\n");
1503 MHD_queue_response(conn
,rc
,resp
);
1504 MHD_destroy_response(resp
);
/*
 * Dispatch table entries: { HTTP method, URL type, handler }.  The line
 * that declares the array is not visible here.  access_handler() scans the
 * table until it meets an entry whose method is NULL and uses the first
 * entry matching both the URL type and the method.  An entry with a NULL
 * handler (attribute DELETE) marks a combination that is recognised but not
 * implemented.
 */
1510 { /* get bucket list */
1511 "GET", URL_ROOT
, proxy_api_root
},
1512 { /* get object list */
1513 "GET", URL_BUCKET
, proxy_list_objs
},
1514 { /* create bucket */
1515 "PUT", URL_BUCKET
, proxy_create_bucket
},
1516 { /* get object data */
1517 "GET", URL_OBJECT
, proxy_get_data
},
1518 { /* get attribute data */
1519 "GET", URL_ATTR
, proxy_get_attr
},
1520 { /* put object data */
1521 "PUT", URL_OBJECT
, proxy_put_data
},
1522 { /* put attribute data */
1523 "PUT", URL_ATTR
, proxy_put_attr
},
1524 { /* create object and/or modify attributes */
1525 "POST", URL_BUCKET
, proxy_bucket_post
},
1526 { /* perform control operations on an object */
1527 "POST", URL_OBJECT
, proxy_object_post
},
1529 "POST", URL_QUERY
, proxy_query
},
1530 { /* delete object */
1531 "DELETE", URL_OBJECT
, proxy_delete
},
1532 { /* delete attribute (TBD) */
1533 "DELETE", URL_ATTR
, NULL
},
1534 { /* get provider list */
1535 "GET", URL_PROVLIST
, proxy_list_provs
},
1536 { /* update a provider */
1537 "POST", URL_PROVLIST
, proxy_update_prov
},
/*
 * Split url into bucket, key and attribute and classify it.
 *
 * Path components are copied one character at a time into ms->bucket,
 * ms->key and ms->attr through the parts[] table, which is indexed by URL
 * type; '/' characters separate the components.  A component that reaches
 * MAX_FIELD_LEN characters, or a path with more components than parts[] has
 * slots, takes the error paths whose bodies are on lines not visible here.
 * Two names are special-cased after parsing: a bucket called "_providers"
 * turns the type into URL_PROVLIST, and a key called "_query" is checked
 * for as well (presumably yielding URL_QUERY -- that line is not shown;
 * confirm).  access_handler() uses the return value as the URL type.
 */
1542 parse_url (const char *url
, my_state
*ms
)
1544 unsigned short esize
;
1545 unsigned short eindex
;
1546 char *parts
[URL_INVAL
];
1549 parts
[URL_BUCKET
] = ms
->bucket
;
1550 parts
[URL_OBJECT
] = ms
->key
;
1551 parts
[URL_ATTR
] = ms
->attr
;
1554 while (*url
== '/') {
1559 if (eindex
== URL_BUCKET
) {
1560 if (!strcmp(ms
->bucket
,"_providers")) {
1561 eindex
= URL_PROVLIST
;
1564 else if (eindex
== URL_OBJECT
) {
1565 if (!strcmp(ms
->key
,"_query")) {
1572 if (++eindex
>= URL_INVAL
) {
1577 while (*url
&& (*url
!= '/')) {
1578 parts
[eindex
][esize
++] = *(url
++);
1579 if (esize
>= MAX_FIELD_LEN
) {
/*
 * Top-level MHD access handler: route a request to its specific handler.
 *
 * *rctx carries the per-request my_state between calls.  A call that
 * already has a state forwards straight to ms->handler (the guarding
 * condition is on a line not visible here).  Otherwise a zero-filled state
 * is allocated, the URL is parsed and my_rules[] is searched for an entry
 * matching both URL type and method; the matching handler is stored in the
 * state and invoked.
 *
 * Requests that match no rule: the method "QUIT" posts the semaphore passed
 * as cctx (main() passes &the_sem to MHD_start_daemon()); anything else is
 * logged as a bad request and answered with an empty 404.
 */
1589 access_handler (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1590 const char *method
, const char *version
, const char *data
,
1591 size_t *data_size
, void **rctx
)
1595 struct MHD_Response
*resp
;
1596 my_state
*ms
= *rctx
;
1599 return ms
->handler(cctx
,conn
,url
,method
,version
,
1600 data
,data_size
,rctx
);
1603 ms
= malloc(sizeof(my_state
));
1607 memset(ms
,0,sizeof(*ms
));
1609 utype
= parse_url(url
,ms
);
1611 for (i
= 0; my_rules
[i
].method
; ++i
) {
1612 if (utype
!= my_rules
[i
].utype
) {
1615 if (strcmp(method
,my_rules
[i
].method
)) {
1618 if (!my_rules
[i
].handler
) {
1621 ms
->handler
= my_rules
[i
].handler
;
1628 return ms
->handler(cctx
,conn
,url
,method
,version
,
1629 data
,data_size
,rctx
);
1632 if (!strcmp(method
,"QUIT")) {
1633 (void)sem_post((sem_t
*)cctx
);
1637 fprintf(stderr
,"bad request m=%s u=%s\n",method
,url
);
1640 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1644 MHD_queue_response(conn
,MHD_HTTP_NOT_FOUND
,resp
);
1645 MHD_destroy_response(resp
);
/*
 * Option codes for the long-only options --help and --version (the line
 * that opens this enumerator list is not visible here).  They appear as the
 * val column of my_options[] and as case labels in the option loop.
 */
1649 /* These enum values cannot possibly conflict with the option values
1650 ordinarily used by commands, including CHAR_MAX + 1, etc. Avoid
1651 CHAR_MIN - 1, as it may equal -1, the getopt end-of-options value. */
1654 GETOPT_HELP_CHAR
= (CHAR_MIN
- 2),
1655 GETOPT_VERSION_CHAR
= (CHAR_MIN
- 3)
/*
 * Long-option table passed to getopt_long() together with the short-option
 * string "c:d:m:p:v".  The options taking an argument (config, db, master,
 * port) map to the short letters of the same meaning; --version and --help
 * have no short form and use the GETOPT_* codes.  The all-zero entry
 * terminates the table.
 */
1658 struct option my_options
[] = {
1659 { "config", required_argument
, NULL
, 'c' },
1660 { "db", required_argument
, NULL
, 'd' },
1661 { "master", required_argument
, NULL
, 'm' },
1662 { "port", required_argument
, NULL
, 'p' },
1663 { "verbose", no_argument
, NULL
, 'v' },
1664 { "version", no_argument
, NULL
, GETOPT_VERSION_CHAR
},
1665 { "help", no_argument
, NULL
, GETOPT_HELP_CHAR
},
1666 { NULL
, 0, NULL
, '\0' }
/*
 * Body of the usage routine (its signature line is not visible here; it is
 * called elsewhere as usage(EXIT_SUCCESS) and usage(EXIT_FAILURE)).  A
 * status other than EXIT_SUCCESS prints a one-line hint to stderr; the
 * other path prints the full option summary, with program_name and
 * PACKAGE_BUGREPORT as trailing format arguments.
 */
1672 if (status
!= EXIT_SUCCESS
)
1673 fprintf (stderr
, _("Try `%s --help' for more information.\n"),
1678 Usage: %s [OPTION]\n\
1682 Deltacloud image-warehouse daemon.\n\
1683 A configuration file must be specified.\n\
1685 -c, --config=FILE config file [required]\n\
1686 -d, --db=HOST_PORT database server as ip[:port]\n\
1687 -m, --master=HOST_PORT master (upstream) server as ip[:port]\n\
1688 -p, --port=PORT alternate listen port (default 9090)\n\
1689 -v, --verbose verbose/debug output\n\
1691 --help display this help and exit\n\
1692 --version output version information and exit\n\
1696 Report %s bugs to %s.\n\
1698 program_name
, PACKAGE_BUGREPORT
);
1704 main (int argc
, char **argv
)
1706 struct MHD_Daemon
*the_daemon
;
1711 program_name
= argv
[0];
1713 for (;;) switch (getopt_long(argc
,argv
,"c:d:m:p:v",my_options
,NULL
)) {
1719 db_host
= strtok_r(optarg
,":",&stctx
);
1720 port_tmp
= strtok_r(NULL
,":",&stctx
);
1722 db_port
= (unsigned short)strtoul(port_tmp
,NULL
,10);
1727 master_host
= strtok_r(optarg
,":",&stctx
);
1728 port_tmp
= strtok_r(NULL
,":",&stctx
);
1730 master_port
= (unsigned short)strtoul(port_tmp
,NULL
,10);
1734 my_port
= (unsigned short)strtoul(optarg
,NULL
,10);
1739 case GETOPT_HELP_CHAR
:
1740 usage(EXIT_SUCCESS
);
1742 case GETOPT_VERSION_CHAR
:
1743 printf ("%s version %s\n", program_name
, PACKAGE_VERSION
);
1744 exit (EXIT_SUCCESS
);
1750 usage(EXIT_FAILURE
);
1756 error (0, 0, "no configuration file specified");
1757 usage (EXIT_FAILURE
);
1760 me
= parse_config();
1762 fprintf(stderr
,"could not parse %s\n",cfg_file
);
1766 sem_init(&the_sem
,0,0);
1769 main_func_tbl
= &s3_func_tbl
;
1772 main_func_tbl
= &curl_func_tbl
;
1776 main_func_tbl
= &fs_func_tbl
;
1780 printf("primary store type is %s\n",main_func_tbl
->name
);
1782 printf("operating as slave to %s:%u\n",
1783 master_host
, master_port
);
1785 printf("db is at %s:%u\n",db_host
,db_port
);
1786 printf("will listen on port %u\n",my_port
);
1787 printf("my location is \"%s\"\n",me
);
1792 main_func_tbl
->init_func();
1797 * Gotcha: if we don't set the connection memory limit explicitly,
1798 * the per-connection buffer for MHD will be smaller than that used
1799 * by CURL, so proxy_writefunc will never be able to do its job.
1801 the_daemon
= MHD_start_daemon(MY_MHD_FLAGS
,
1802 my_port
, NULL
, NULL
, &access_handler
, &the_sem
,
1803 MHD_OPTION_CONNECTION_MEMORY_LIMIT
, (size_t)1048576,
1806 fprintf(stderr
,"Could not create daemon.\n");