1 /* Copyright (C) 2010 Free Software Foundation, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 #include <semaphore.h>
33 #include <microhttpd.h>
34 #include <hstor.h> /* only for ARRAY_SIZE at this point */
35 #include <curl/curl.h>
44 #include "state_defs.h"
46 /* Define-away for now. Eventually, define to gettext. */
47 #define _(msgid) (msgid)
50 #define MY_MHD_FLAGS MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG
51 //#define MY_MHD_FLAGS MHD_USE_SELECT_INTERNALLY | MHD_USE_DEBUG
53 #define MY_MHD_FLAGS MHD_USE_THREAD_PER_CONNECTION
56 extern backend_func_tbl bad_func_tbl
;
57 extern backend_func_tbl s3_func_tbl
;
58 extern backend_func_tbl curl_func_tbl
;
59 extern backend_func_tbl fs_func_tbl
;
62 URL_ROOT
=0, URL_BUCKET
, URL_OBJECT
, URL_ATTR
, URL_INVAL
,
63 URL_QUERY
, URL_PROVLIST
69 MHD_AccessHandlerCallback handler
;
73 unsigned short my_port
= MY_PORT
;
74 const char *program_name
;
76 char *(reserved_name
[]) = { "_default", "_query", "_new", NULL
};
77 char *(reserved_attr
[]) = { "bucket", "key", "date", "etag", "loc", NULL
};
79 backend_func_tbl
*main_func_tbl
= &bad_func_tbl
;
82 free_ms (my_state
*ms
)
84 if (ms
->cleanup
& CLEANUP_CURL
) {
85 curl_easy_cleanup(ms
->curl
);
88 if (ms
->cleanup
& CLEANUP_BUF_PTR
) {
89 free(ms
->pipe
.data_ptr
);
92 if (ms
->cleanup
& CLEANUP_POST
) {
93 MHD_destroy_post_processor(ms
->post
);
96 if (ms
->cleanup
& CLEANUP_DICT
) {
97 g_hash_table_destroy(ms
->dict
);
100 if (ms
->cleanup
& CLEANUP_QUERY
) {
101 meta_query_stop(ms
->query
);
104 if (ms
->cleanup
& CLEANUP_TMPL
) {
108 if (ms
->cleanup
& CLEANUP_URL
) {
116 validate_put (struct MHD_Connection
*conn
)
121 /* We're not a slave, so we don't care. */
125 mhdr
= MHD_lookup_connection_value(conn
,MHD_HEADER_KIND
,
127 return (mhdr
&& !strcmp(mhdr
,"master"));
131 is_reserved (char *cand
, char **resv_list
)
135 for (i
= 0; resv_list
[i
]; ++i
) {
136 if (!strcmp(cand
,resv_list
[i
])) {
145 validate_url (const char *url
)
147 char *slash
= rindex(url
,'/');
150 /* There should be at least one betwixt bucket and key. */
154 return !is_reserved(slash
+1,reserved_name
);
158 * The proxy has MHD on one side and CURL on the other. The CURL side is
159 * always run in a child thread. Yes, there are both context switches
160 * and copies between the threads. Get over it. The focus here is on
161 * supporting multi-way replication on PUT, with minimal complexity. These
162 * were the two easiest libraries to use, and they both want to allocate
163 * their own buffers so we're kind of stuck with the copies unless we want
164 * to buffer whole files in memory (which would have to be virtual since
165 * they're potentially bigger than physical) or explicitly ping them through
166 * a local filesystem. We could potentially take over scheduling from one
167 * or both to avoid some of the context switching, but those interfaces are
168 * significantly more error-prone and (at least in CURL's case) buggy.
170 * For a GET, the CURL child acts as a producer while the MHD parent acts
171 * as consumer. For a PUT, the MHD parent is the producer and the CURL
172 * child is the consumer. For GET the MHD component is invoked via a
173 * callback set up in the access handler; for PUT it's invoked via repeated
174 * calls to the access handler itself. Either way, the producer's job is
175 * to post its pointer+length to the my_state structure and then wait for
176 * all consumers to check back in saying they've finished it. This might
177 * involve multiple passes through each consumer for one pass through the
178 * single producer. When the producer is done, it does a similar handshake
179 * with the consumers. Each consumer has its own pipe_private structure,
180 * containing a pointer to the shared my_state plus a per-consumer offset
181 * into the current chunk.
183 * Attribute functions don't use CURL, so they do much simpler in-memory
184 * buffering. Queries also don't use CURL, but the MHD POST interface
185 * introduces some of its own complexity so see below for that.
189 simple_closer (void *ctx
)
193 DPRINTF("%s: cleaning up\n",__func__
);
198 child_closer (void * ctx
)
200 pipe_private
*pp
= ctx
;
202 DPRINTF("in %s\n",__func__
);
207 /* Invoked from MHD. */
209 proxy_get_cons (void *ctx
, uint64_t pos
, char *buf
, int max
)
211 pipe_private
*pp
= ctx
;
212 pipe_shared
*ps
= pp
->shared
;
213 my_state
*ms
= ps
->owner
;
219 DPRINTF("consumer asked to read %d\n",max
);
221 if (pipe_cons_wait(pp
)) {
222 DPRINTF("consumer offset %zu into %zu\n",
223 pp
->offset
, ps
->data_len
);
224 done
= ps
->data_len
- pp
->offset
;
228 memcpy(buf
,ps
->data_ptr
+pp
->offset
,done
);
230 DPRINTF("consumer copied %d, new offset %zu\n",
232 if (pp
->offset
== ps
->data_len
) {
233 DPRINTF("consumer finished chunk\n");
234 pipe_cons_signal(pp
, 0);
243 pthread_join(ms
->backend_th
,&child_res
);
244 if (child_res
== THREAD_FAILED
) {
245 ms
->rc
= MHD_HTTP_INTERNAL_SERVER_ERROR
;
247 if (ms
->from_master
) {
248 pthread_join(ms
->cache_th
,NULL
);
249 /* TBD: do something about cache failure? */
258 proxy_get_data (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
259 const char *method
, const char *version
, const char *data
,
260 size_t *data_size
, void **rctx
)
262 struct MHD_Response
*resp
;
263 my_state
*ms
= *rctx
;
267 const char *user_etag
;
275 DPRINTF("PROXY GET DATA %s\n",url
);
277 ms
->url
= strdup(url
);
281 ms
->cleanup
|= CLEANUP_URL
;
283 my_etag
= meta_has_copy(ms
->bucket
,ms
->key
,me
);
285 user_etag
= MHD_lookup_connection_value(
286 conn
, MHD_HEADER_KIND
, "If-None-Match");
287 if (user_etag
&& !strcmp(user_etag
,my_etag
)) {
288 DPRINTF("ETag match!\n");
290 resp
= MHD_create_response_from_data(0,NULL
,
292 MHD_queue_response(conn
,MHD_HTTP_NOT_MODIFIED
,resp
);
293 MHD_destroy_response(resp
);
300 DPRINTF("%s/%s not found locally\n",ms
->bucket
,ms
->key
);
302 DPRINTF(" that means it doesn't exist\n");
303 resp
= MHD_create_response_from_data(0,NULL
,
305 MHD_queue_response(conn
,MHD_HTTP_NOT_FOUND
,resp
);
306 MHD_destroy_response(resp
);
310 DPRINTF(" will fetch from %s:%u\n", master_host
,master_port
);
314 pipe_init_shared(&ms
->pipe
,ms
,ms
->from_master
+1);
315 pp
= pipe_init_private(&ms
->pipe
);
319 /* Master is always assumed to be CURL (i.e. our own protocol) */
320 if (ms
->from_master
) {
321 pthread_create(&ms
->backend_th
,NULL
,
322 curl_func_tbl
.get_child_func
,ms
);
325 pthread_create(&ms
->backend_th
,NULL
,
326 main_func_tbl
->get_child_func
,ms
);
328 /* TBD: check return value */
330 if (ms
->from_master
) {
331 pp2
= pipe_init_private(&ms
->pipe
);
335 pthread_create(&ms
->cache_th
,NULL
,
336 main_func_tbl
->cache_child_func
,pp2
);
337 /* TBD: check return value */
343 resp
= MHD_create_response_from_callback(
344 MHD_SIZE_UNKNOWN
, 65536, proxy_get_cons
, pp
, child_closer
);
346 fprintf(stderr
,"MHD_crfc failed\n");
348 /* TBD: terminate thread */
354 MHD_queue_response(conn
,ms
->rc
,resp
);
355 MHD_destroy_response(resp
);
361 recheck_replication (my_state
* ms
, char *policy
)
365 char fixed
[MAX_FIELD_LEN
];
367 if (is_reserved(ms
->key
,reserved_name
)) {
368 DPRINTF("declining to replicate reserved object %s\n",ms
->key
);
372 if (!policy
&& ms
->dict
) {
373 DPRINTF("using new policy for %s/%s\n",ms
->bucket
,ms
->key
);
374 policy
= g_hash_table_lookup(ms
->dict
,"_policy");
378 /* If we get a policy here or below, we have to free it. */
380 DPRINTF("fetching policy for %s/%s\n",ms
->bucket
,ms
->key
);
381 rc
= meta_get_value(ms
->bucket
,ms
->key
, "_policy", &policy
);
385 DPRINTF(" inheriting policy from %s\n",ms
->bucket
);
386 rc
= meta_get_value(ms
->bucket
,
387 "_default", "_policy", &policy
);
391 DPRINTF(" implementing policy %s\n",policy
);
393 * Can't use ms->url here because it might be a bucket POST
394 * and in that case ms->url points to the bucket.
396 snprintf(fixed
,sizeof(fixed
),"%s/%s",ms
->bucket
,ms
->key
);
397 replicate(fixed
,0,policy
);
403 DPRINTF(" could not find a policy anywhere!\n");
408 proxy_put_data (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
409 const char *method
, const char *version
, const char *data
,
410 size_t *data_size
, void **rctx
)
412 struct MHD_Response
*resp
;
413 my_state
*ms
= *rctx
;
423 DPRINTF("PROXY PUT DATA %s (%zu)\n",url
,*data_size
);
425 if (ms
->state
== MS_NEW
) {
426 if (!validate_put(conn
) || !validate_url(url
)) {
427 DPRINTF("rejecting %s\n",url
);
428 resp
= MHD_create_response_from_data(0,NULL
,
433 MHD_queue_response(conn
,MHD_HTTP_FORBIDDEN
,resp
);
434 MHD_destroy_response(resp
);
437 ms
->state
= MS_NORMAL
;
438 ms
->url
= strdup(url
);
442 ms
->cleanup
|= CLEANUP_URL
;
444 pipe_init_shared(&ms
->pipe
,ms
,1);
445 pp
= pipe_init_private(&ms
->pipe
);
449 pthread_create(&ms
->backend_th
,NULL
,
450 main_func_tbl
->put_child_func
,pp
);
451 /* TBD: check return value */
454 * Do the initial handshake with children. If we return from
455 * this callback without an error response, Microhttpd posts
456 * the "100 Continue" header and the client starts sending
457 * the data. We must report errors here or forever keep
460 rc
= pipe_prod_wait_init(&ms
->pipe
);
462 DPRINTF("producer wait failed\n");
463 resp
= MHD_create_response_from_data(0,NULL
,
468 MHD_queue_response(conn
,MHD_HTTP_INTERNAL_SERVER_ERROR
,
470 MHD_destroy_response(resp
);
473 * Note that we fail here even if 1 of N replicas fail.
474 * Might want to fix this when we start looping over
475 * pipe_init_private() above.
477 DPRINTF("producer replicas failed (%u of %u)\n",
478 rc
, ms
->pipe
.cons_total
);
479 resp
= MHD_create_response_from_data(0,NULL
,
484 MHD_queue_response(conn
,MHD_HTTP_INTERNAL_SERVER_ERROR
,
486 MHD_destroy_response(resp
);
488 DPRINTF("producer proceeding\n");
491 else if (*data_size
) {
492 pipe_prod_signal(&ms
->pipe
,(void *)data
,*data_size
);
493 ms
->size
+= *data_size
;
494 DPRINTF("producer chunk finished\n");
498 pipe_prod_finish(&ms
->pipe
);
499 pthread_join(ms
->backend_th
,&child_res
);
500 if (child_res
== THREAD_FAILED
) {
501 DPRINTF("thread failed\n");
502 rc
= MHD_HTTP_INTERNAL_SERVER_ERROR
;
504 else if (ms
->pipe
.cons_error
== ms
->pipe
.cons_total
) {
505 DPRINTF("all %u consumers failed\n",
506 ms
->pipe
.cons_error
);
507 rc
= MHD_HTTP_INTERNAL_SERVER_ERROR
;
511 meta_got_copy(ms
->bucket
,ms
->key
,me
);
515 etag
= meta_did_put(ms
->bucket
,ms
->key
,me
,
518 DPRINTF("rereplicate (obj PUT)\n");
519 recheck_replication(ms
,NULL
);
523 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
529 MHD_add_response_header(resp
,"ETag",etag
);
532 MHD_queue_response(conn
,rc
,resp
);
533 MHD_destroy_response(resp
);
540 proxy_get_attr (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
541 const char *method
, const char *version
, const char *data
,
542 size_t *data_size
, void **rctx
)
544 struct MHD_Response
*resp
;
546 my_state
*ms
= *rctx
;
547 int rc
= MHD_HTTP_NOT_FOUND
;
555 DPRINTF("PROXY GET ATTR %s\n",url
);
557 if (meta_get_value(ms
->bucket
,ms
->key
,ms
->attr
,&fixed
) == 0) {
558 resp
= MHD_create_response_from_data(strlen(fixed
),fixed
,
563 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
568 MHD_queue_response(conn
,rc
,resp
);
569 MHD_destroy_response(resp
);
575 proxy_put_attr (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
576 const char *method
, const char *version
, const char *data
,
577 size_t *data_size
, void **rctx
)
579 struct MHD_Response
*resp
;
580 my_state
*ms
= *rctx
;
588 DPRINTF("PROXY PUT ATTR %s (%zu)\n",url
,*data_size
);
590 if (ms
->state
== MS_NEW
) {
591 ms
->state
= MS_NORMAL
;
592 ms
->url
= strdup(url
);
596 ms
->cleanup
|= CLEANUP_URL
;
597 attrval
= MHD_lookup_connection_value(conn
,MHD_HEADER_KIND
,
600 meta_set_value(ms
->bucket
,ms
->key
,ms
->attr
,
605 else if (*data_size
) {
606 if (ms
->pipe
.data_len
) {
607 ms
->pipe
.data_len
+= *data_size
;
608 char *p
= realloc(ms
->pipe
.data_ptr
,ms
->pipe
.data_len
);
612 ms
->pipe
.data_ptr
= p
;
615 ms
->pipe
.data_len
= *data_size
+ 1;
616 ms
->pipe
.data_ptr
= malloc(ms
->pipe
.data_len
);
617 if (!ms
->pipe
.data_ptr
) {
620 ((char *)ms
->pipe
.data_ptr
)[0] = '\0';
621 ms
->cleanup
|= CLEANUP_BUF_PTR
;
623 (void)strncat(ms
->pipe
.data_ptr
,data
,*data_size
);
624 /* TBD: check return value */
628 if (!ms
->pipe
.data_ptr
) {
631 if (is_reserved(ms
->attr
,reserved_attr
)) {
632 resp
= MHD_create_response_from_data(
633 0,NULL
,MHD_NO
,MHD_NO
);
637 MHD_queue_response(conn
,MHD_HTTP_BAD_REQUEST
,
639 MHD_destroy_response(resp
);
643 meta_set_value(ms
->bucket
,ms
->key
,ms
->attr
,ms
->pipe
.data_ptr
);
645 * We should always re-replicate, because the replication
646 * policy might refer to this attr.
648 DPRINTF("rereplicate (attr PUT)\n");
649 recheck_replication(ms
,NULL
);
655 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
659 MHD_queue_response(conn
,MHD_HTTP_CREATED
,resp
);
660 MHD_destroy_response(resp
);
662 * TBD: check if the attribute was a replication policy, and
663 * start/stop replication activities as appropriate.
671 * For queries, we have to deal with MHD's post-iterator interface (not
672 * quite the same as the data-iteration even though we use it that way) on
673 * one side, and a query-iterator interface on the other. Data on both
674 * sides could be quite large, so we can't just stick them in header lines.
675 * We do still buffer the query in memory, though. Once that's done, we do
676 * very simple parsing - it will be more complicated later - and create the
677 * query iterator. That's also driven by MHD, this time through the
678 * content-callback interface, and repeatedly calls in to the metadata
679 * module to fetch one object name at a time.
683 query_iterator (void *ctx
, enum MHD_ValueKind kind
, const char *key
,
684 const char *filename
, const char *content_type
,
685 const char *transfer_encoding
, const char *data
,
686 uint64_t off
, size_t size
)
693 (void)transfer_encoding
;
698 /* We actually accumulate the data in proxy_query. */
702 /* MHD reader function during queries. Return -1 for EOF. */
704 proxy_query_func (void *ctx
, uint64_t pos
, char *buf
, int max
)
708 const char *accept_hdr
;
714 accept_hdr
= MHD_lookup_connection_value(ms
->conn
,MHD_HEADER_KIND
,
718 ms
->gen_ctx
= tmpl_get_ctx(accept_hdr
);
722 ms
->cleanup
|= CLEANUP_TMPL
;
723 len
= tmpl_obj_header(ms
->gen_ctx
);
730 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
734 if (ms
->gen_ctx
== TMPL_CTX_DONE
) {
739 if (!meta_query_next(ms
->query
,&bucket
,&key
)) {
742 if (is_reserved(key
,reserved_name
)) {
745 len
= tmpl_obj_entry(ms
->gen_ctx
,bucket
,key
);
752 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
756 len
= tmpl_obj_footer(ms
->gen_ctx
);
763 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
765 ms
->cleanup
&= ~CLEANUP_TMPL
;
766 ms
->gen_ctx
= TMPL_CTX_DONE
;
771 proxy_query (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
772 const char *method
, const char *version
, const char *data
,
773 size_t *data_size
, void **rctx
)
775 struct MHD_Response
*resp
;
776 my_state
*ms
= *rctx
;
782 DPRINTF("PROXY QUERY %s (%zu)\n",url
,*data_size
);
784 if (ms
->state
== MS_NEW
) {
785 ms
->state
= MS_NORMAL
;
786 ms
->post
= MHD_create_post_processor(conn
,4096,
788 ms
->cleanup
|= CLEANUP_POST
;
790 else if (*data_size
) {
791 MHD_post_process(ms
->post
,data
,*data_size
);
792 if (ms
->pipe
.data_len
) {
793 ms
->pipe
.data_len
+= *data_size
;
794 char *p
= realloc(ms
->pipe
.data_ptr
,ms
->pipe
.data_len
);
798 ms
->pipe
.data_ptr
= p
;
801 ms
->pipe
.data_len
= *data_size
+ 1;
802 ms
->pipe
.data_ptr
= malloc(ms
->pipe
.data_len
);
803 if (!ms
->pipe
.data_ptr
) {
806 ((char *)ms
->pipe
.data_ptr
)[0] = '\0';
807 ms
->cleanup
|= CLEANUP_BUF_PTR
;
809 (void)strncat(ms
->pipe
.data_ptr
,data
,*data_size
);
810 /* TBD: check return value */
814 if (!ms
->pipe
.data_ptr
) {
817 ms
->query
= meta_query_new(ms
->bucket
,NULL
,ms
->pipe
.data_ptr
);
818 ms
->cleanup
|= CLEANUP_QUERY
;
819 resp
= MHD_create_response_from_callback(MHD_SIZE_UNKNOWN
,
820 65536, proxy_query_func
, ms
, simple_closer
);
822 fprintf(stderr
,"MHD_crfc failed\n");
826 MHD_add_response_header(resp
,"Content-Type","text/xml");
827 MHD_queue_response(conn
,MHD_HTTP_OK
,resp
);
828 MHD_destroy_response(resp
);
836 proxy_list_objs (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
837 const char *method
, const char *version
, const char *data
,
838 size_t *data_size
, void **rctx
)
840 my_state
*ms
= *rctx
;
841 struct MHD_Response
*resp
;
850 ms
->query
= meta_query_new((char *)ms
->bucket
,NULL
,NULL
);
851 ms
->cleanup
|= CLEANUP_QUERY
;
853 resp
= MHD_create_response_from_callback(MHD_SIZE_UNKNOWN
,
854 65536, proxy_query_func
, ms
, simple_closer
);
856 fprintf(stderr
,"MHD_crfc failed\n");
861 MHD_queue_response(conn
,MHD_HTTP_OK
,resp
);
862 MHD_destroy_response(resp
);
867 proxy_delete (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
868 const char *method
, const char *version
, const char *data
,
869 size_t *data_size
, void **rctx
)
871 my_state
*ms
= *rctx
;
872 struct MHD_Response
*resp
;
885 DPRINTF("PROXY DELETE %s\n",url
);
887 rc
= main_func_tbl
->delete_func(ms
->bucket
,ms
->key
,(char *)url
);
892 copied_url
= strdup(url
);
894 bucket
= strtok_r(copied_url
,"/",&stctx
);
895 key
= strtok_r(NULL
,"/",&stctx
);
896 meta_delete(bucket
,key
);
899 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
903 MHD_add_response_header(resp
,"Content-Type","text/xml");
904 MHD_queue_response(conn
,MHD_HTTP_OK
,resp
);
905 MHD_destroy_response(resp
);
907 replicate_delete((char *)url
);
912 /* TBD: get actual bucket list */
918 fake_bucket_t fake_bucket_list
[] = {
919 { "bucket_factory", "_new" },
920 { "provider_list", "_providers" },
924 root_blob_generator (void *ctx
, uint64_t pos
, char *buf
, int max
)
929 const char *accept_hdr
;
936 accept_hdr
= MHD_lookup_connection_value(ms
->conn
,MHD_HEADER_KIND
,
938 host
= MHD_lookup_connection_value(ms
->conn
,MHD_HEADER_KIND
,"Host");
941 ms
->gen_ctx
= tmpl_get_ctx(accept_hdr
);
945 ms
->cleanup
|= CLEANUP_TMPL
;
946 ms
->gen_ctx
->base
= host
;
947 len
= tmpl_root_header(ms
->gen_ctx
,"image_warehouse","1.0");
954 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
958 if (ms
->gen_ctx
== TMPL_CTX_DONE
) {
962 if (ms
->gen_ctx
->index
< ARRAY_SIZE(fake_bucket_list
)) {
963 fb
= fake_bucket_list
+ ms
->gen_ctx
->index
;
964 len
= tmpl_root_entry(ms
->gen_ctx
,fb
->rel
,fb
->link
);
971 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
975 if (meta_query_next(ms
->query
,&bucket
,&key
)) {
976 len
= tmpl_root_entry(ms
->gen_ctx
,"bucket",bucket
);
983 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
987 len
= tmpl_root_footer(ms
->gen_ctx
);
994 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
996 ms
->cleanup
&= ~CLEANUP_TMPL
;
997 ms
->gen_ctx
= TMPL_CTX_DONE
;
1002 proxy_api_root (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1003 const char *method
, const char *version
, const char *data
,
1004 size_t *data_size
, void **rctx
)
1006 struct MHD_Response
*resp
= NULL
;
1007 unsigned int rc
= MHD_HTTP_OK
;
1008 my_state
*ms
= *rctx
;
1015 DPRINTF("PROXY API ROOT (%s, %zu)\n",url
,*data_size
);
1017 ms
->query
= meta_query_new(NULL
,"_default",NULL
);
1022 ms
->cleanup
|= CLEANUP_QUERY
;
1024 resp
= MHD_create_response_from_callback(MHD_SIZE_UNKNOWN
,
1025 65536, root_blob_generator
, ms
, simple_closer
);
1029 MHD_add_response_header(resp
,"Content-Type","text/xml");
1030 MHD_queue_response(conn
,rc
,resp
);
1031 MHD_destroy_response(resp
);
1038 post_iterator (void *ctx
, enum MHD_ValueKind kind
, const char *key
,
1039 const char *filename
, const char *content_type
,
1040 const char *transfer_encoding
, const char *data
,
1041 uint64_t off
, size_t size
)
1050 (void)transfer_encoding
;
1053 printf("adding %s, size=%zu\n",key
,size
);
1055 // TBD: don't assume that values are null-terminated strings
1056 old_val
= g_hash_table_lookup(ctx
,key
);
1058 old_len
= strlen(old_val
);
1059 new_val
= malloc(old_len
+size
+1);
1063 memcpy(new_val
,old_val
,old_len
);
1064 memcpy(new_val
+old_len
,data
,size
);
1065 new_val
[old_len
+size
] = '\0';
1068 new_val
= malloc(size
+1);
1072 memcpy(new_val
,data
,size
);
1073 new_val
[size
] = '\0';
1076 g_hash_table_insert(ctx
,strdup(key
),new_val
);
1077 /* TBD: check return value for strdups (none avail for insert) */
1081 /* Returns TRUE if we found an *invalid* key. */
1083 post_find (gpointer key
, gpointer value
, gpointer ctx
)
1088 if (!is_reserved(key
,reserved_attr
)) {
1092 DPRINTF("bad attr %s\n", (char *)key
);
1097 post_foreach (gpointer key
, gpointer value
, gpointer ctx
)
1101 DPRINTF("setting %s = %s for %s/%s\n",(char *)key
, (char *)value
,
1102 ms
->bucket
,ms
->key
);
1103 meta_set_value(ms
->bucket
,ms
->key
,key
,value
);
1107 create_bucket (char *name
)
1111 if (is_reserved(name
,reserved_name
)) {
1112 return MHD_HTTP_BAD_REQUEST
;
1115 rc
= main_func_tbl
->bcreate_func(name
);
1116 if (rc
== MHD_HTTP_OK
) {
1117 if (meta_set_value(name
,"_default", "_policy","0") != 0) {
1118 DPRINTF("default-policy " "create failed\n");
1121 DPRINTF("created bucket %s\n",name
);
1123 * There's not a whole lot to do about bucket-creation
1124 * failures on replicas, other than to report them, unless
1125 * we adopt an "all or nothing" approach and unwind the
1126 * create on the primary as well. Then what if that fails?
1127 * It's just one example of the general "fewer replicas
1128 * than desired" distributed-system problem, not worth a
1129 * point solution here/now. Revisit when we have a more
1130 * general replica-repair policy/system in place.
1132 replicate_bcreate(name
);
1139 proxy_bucket_post (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1140 const char *method
, const char *version
, const char *data
,
1141 size_t *data_size
, void **rctx
)
1143 struct MHD_Response
*resp
;
1144 my_state
*ms
= *rctx
;
1152 DPRINTF("PROXY POST (%s, %zu)\n",url
,*data_size
);
1154 if (ms
->state
== MS_NEW
) {
1155 ms
->state
= MS_NORMAL
;
1156 ms
->url
= (char *)url
;
1157 ms
->dict
= g_hash_table_new_full(
1158 g_str_hash
,g_str_equal
,free
,free
);
1159 ms
->cleanup
|= CLEANUP_DICT
;
1160 ms
->post
= MHD_create_post_processor(conn
,4096,
1161 post_iterator
,ms
->dict
);
1162 ms
->cleanup
|= CLEANUP_POST
;
1164 else if (*data_size
) {
1165 MHD_post_process(ms
->post
,data
,*data_size
);
1169 rc
= MHD_HTTP_BAD_REQUEST
;
1170 key
= g_hash_table_lookup(ms
->dict
,"key");
1172 strncpy(ms
->key
,key
,MAX_FIELD_LEN
-1);
1173 g_hash_table_remove(ms
->dict
,"key");
1174 if (!g_hash_table_find(ms
->dict
,post_find
,ms
)) {
1175 g_hash_table_foreach(ms
->dict
,post_foreach
,ms
);
1176 DPRINTF("rereplicate (bucket POST)\n");
1177 recheck_replication(ms
,NULL
);
1181 else if (!strcmp(ms
->bucket
,"_new")) {
1182 key
= g_hash_table_lookup(ms
->dict
,"name");
1184 rc
= create_bucket(key
);
1188 DPRINTF("A parameter is MISSING (fail)\n");
1190 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1192 fprintf(stderr
,"MHD_crfd failed\n");
1195 MHD_queue_response(conn
,rc
,resp
);
1196 MHD_destroy_response(resp
);
1204 check_location (my_state
*ms
)
1206 char *loc
= g_hash_table_lookup(ms
->dict
,"depot");
1209 DPRINTF("missing loc on check for %s/%s\n",ms
->bucket
,ms
->key
);
1210 return MHD_HTTP_BAD_REQUEST
;
1213 if (!meta_has_copy(ms
->bucket
,ms
->key
,loc
)) {
1214 DPRINTF("did not find %s/%s at %s\n",ms
->bucket
,ms
->key
,loc
);
1215 return MHD_HTTP_NOT_FOUND
;
1218 /* TBD: meta_has_copy returns an etag which we should check */
1219 DPRINTF("found %s/%s at %s\n",ms
->bucket
,ms
->key
,loc
);
1224 register_image (my_state
*ms
)
1231 site
= g_hash_table_lookup(ms
->dict
,"site");
1233 printf("site MISSING\n");
1234 return MHD_HTTP_BAD_REQUEST
;
1237 next
= index(site
,':');
1242 for (i
= 0; get_provider(i
,&prov
); ++i
) {
1243 if (strcmp(prov
.name
,site
)) {
1246 return prov
.func_tbl
->register_func(ms
,&prov
,next
,ms
->dict
);
1249 DPRINTF("site %s not found\n",site
);
1250 return MHD_HTTP_BAD_REQUEST
;
1254 proxy_object_post (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1255 const char *method
, const char *version
, const char *data
,
1256 size_t *data_size
, void **rctx
)
1258 struct MHD_Response
*resp
;
1259 my_state
*ms
= *rctx
;
1267 DPRINTF("PROXY POST (%s, %zu)\n",url
,*data_size
);
1269 if (ms
->state
== MS_NEW
) {
1270 ms
->state
= MS_NORMAL
;
1271 ms
->url
= (char *)url
;
1272 ms
->dict
= g_hash_table_new_full(
1273 g_str_hash
,g_str_equal
,free
,free
);
1274 ms
->cleanup
|= CLEANUP_DICT
;
1275 ms
->post
= MHD_create_post_processor(conn
,4096,
1276 post_iterator
,ms
->dict
);
1277 ms
->cleanup
|= CLEANUP_POST
;
1279 else if (*data_size
) {
1280 MHD_post_process(ms
->post
,data
,*data_size
);
1284 rc
= MHD_HTTP_BAD_REQUEST
;
1285 if (!g_hash_table_find(ms
->dict
,post_find
,ms
)) {
1286 op
= g_hash_table_lookup(ms
->dict
,"op");
1288 if (!strcmp(op
,"push")) {
1289 DPRINTF("rereplicate (obj POST)\n");
1290 recheck_replication(ms
,NULL
);
1293 else if (!strcmp(op
,"check")) {
1294 rc
= check_location(ms
);
1296 else if (!strcmp(op
,"register")) {
1297 rc
= register_image(ms
);
1300 DPRINTF("unknown op %s for %s/%s\n",
1301 op
, ms
->bucket
, ms
->key
);
1305 DPRINTF("op is MISSING (fail)\n");
1308 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1310 fprintf(stderr
,"MHD_crfd failed\n");
1313 MHD_queue_response(conn
,rc
,resp
);
1314 MHD_destroy_response(resp
);
1324 prov_list_generator (void *ctx
, uint64_t pos
, char *buf
, int max
)
1329 const char *accept_hdr
;
1333 accept_hdr
= MHD_lookup_connection_value(ms
->conn
,MHD_HEADER_KIND
,
1337 ms
->gen_ctx
= tmpl_get_ctx(accept_hdr
);
1341 ms
->cleanup
|= CLEANUP_TMPL
;
1342 len
= tmpl_prov_header(ms
->gen_ctx
);
1349 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
1353 if (ms
->gen_ctx
== TMPL_CTX_DONE
) {
1357 if (get_provider(ms
->gen_ctx
->index
,&prov
)) {
1358 len
= tmpl_prov_entry(ms
->gen_ctx
,prov
.name
,prov
.type
,
1359 prov
.host
, prov
.port
, prov
.username
, prov
.password
);
1366 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
1370 len
= tmpl_prov_footer(ms
->gen_ctx
);
1377 memcpy(buf
,ms
->gen_ctx
->buf
,len
);
1379 ms
->cleanup
&= ~CLEANUP_TMPL
;
1380 ms
->gen_ctx
= TMPL_CTX_DONE
;
1385 proxy_list_provs (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1386 const char *method
, const char *version
, const char *data
,
1387 size_t *data_size
, void **rctx
)
1389 struct MHD_Response
*resp
;
1390 my_state
*ms
= *rctx
;
1399 resp
= MHD_create_response_from_callback(MHD_SIZE_UNKNOWN
,
1400 65536, prov_list_generator
, ms
, simple_closer
);
1402 fprintf(stderr
,"MHD_crfd failed\n");
1406 MHD_queue_response(conn
,MHD_HTTP_OK
,resp
);
1407 MHD_destroy_response(resp
);
1413 prov_iterator (void *ctx
, enum MHD_ValueKind kind
, const char *key
,
1414 const char *filename
, const char *content_type
,
1415 const char *transfer_encoding
, const char *data
,
1416 uint64_t off
, size_t size
)
1421 (void)transfer_encoding
;
1424 g_hash_table_insert(ctx
,strdup(key
),strndup(data
,size
));
1425 /* TBD: check return value for strdups (none avail for insert) */
1431 proxy_update_prov (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1432 const char *method
, const char *version
, const char *data
,
1433 size_t *data_size
, void **rctx
)
1435 struct MHD_Response
*resp
;
1436 my_state
*ms
= *rctx
;
1446 if (ms
->state
== MS_NEW
) {
1447 ms
->state
= MS_NORMAL
;
1448 ms
->url
= (char *)url
;
1449 ms
->dict
= g_hash_table_new_full(
1450 g_str_hash
,g_str_equal
,free
,free
);
1451 ms
->cleanup
|= CLEANUP_DICT
;
1452 ms
->post
= MHD_create_post_processor(conn
,4096,
1453 prov_iterator
,ms
->dict
);
1454 ms
->cleanup
|= CLEANUP_POST
;
1456 else if (*data_size
) {
1457 MHD_post_process(ms
->post
,data
,*data_size
);
1461 rc
= MHD_HTTP_BAD_REQUEST
;
1462 provider
= g_hash_table_lookup(ms
->dict
,"provider");
1463 username
= g_hash_table_lookup(ms
->dict
,"username");
1464 password
= g_hash_table_lookup(ms
->dict
,"password");
1465 if (provider
&& username
&& password
) {
1466 update_provider(provider
,username
,password
);
1470 DPRINTF("provider/username/password MISSING\n");
1472 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1474 fprintf(stderr
,"MHD_crfd failed\n");
1477 MHD_queue_response(conn
,rc
,resp
);
1478 MHD_destroy_response(resp
);
1486 proxy_create_bucket (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1487 const char *method
, const char *version
, const char *data
,
1488 size_t *data_size
, void **rctx
)
1490 struct MHD_Response
*resp
;
1491 my_state
*ms
= *rctx
;
1501 /* curl -T moo.empty http://localhost:9090/_new by accident */
1502 rc
= create_bucket(ms
->bucket
);
1504 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1506 fprintf(stderr
,"MHD_crfd failed\n");
1509 MHD_queue_response(conn
,rc
,resp
);
1510 MHD_destroy_response(resp
);
1517 { /* get bucket list */
1518 "GET", URL_ROOT
, proxy_api_root
},
1519 { /* get object list */
1520 "GET", URL_BUCKET
, proxy_list_objs
},
1521 { /* create bucket */
1522 "PUT", URL_BUCKET
, proxy_create_bucket
},
1523 { /* get object data */
1524 "GET", URL_OBJECT
, proxy_get_data
},
1525 { /* get attribute data */
1526 "GET", URL_ATTR
, proxy_get_attr
},
1527 { /* put object data */
1528 "PUT", URL_OBJECT
, proxy_put_data
},
1529 { /* put attribute data */
1530 "PUT", URL_ATTR
, proxy_put_attr
},
1531 { /* create object and/or modify attributes */
1532 "POST", URL_BUCKET
, proxy_bucket_post
},
1533 { /* perform control operations on an object */
1534 "POST", URL_OBJECT
, proxy_object_post
},
1536 "POST", URL_QUERY
, proxy_query
},
1537 { /* delete object */
1538 "DELETE", URL_OBJECT
, proxy_delete
},
1539 { /* delete attribute (TBD) */
1540 "DELETE", URL_ATTR
, NULL
},
1541 { /* get provider list */
1542 "GET", URL_PROVLIST
, proxy_list_provs
},
1543 { /* update a provider */
1544 "POST", URL_PROVLIST
, proxy_update_prov
},
1549 parse_url (const char *url
, my_state
*ms
)
1551 unsigned short esize
;
1552 unsigned short eindex
;
1553 char *parts
[URL_INVAL
];
1555 if (strstr(url
,"../")) {
1556 /* Naughty, naughty. Never a good reason to allow this. */
1557 DPRINTF("Rejecting ../ in path.\n");
1562 parts
[URL_BUCKET
] = ms
->bucket
;
1563 parts
[URL_OBJECT
] = ms
->key
;
1564 parts
[URL_ATTR
] = ms
->attr
;
1567 while (*url
== '/') {
1572 if (eindex
== URL_BUCKET
) {
1573 if (!strcmp(ms
->bucket
,"_providers")) {
1574 eindex
= URL_PROVLIST
;
1577 else if (eindex
== URL_OBJECT
) {
1578 if (!strcmp(ms
->key
,"_query")) {
1585 if (++eindex
>= URL_INVAL
) {
1590 while (*url
&& (*url
!= '/')) {
1591 parts
[eindex
][esize
++] = *(url
++);
1592 if (esize
>= MAX_FIELD_LEN
) {
1602 access_handler (void *cctx
, struct MHD_Connection
*conn
, const char *url
,
1603 const char *method
, const char *version
, const char *data
,
1604 size_t *data_size
, void **rctx
)
1608 struct MHD_Response
*resp
;
1609 my_state
*ms
= *rctx
;
1612 return ms
->handler(cctx
,conn
,url
,method
,version
,
1613 data
,data_size
,rctx
);
1616 ms
= calloc(sizeof(*ms
), 1);
1621 utype
= parse_url(url
,ms
);
1623 for (i
= 0; my_rules
[i
].method
; ++i
) {
1624 if (utype
!= my_rules
[i
].utype
) {
1627 if (strcmp(method
,my_rules
[i
].method
)) {
1630 if (!my_rules
[i
].handler
) {
1633 ms
->handler
= my_rules
[i
].handler
;
1640 return ms
->handler(cctx
,conn
,url
,method
,version
,
1641 data
,data_size
,rctx
);
1644 if (!strcmp(method
,"QUIT")) {
1645 (void)sem_post((sem_t
*)cctx
);
1649 fprintf(stderr
,"bad request m=%s u=%s\n",method
,url
);
1652 resp
= MHD_create_response_from_data(0,NULL
,MHD_NO
,MHD_NO
);
1656 MHD_queue_response(conn
,MHD_HTTP_NOT_FOUND
,resp
);
1657 MHD_destroy_response(resp
);
1661 /* These enum values cannot possibly conflict with the option values
1662 ordinarily used by commands, including CHAR_MAX + 1, etc. Avoid
1663 CHAR_MIN - 1, as it may equal -1, the getopt end-of-options value. */
1666 GETOPT_HELP_CHAR
= (CHAR_MIN
- 2),
1667 GETOPT_VERSION_CHAR
= (CHAR_MIN
- 3)
1670 struct option my_options
[] = {
1671 { "config", required_argument
, NULL
, 'c' },
1672 { "db", required_argument
, NULL
, 'd' },
1673 { "master", required_argument
, NULL
, 'm' },
1674 { "port", required_argument
, NULL
, 'p' },
1675 { "verbose", no_argument
, NULL
, 'v' },
1676 { "version", no_argument
, NULL
, GETOPT_VERSION_CHAR
},
1677 { "help", no_argument
, NULL
, GETOPT_HELP_CHAR
},
1678 { NULL
, 0, NULL
, '\0' }
1684 if (status
!= EXIT_SUCCESS
)
1685 fprintf (stderr
, _("Try `%s --help' for more information.\n"),
1690 Usage: %s [OPTION]\n\
1694 Deltacloud image-warehouse daemon.\n\
1695 A configuration file must be specified.\n\
1697 -c, --config=FILE config file [required]\n\
1698 -d, --db=HOST_PORT database server as ip[:port]\n\
1699 -m, --master=HOST_PORT master (upstream) server as ip[:port]\n\
1700 -p, --port=PORT alternate listen port (default 9090)\n\
1701 -v, --verbose verbose/debug output\n\
1703 --help display this help and exit\n\
1704 --version output version information and exit\n\
1708 Report %s bugs to %s.\n\
1710 program_name
, PACKAGE_BUGREPORT
);
1716 main (int argc
, char **argv
)
1718 struct MHD_Daemon
*the_daemon
;
1723 program_name
= argv
[0];
1725 for (;;) switch (getopt_long(argc
,argv
,"c:d:m:p:v",my_options
,NULL
)) {
1731 db_host
= strtok_r(optarg
,":",&stctx
);
1732 port_tmp
= strtok_r(NULL
,":",&stctx
);
1734 db_port
= (unsigned short)strtoul(port_tmp
,NULL
,10);
1739 master_host
= strtok_r(optarg
,":",&stctx
);
1740 port_tmp
= strtok_r(NULL
,":",&stctx
);
1742 master_port
= (unsigned short)strtoul(port_tmp
,NULL
,10);
1746 my_port
= (unsigned short)strtoul(optarg
,NULL
,10);
1751 case GETOPT_HELP_CHAR
:
1752 usage(EXIT_SUCCESS
);
1754 case GETOPT_VERSION_CHAR
:
1755 printf ("%s version %s\n", program_name
, PACKAGE_VERSION
);
1756 exit (EXIT_SUCCESS
);
1762 usage(EXIT_FAILURE
);
1768 error (0, 0, "no configuration file specified");
1769 usage (EXIT_FAILURE
);
1772 me
= parse_config();
1774 fprintf(stderr
,"could not parse %s\n",cfg_file
);
1778 sem_init(&the_sem
,0,0);
1781 main_func_tbl
= &s3_func_tbl
;
1784 main_func_tbl
= &curl_func_tbl
;
1788 main_func_tbl
= &fs_func_tbl
;
1792 printf("primary store type is %s\n",main_func_tbl
->name
);
1794 printf("operating as slave to %s:%u\n",
1795 master_host
, master_port
);
1797 printf("db is at %s:%u\n",db_host
,db_port
);
1798 printf("will listen on port %u\n",my_port
);
1799 printf("my location is \"%s\"\n",me
);
1804 main_func_tbl
->init_func();
1809 * Gotcha: if we don't set the connection memory limit explicitly,
1810 * the per-connection buffer for MHD will be smaller than that used
1811 * by CURL, so proxy_writefunc will never be able to do its job.
1813 the_daemon
= MHD_start_daemon(MY_MHD_FLAGS
,
1814 my_port
, NULL
, NULL
, &access_handler
, &the_sem
,
1815 MHD_OPTION_CONNECTION_MEMORY_LIMIT
, (size_t)1048576,
1818 fprintf(stderr
,"Could not create daemon.\n");