parser: move x*alloc functions into #if-unit-test block where used
[iwhd.git] / rest.c
blob8f42be3d4c3f00e6ea1058ff061eba016876473b
1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include <error.h>
19 #include <fcntl.h>
20 #include <getopt.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <semaphore.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <strings.h>
29 #include <unistd.h>
30 #include <sys/stat.h>
31 #include <assert.h>
33 #include <microhttpd.h>
34 #include <hstor.h> /* only for ARRAY_SIZE at this point */
35 #include <curl/curl.h>
36 #include <glib.h>
38 #include "iwh.h"
39 #include "meta.h"
40 #include "backend.h"
41 #include "setup.h"
42 #include "replica.h"
43 #include "template.h"
44 #include "mpipe.h"
45 #include "state_defs.h"
47 /* Define-away for now. Eventually, define to gettext. */
48 #define _(msgid) (msgid)
50 #if defined(DEBUG)
51 #define MY_MHD_FLAGS MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG
52 //#define MY_MHD_FLAGS MHD_USE_SELECT_INTERNALLY | MHD_USE_DEBUG
53 #else
54 #define MY_MHD_FLAGS MHD_USE_THREAD_PER_CONNECTION
55 #endif
57 typedef enum {
58 URL_ROOT=0, URL_BUCKET, URL_OBJECT, URL_ATTR, URL_INVAL,
59 URL_QUERY, URL_PROVLIST
60 } url_type;
62 typedef struct {
63 char *method;
64 url_type utype;
65 MHD_AccessHandlerCallback handler;
66 } rule;
68 static unsigned short my_port = MY_PORT;
69 const char *program_name;
70 char *cfg_file = NULL;
72 static char *(reserved_name[]) = { "_default", "_query", "_new", NULL };
73 static char *(reserved_attr[]) = { "bucket", "key", "date", "etag", "loc", NULL };
75 void
76 free_ms (my_state *ms)
78 if (!g_atomic_int_dec_and_test(&ms->refcnt)) {
79 return;
82 if (ms->cleanup & CLEANUP_BUF_PTR) {
83 free(ms->pipe.data_ptr);
86 if (ms->cleanup & CLEANUP_POST) {
87 MHD_destroy_post_processor(ms->post);
90 if (ms->cleanup & CLEANUP_DICT) {
91 g_hash_table_destroy(ms->dict);
94 if (ms->cleanup & CLEANUP_QUERY) {
95 meta_query_stop(ms->query);
98 if (ms->cleanup & CLEANUP_TMPL) {
99 free(ms->gen_ctx);
102 if (ms->cleanup & CLEANUP_URL) {
103 free(ms->url);
106 if (ms->cleanup & CLEANUP_AQUERY) {
107 meta_attr_stop(ms->aquery);
110 free(ms);
113 static int
114 validate_put (struct MHD_Connection *conn)
116 const char *mhdr;
118 if (!master_host) {
119 /* We're not a slave, so we don't care. */
120 return 1;
123 mhdr = MHD_lookup_connection_value(conn,MHD_HEADER_KIND,
124 "X-redhat-role");
125 return (mhdr && !strcmp(mhdr,"master"));
128 static int
129 is_reserved (const char *cand, char **resv_list)
131 int i;
133 for (i = 0; resv_list[i]; ++i) {
134 if (!strcmp(cand,resv_list[i])) {
135 return TRUE;
139 return FALSE;
142 static int
143 validate_url (const char *url)
145 char *slash = rindex(url,'/');
147 if (!slash) {
148 /* There should be at least one betwixt bucket and key. */
149 return 0;
152 return !is_reserved(slash+1,reserved_name);
155 /**********
156 * The proxy has MHD on one side and CURL on the other. The CURL side is
157 * always run in a child thread. Yes, there are both context switches
158 * and copies between the threads. Get over it. The focus here is on
159 * supporting multi-way replication on PUT, with minimal complexity. These
160 * were the two easiest libraries to use, and they both want to allocate
161 * their own buffers so we're kind of stuck with the copies unless we want
162 * to buffer whole files in memory (which would have to be virtual since
163 * they're potentialy bigger than physical) or explicitly ping them through
164 * a local filesystem. We could potentially take over scheduling from one
165 * or both to avoid some of the context switching, but those interfaces are
166 * significantly more error-prone and (at least in CURL's case) buggy.
168 * For a GET, the CURL child acts as a producer while the MHD parent acts
169 * as consumer. For a PUT, the MHD parent is the producer and the CURL
170 * child is the consumer. For GET the MHD component is invoked via a
171 * callback set up in the access handler; for PUT it's invoked via repeated
172 * calls to the access handler itself. Either way, the producer's job is
173 * to post its pointer+length to the my_state structure and then wait for
174 * all consumers to check back in saying they've finished it. This might
175 * involve multiple passes through each consumer for one pass through the
176 * single producer. When the producer is done, it does a similar handshake
177 * with the consumers. Each consumer has its own pipe_private structure,
178 * containing a pointer to the shared my_state plus a per-consumer offset
179 * into the current chunk.
181 * Attribute functions don't use CURL, so they do much simpler in-memory
182 * buffering. Queries also don't use CURL, but the MHD POST interface
183 * introduces some of its own complexity so see below for that.
184 **********/
186 static void
187 simple_closer (void *ctx)
189 my_state *ms = ctx;
191 DPRINTF("%s: cleaning up\n",__func__);
192 free_ms(ms);
195 static void
196 child_closer (void * ctx)
198 pipe_private *pp = ctx;
200 DPRINTF("in %s\n",__func__);
202 free(pp);
205 /* Invoked from MHD. */
206 static int
207 proxy_get_cons (void *ctx, uint64_t pos, char *buf, int max)
209 pipe_private *pp = ctx;
210 pipe_shared *ps = pp->shared;
211 my_state *ms = ps->owner;
212 int done;
213 void *child_res;
215 (void)pos;
217 DPRINTF("consumer asked to read %d\n",max);
219 if (pipe_cons_wait(pp)) {
220 DPRINTF("consumer offset %zu into %zu\n",
221 pp->offset, ps->data_len);
222 done = ps->data_len - pp->offset;
223 if (done > max) {
224 done = max;
226 memcpy(buf,ps->data_ptr+pp->offset,done);
227 pp->offset += done;
228 DPRINTF("consumer copied %d, new offset %zu\n",
229 done, pp->offset);
230 if (pp->offset == ps->data_len) {
231 DPRINTF("consumer finished chunk\n");
232 pipe_cons_signal(pp, 0);
235 else {
236 done = -1;
239 if (done == (-1)) {
240 child_res = NULL;
241 pthread_join(ms->backend_th,&child_res);
242 if (child_res == THREAD_FAILED) {
243 DPRINTF("GET producer failed\n");
244 /* Nothing we can do; already sent status. */
246 if (ms->from_master) {
247 pthread_join(ms->cache_th,NULL);
248 /* TBD: do something about cache failure? */
250 free_ms(ms);
253 return done;
256 static int
257 proxy_get_data (void *cctx, struct MHD_Connection *conn, const char *url,
258 const char *method, const char *version, const char *data,
259 size_t *data_size, void **rctx)
261 struct MHD_Response *resp;
262 my_state *ms = *rctx;
263 pipe_private *pp;
264 pipe_private *pp2;
265 char *my_etag;
266 const char *user_etag;
267 int rc;
269 (void)cctx;
270 (void)method;
271 (void)version;
272 (void)data;
273 (void)data_size;
275 DPRINTF("PROXY GET DATA %s\n",url);
277 ms->url = strdup(url);
278 if (!ms->url) {
279 return MHD_NO;
281 ms->cleanup |= CLEANUP_URL;
283 my_etag = meta_has_copy(ms->bucket,ms->key,me);
284 if (!my_etag) {
285 DPRINTF("falling back to local for %s/%s\n",ms->bucket,ms->key);
286 ms->from_master = 0;
288 else if (*my_etag) {
289 user_etag = MHD_lookup_connection_value(
290 conn, MHD_HEADER_KIND, "If-None-Match");
291 if (user_etag && !strcmp(user_etag,my_etag)) {
292 DPRINTF("ETag match!\n");
293 free(my_etag);
294 resp = MHD_create_response_from_data(0,NULL,
295 MHD_NO,MHD_NO);
296 MHD_queue_response(conn,MHD_HTTP_NOT_MODIFIED,resp);
297 MHD_destroy_response(resp);
298 return MHD_YES;
300 free(my_etag);
301 ms->from_master = 0;
303 else {
304 DPRINTF("%s/%s not found locally\n",ms->bucket,ms->key);
305 if (!master_host) {
306 DPRINTF(" that means it doesn't exist\n");
307 resp = MHD_create_response_from_data(0,NULL,
308 MHD_NO,MHD_NO);
309 MHD_queue_response(conn,MHD_HTTP_NOT_FOUND,resp);
310 MHD_destroy_response(resp);
311 free_ms(ms);
312 return MHD_YES;
314 DPRINTF(" will fetch from %s:%u\n", master_host,master_port);
315 ms->from_master = 1;
318 pipe_init_shared(&ms->pipe,ms,ms->from_master+1);
319 pp = pipe_init_private(&ms->pipe);
320 if (!pp) {
321 return MHD_NO;
323 ms->thunk.parent = ms;
324 ms->thunk.prov = ms->from_master ? master_prov : main_prov;
325 pthread_create(&ms->backend_th,NULL,
326 ms->thunk.prov->func_tbl->get_child_func,&ms->thunk);
327 /* TBD: check return value */
329 if (ms->from_master) {
330 pp2 = pipe_init_private(&ms->pipe);
331 if (!pp2) {
332 return MHD_NO;
334 pp2->prov = main_prov;
335 pthread_create(&ms->cache_th,NULL,
336 main_prov->func_tbl->cache_child_func,pp2);
337 /* TBD: check return value */
339 else {
340 pp2 = NULL;
343 rc = pipe_cons_wait_init(&ms->pipe);
344 ms->rc = (rc == 0) ? MHD_HTTP_OK : MHD_HTTP_INTERNAL_SERVER_ERROR;
346 resp = MHD_create_response_from_callback(
347 MHD_SIZE_UNKNOWN, 65536, proxy_get_cons, pp, child_closer);
348 if (!resp) {
349 fprintf(stderr,"MHD_crfc failed\n");
350 if (pp2) {
351 /* TBD: terminate thread */
352 free(pp2);
354 child_closer(pp);
355 return MHD_NO;
357 MHD_queue_response(conn,ms->rc,resp);
358 MHD_destroy_response(resp);
360 return MHD_YES;
363 static void
364 recheck_replication (my_state * ms, char *policy)
366 int rc;
367 int free_it = FALSE;
368 char fixed[MAX_FIELD_LEN];
370 if (is_reserved(ms->key,reserved_name)) {
371 DPRINTF("declining to replicate reserved object %s\n",ms->key);
372 return;
375 if (!policy && ms->dict) {
376 DPRINTF("using new policy for %s/%s\n",ms->bucket,ms->key);
377 policy = g_hash_table_lookup(ms->dict,"_policy");
380 if (!policy) {
381 /* If we get a policy here or below, we have to free it. */
382 free_it = TRUE;
383 DPRINTF("fetching policy for %s/%s\n",ms->bucket,ms->key);
384 rc = meta_get_value(ms->bucket,ms->key, "_policy", &policy);
387 if (!policy) {
388 DPRINTF(" inheriting policy from %s\n",ms->bucket);
389 rc = meta_get_value(ms->bucket,
390 "_default", "_policy", &policy);
393 if (policy) {
394 DPRINTF(" implementing policy %s\n",policy);
396 * Can't use ms->url here because it might be a bucket POST
397 * and in that case ms->url points to the bucket.
399 snprintf(fixed,sizeof(fixed),"%s/%s",ms->bucket,ms->key);
400 replicate(fixed,0,policy,ms);
401 if (free_it) {
402 free(policy);
405 else {
406 DPRINTF(" could not find a policy anywhere!\n");
410 static int
411 proxy_put_data (void *cctx, struct MHD_Connection *conn, const char *url,
412 const char *method, const char *version, const char *data,
413 size_t *data_size, void **rctx)
415 struct MHD_Response *resp;
416 my_state *ms = *rctx;
417 pipe_private *pp;
418 int rc;
419 char *etag = NULL;
420 void *child_res;
422 (void)cctx;
423 (void)method;
424 (void)version;
426 DPRINTF("PROXY PUT DATA %s (%zu)\n",url,*data_size);
428 if (ms->state == MS_NEW) {
429 if (!validate_put(conn) || !validate_url(url)) {
430 DPRINTF("rejecting %s\n",url);
431 resp = MHD_create_response_from_data(0,NULL,
432 MHD_NO,MHD_NO);
433 if (!resp) {
434 return MHD_NO;
436 MHD_queue_response(conn,MHD_HTTP_FORBIDDEN,resp);
437 MHD_destroy_response(resp);
438 return MHD_YES;
440 ms->state = MS_NORMAL;
441 ms->url = strdup(url);
442 if (!ms->url) {
443 return MHD_NO;
445 ms->cleanup |= CLEANUP_URL;
446 ms->size = 0;
447 pipe_init_shared(&ms->pipe,ms,1);
448 pp = pipe_init_private(&ms->pipe);
449 if (!pp) {
450 return MHD_NO;
452 pp->prov = main_prov;
453 ms->be_flags = BACKEND_GET_SIZE;
454 pthread_create(&ms->backend_th,NULL,
455 main_prov->func_tbl->put_child_func,pp);
456 /* TBD: check return value */
459 * Do the initial handshake with children. If we return from
460 * this callback without an error response, Microhttpd posts
461 * the "100 Continue" header and the client starts sending
462 * the data. We must report errors here or forever keep
463 * out peace.
465 rc = pipe_prod_wait_init(&ms->pipe);
466 if (rc != 0) {
467 DPRINTF("producer wait failed\n");
468 resp = MHD_create_response_from_data(0,NULL,
469 MHD_NO,MHD_NO);
470 if (!resp) {
471 return MHD_NO;
473 MHD_queue_response(conn,MHD_HTTP_INTERNAL_SERVER_ERROR,
474 resp);
475 MHD_destroy_response(resp);
476 } else if (rc > 0) {
478 * Note that we fail here even if 1 of N replicas fail.
479 * Might want to fix this when we start looping over
480 * pipe_init_private() above.
482 DPRINTF("producer replicas failed (%u of %u)\n",
483 rc, ms->pipe.cons_total);
484 resp = MHD_create_response_from_data(0,NULL,
485 MHD_NO,MHD_NO);
486 if (!resp) {
487 return MHD_NO;
489 MHD_queue_response(conn,MHD_HTTP_INTERNAL_SERVER_ERROR,
490 resp);
491 MHD_destroy_response(resp);
492 } else {
493 DPRINTF("producer proceeding\n");
496 else if (*data_size) {
497 pipe_prod_signal(&ms->pipe,(void *)data,*data_size);
498 ms->size += *data_size;
499 DPRINTF("producer chunk finished\n");
500 *data_size = 0;
502 else {
503 pipe_prod_finish(&ms->pipe);
504 pthread_join(ms->backend_th,&child_res);
505 if (child_res == THREAD_FAILED) {
506 DPRINTF("thread failed\n");
507 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
509 else if (ms->pipe.cons_error == ms->pipe.cons_total) {
510 DPRINTF("all %u consumers failed\n",
511 ms->pipe.cons_error);
512 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
514 else {
515 if (master_host) {
516 meta_got_copy(ms->bucket,ms->key,me);
517 etag = NULL;
519 else {
520 etag = meta_did_put(ms->bucket,ms->key,me,
521 ms->size);
523 DPRINTF("rereplicate (obj PUT)\n");
524 recheck_replication(ms,NULL);
525 rc = MHD_HTTP_OK;
527 free_ms(ms);
528 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
529 if (!resp) {
530 free(etag);
531 return MHD_NO;
533 if (etag) {
534 MHD_add_response_header(resp,"ETag",etag);
535 free(etag);
537 MHD_queue_response(conn,rc,resp);
538 MHD_destroy_response(resp);
541 return MHD_YES;
544 static int
545 proxy_get_attr (void *cctx, struct MHD_Connection *conn, const char *url,
546 const char *method, const char *version, const char *data,
547 size_t *data_size, void **rctx)
549 struct MHD_Response *resp;
550 char *fixed;
551 my_state *ms = *rctx;
552 int rc = MHD_HTTP_NOT_FOUND;
554 (void)cctx;
555 (void)method;
556 (void)version;
557 (void)data;
558 (void)data_size;
560 DPRINTF("PROXY GET ATTR %s\n",url);
562 if (meta_get_value(ms->bucket,ms->key,ms->attr,&fixed) == 0) {
563 resp = MHD_create_response_from_data(strlen(fixed),fixed,
564 MHD_YES,MHD_NO);
565 rc = MHD_HTTP_OK;
567 else {
568 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
570 if (!resp) {
571 return MHD_NO;
573 MHD_queue_response(conn,rc,resp);
574 MHD_destroy_response(resp);
576 free_ms(ms);
577 return MHD_YES;
580 static int
581 proxy_put_attr (void *cctx, struct MHD_Connection *conn, const char *url,
582 const char *method, const char *version, const char *data,
583 size_t *data_size, void **rctx)
585 struct MHD_Response *resp;
586 my_state *ms = *rctx;
587 const char *attrval;
588 int send_resp = 0;
590 (void)cctx;
591 (void)method;
592 (void)version;
594 DPRINTF("PROXY PUT ATTR %s (%zu)\n",url,*data_size);
596 if (ms->state == MS_NEW) {
597 ms->state = MS_NORMAL;
598 ms->url = strdup(url);
599 if (!ms->url) {
600 return MHD_NO;
602 ms->cleanup |= CLEANUP_URL;
603 attrval = MHD_lookup_connection_value(conn,MHD_HEADER_KIND,
604 "X-redhat-value");
605 if (attrval) {
606 meta_set_value(ms->bucket,ms->key,ms->attr,
607 (char *)attrval);
608 send_resp = 1;
611 else if (*data_size) {
612 if (ms->pipe.data_len) {
613 ms->pipe.data_len += *data_size;
614 char *p = realloc(ms->pipe.data_ptr,ms->pipe.data_len);
615 if (!p) {
616 return MHD_NO;
618 ms->pipe.data_ptr = p;
620 else {
621 ms->pipe.data_len = *data_size + 1;
622 ms->pipe.data_ptr = malloc(ms->pipe.data_len);
623 if (!ms->pipe.data_ptr) {
624 return MHD_NO;
626 ((char *)ms->pipe.data_ptr)[0] = '\0';
627 ms->cleanup |= CLEANUP_BUF_PTR;
629 (void)strncat(ms->pipe.data_ptr,data,*data_size);
630 /* TBD: check return value */
631 *data_size = 0;
633 else {
634 if (!ms->pipe.data_ptr) {
635 return MHD_NO;
637 if (is_reserved(ms->attr,reserved_attr)) {
638 resp = MHD_create_response_from_data(
639 0,NULL,MHD_NO,MHD_NO);
640 if (!resp) {
641 return MHD_NO;
643 MHD_queue_response(conn,MHD_HTTP_BAD_REQUEST,
644 resp);
645 MHD_destroy_response(resp);
646 free_ms(ms);
647 return MHD_YES;
649 meta_set_value(ms->bucket,ms->key,ms->attr,ms->pipe.data_ptr);
650 /* This might get stomped by replication. */
651 if (ms->cleanup & CLEANUP_BUF_PTR) {
652 free(ms->pipe.data_ptr);
653 ms->cleanup &= ~CLEANUP_BUF_PTR;
656 * We should always re-replicate, because the replication
657 * policy might refer to this attr.
659 DPRINTF("rereplicate (attr PUT)\n");
660 recheck_replication(ms,NULL);
661 free_ms(ms);
662 send_resp = 1;
665 if (send_resp) {
666 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
667 if (!resp) {
668 return MHD_NO;
670 MHD_queue_response(conn,MHD_HTTP_CREATED,resp);
671 MHD_destroy_response(resp);
673 * TBD: check if the attribute was a replication policy, and
674 * start/stop replication activities as appropriate.
678 return MHD_YES;
681 /**********
682 * For queries, we have to deal with MHD's post-iterator interface (not
683 * quite the same as the data-iteration even though we use it that way) on
684 * one side, and a query-iterator interface on the other. Data on both
685 * sides could be quite large, so we can't just stick them in header lines.
686 * We do still buffer the query in memory, though. Once that's done, we do
687 * very simple parsing - it will be more complicated later - and create the
688 * query iterator. That's also driven by MHD, this time though the
689 * content-callback interface, and repeatedly calls in to the metadata
690 * module to fetch one object name at a time.
691 **********/
693 static int
694 query_iterator (void *ctx, enum MHD_ValueKind kind, const char *key,
695 const char *filename, const char *content_type,
696 const char *transfer_encoding, const char *data,
697 uint64_t off, size_t size)
699 (void)ctx;
700 (void)kind;
701 (void)key;
702 (void)filename;
703 (void)content_type;
704 (void)transfer_encoding;
705 (void)data;
706 (void)off;
707 (void)size;
709 /* We actually accumulate the data in proxy_query. */
710 return MHD_YES;
713 /* MHD reader function during queries. Return -1 for EOF. */
714 static int
715 proxy_query_func (void *ctx, uint64_t pos, char *buf, int max)
717 my_state *ms = ctx;
718 int len;
719 const char *accept_hdr;
720 char *bucket;
721 char *key;
723 (void)pos;
725 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
726 "Accept");
728 if (!ms->gen_ctx) {
729 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
730 if (!ms->gen_ctx) {
731 return -1;
733 ms->cleanup |= CLEANUP_TMPL;
734 len = tmpl_list_header(ms->gen_ctx);
735 if (!len) {
736 return -1;
738 if (len > max) {
739 len = max;
741 memcpy(buf,ms->gen_ctx->buf,len);
742 return len;
745 if (ms->gen_ctx == TMPL_CTX_DONE) {
746 return -1;
749 for(;;) {
750 if (!meta_query_next(ms->query,&bucket,&key)) {
751 break;
753 if (is_reserved(key,reserved_name)) {
754 continue;
756 len = tmpl_list_entry(ms->gen_ctx,bucket,key);
757 if (!len) {
758 return -1;
760 if (len > max) {
761 len = max;
763 memcpy(buf,ms->gen_ctx->buf,len);
764 return len;
767 len = tmpl_list_footer(ms->gen_ctx);
768 if (!len) {
769 return -1;
771 if (len > max) {
772 len = max;
774 memcpy(buf,ms->gen_ctx->buf,len);
775 free(ms->gen_ctx);
776 ms->cleanup &= ~CLEANUP_TMPL;
777 ms->gen_ctx = TMPL_CTX_DONE;
778 return len;
781 static int
782 proxy_query (void *cctx, struct MHD_Connection *conn, const char *url,
783 const char *method, const char *version, const char *data,
784 size_t *data_size, void **rctx)
786 struct MHD_Response *resp;
787 my_state *ms = *rctx;
789 (void)cctx;
790 (void)method;
791 (void)version;
793 DPRINTF("PROXY QUERY %s (%zu)\n",url,*data_size);
795 if (ms->state == MS_NEW) {
796 ms->state = MS_NORMAL;
797 ms->post = MHD_create_post_processor(conn,4096,
798 query_iterator,ms);
799 ms->cleanup |= CLEANUP_POST;
801 else if (*data_size) {
802 MHD_post_process(ms->post,data,*data_size);
803 if (ms->pipe.data_len) {
804 ms->pipe.data_len += *data_size;
805 char *p = realloc(ms->pipe.data_ptr,ms->pipe.data_len);
806 if (!p) {
807 return MHD_NO;
809 ms->pipe.data_ptr = p;
811 else {
812 ms->pipe.data_len = *data_size + 1;
813 ms->pipe.data_ptr = malloc(ms->pipe.data_len);
814 if (!ms->pipe.data_ptr) {
815 return MHD_NO;
817 ((char *)ms->pipe.data_ptr)[0] = '\0';
818 ms->cleanup |= CLEANUP_BUF_PTR;
820 (void)strncat(ms->pipe.data_ptr,data,*data_size);
821 /* TBD: check return value */
822 *data_size = 0;
824 else {
825 if (!ms->pipe.data_ptr) {
826 return MHD_NO;
828 ms->query = meta_query_new(ms->bucket,NULL,ms->pipe.data_ptr);
829 ms->cleanup |= CLEANUP_QUERY;
830 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
831 65536, proxy_query_func, ms, simple_closer);
832 if (!resp) {
833 fprintf(stderr,"MHD_crfc failed\n");
834 simple_closer(ms);
835 return MHD_NO;
837 MHD_queue_response(conn,MHD_HTTP_OK,resp);
838 MHD_destroy_response(resp);
839 free_ms(ms);
842 return MHD_YES;
845 static int
846 proxy_list_objs (void *cctx, struct MHD_Connection *conn, const char *url,
847 const char *method, const char *version, const char *data,
848 size_t *data_size, void **rctx)
850 my_state *ms = *rctx;
851 struct MHD_Response *resp;
853 (void)cctx;
854 (void)url;
855 (void)method;
856 (void)version;
857 (void)data;
858 (void)data_size;
860 ms->query = meta_query_new((char *)ms->bucket,NULL,NULL);
861 ms->cleanup |= CLEANUP_QUERY;
863 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
864 65536, proxy_query_func, ms, simple_closer);
865 if (!resp) {
866 fprintf(stderr,"MHD_crfc failed\n");
867 simple_closer(ms);
868 return MHD_NO;
871 MHD_queue_response(conn,MHD_HTTP_OK,resp);
872 MHD_destroy_response(resp);
873 return MHD_YES;
876 static int
877 proxy_delete (void *cctx, struct MHD_Connection *conn, const char *url,
878 const char *method, const char *version, const char *data,
879 size_t *data_size, void **rctx)
881 my_state *ms = *rctx;
882 struct MHD_Response *resp;
883 char *copied_url;
884 char *bucket;
885 char *key;
886 char *stctx = NULL;
887 int rc;
889 (void)cctx;
890 (void)method;
891 (void)version;
892 (void)data;
893 (void)data_size;
895 DPRINTF("PROXY DELETE %s\n",url);
897 ms->thunk.parent = ms;
898 ms->thunk.prov = main_prov;
899 rc = ms->thunk.prov->func_tbl->delete_func(main_prov,
900 ms->bucket,ms->key,url);
901 if (rc == MHD_HTTP_OK) {
902 copied_url = strdup(url);
903 assert (copied_url);
904 bucket = strtok_r(copied_url,"/",&stctx);
905 key = strtok_r(NULL,"/",&stctx);
906 meta_delete(bucket,key);
907 free(copied_url);
908 replicate_delete(url,ms);
911 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
912 if (!resp) {
913 return MHD_NO;
915 MHD_queue_response(conn,rc,resp);
916 MHD_destroy_response(resp);
918 free_ms(ms);
919 return MHD_YES;
922 /* TBD: get actual bucket list */
923 typedef struct {
924 char *rel;
925 char *link;
926 } fake_bucket_t;
928 static const fake_bucket_t fake_bucket_list[] = {
929 { "bucket_factory", "_new" },
930 { "provider_list", "_providers" },
933 static int
934 root_blob_generator (void *ctx, uint64_t pos, char *buf, int max)
936 my_state *ms = ctx;
937 const fake_bucket_t *fb;
938 int len;
939 const char *accept_hdr;
940 const char *host;
941 char *bucket;
942 char *key;
944 (void)pos;
946 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
947 "Accept");
948 host = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,"Host");
950 if (!ms->gen_ctx) {
951 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
952 if (!ms->gen_ctx) {
953 return -1;
955 ms->cleanup |= CLEANUP_TMPL;
956 ms->gen_ctx->base = host;
957 len = tmpl_root_header(ms->gen_ctx,"image_warehouse","1.0");
958 if (!len) {
959 return -1;
961 if (len > max) {
962 len = max;
964 memcpy(buf,ms->gen_ctx->buf,len);
965 return len;
968 if (ms->gen_ctx == TMPL_CTX_DONE) {
969 return -1;
972 if (ms->gen_ctx->index < ARRAY_SIZE(fake_bucket_list)) {
973 fb = fake_bucket_list + ms->gen_ctx->index;
974 len = tmpl_root_entry(ms->gen_ctx,fb->rel,fb->link);
975 if (!len) {
976 return -1;
978 if (len > max) {
979 len = max;
981 memcpy(buf,ms->gen_ctx->buf,len);
982 return len;
985 if (meta_query_next(ms->query,&bucket,&key)) {
986 len = tmpl_root_entry(ms->gen_ctx,"bucket",bucket);
987 if (!len) {
988 return -1;
990 if (len > max) {
991 len = max;
993 memcpy(buf,ms->gen_ctx->buf,len);
994 return len;
997 len = tmpl_root_footer(ms->gen_ctx);
998 if (!len) {
999 return -1;
1001 if (len > max) {
1002 len = max;
1004 memcpy(buf,ms->gen_ctx->buf,len);
1005 free(ms->gen_ctx);
1006 ms->cleanup &= ~CLEANUP_TMPL;
1007 ms->gen_ctx = TMPL_CTX_DONE;
1008 return len;
1011 static int
1012 proxy_api_root (void *cctx, struct MHD_Connection *conn, const char *url,
1013 const char *method, const char *version, const char *data,
1014 size_t *data_size, void **rctx)
1016 struct MHD_Response *resp = NULL;
1017 unsigned int rc = MHD_HTTP_OK;
1018 my_state *ms = *rctx;
1020 (void)cctx;
1021 (void)method;
1022 (void)version;
1023 (void)data;
1025 DPRINTF("PROXY API ROOT (%s, %zu)\n",url,*data_size);
1027 ms->query = meta_query_new(NULL,"_default",NULL);
1028 if (!ms->query) {
1029 free_ms(ms);
1030 return MHD_NO;
1032 ms->cleanup |= CLEANUP_QUERY;
1034 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
1035 65536, root_blob_generator, ms, simple_closer);
1036 if (!resp) {
1037 return MHD_NO;
1039 MHD_queue_response(conn,rc,resp);
1040 MHD_destroy_response(resp);
1042 return MHD_YES;
1046 static int
1047 post_iterator (void *ctx, enum MHD_ValueKind kind, const char *key,
1048 const char *filename, const char *content_type,
1049 const char *transfer_encoding, const char *data,
1050 uint64_t off, size_t size)
1052 char *old_val;
1053 size_t old_len;
1054 char *new_val;
1056 (void)kind;
1057 (void)filename;
1058 (void)content_type;
1059 (void)transfer_encoding;
1060 (void)off;
1062 printf("adding %s, size=%zu\n",key,size);
1064 // TBD: don't assume that values are null-terminated strings
1065 old_val = g_hash_table_lookup(ctx,key);
1066 if (old_val) {
1067 old_len = strlen(old_val);
1068 new_val = malloc(old_len+size+1);
1069 if (!new_val) {
1070 return MHD_NO;
1072 memcpy(new_val,old_val,old_len);
1073 memcpy(new_val+old_len,data,size);
1074 new_val[old_len+size] = '\0';
1076 else {
1077 new_val = malloc(size+1);
1078 if (!new_val) {
1079 return MHD_NO;
1081 memcpy(new_val,data,size);
1082 new_val[size] = '\0';
1085 g_hash_table_insert(ctx,strdup(key),new_val);
1086 /* TBD: check return value for strdups (none avail for insert) */
1087 return MHD_YES;
1090 /* Returns TRUE if we found an *invalid* key. */
1091 static gboolean
1092 post_find (gpointer key, gpointer value, gpointer ctx)
1094 (void)value;
1095 (void)ctx;
1097 if (!is_reserved(key,reserved_attr)) {
1098 return FALSE;
1101 DPRINTF("bad attr %s\n", (char *)key);
1102 return TRUE;
1105 static void
1106 post_foreach (gpointer key, gpointer value, gpointer ctx)
1108 my_state *ms = ctx;
1110 DPRINTF("setting %s = %s for %s/%s\n",(char *)key, (char *)value,
1111 ms->bucket,ms->key);
1112 meta_set_value(ms->bucket,ms->key,key,value);
1115 static int
1116 create_bucket (char *name, my_state *ms)
1118 int rc;
1120 if (is_reserved(name,reserved_name)) {
1121 return MHD_HTTP_BAD_REQUEST;
1124 rc = main_prov->func_tbl->bcreate_func(main_prov,name);
1125 if (rc == MHD_HTTP_OK) {
1126 if (meta_set_value(name,"_default", "_policy","0") != 0) {
1127 DPRINTF("default-policy " "create failed\n");
1128 /* Non-fatal. */
1130 DPRINTF("created bucket %s\n",name);
1132 * There's not a whole lot to do about bucket-creation
1133 * failures on replicas, other than to report them, unless
1134 * we adopt an "all or nothing" approach and unwind the
1135 * create on the primary as well. Then what if that fails?
1136 * It's just one example of the general "fewer replicas
1137 * than desired" distributed-system problem, not worth a
1138 * point solution here/now. Revisit when we have a more
1139 * general replica-repair policy/system in place.
1141 replicate_bcreate(name,ms);
1144 return rc;
1147 static int
1148 control_api_root (void *cctx, struct MHD_Connection *conn, const char *url,
1149 const char *method, const char *version, const char *data,
1150 size_t *data_size, void **rctx)
1152 struct MHD_Response *resp;
1153 my_state *ms = *rctx;
1154 int rc;
1155 char *op;
1156 char buf[80];
1157 int len;
1159 (void)cctx;
1160 (void)method;
1161 (void)version;
1163 DPRINTF("ROOT POST (%s, %zu)\n",url,*data_size);
1165 if (ms->state == MS_NEW) {
1166 ms->state = MS_NORMAL;
1167 ms->url = (char *)url;
1168 ms->dict = g_hash_table_new_full(
1169 g_str_hash,g_str_equal,free,free);
1170 ms->cleanup |= CLEANUP_DICT;
1171 ms->post = MHD_create_post_processor(conn,4096,
1172 post_iterator,ms->dict);
1173 ms->cleanup |= CLEANUP_POST;
1174 return MHD_YES;
1177 if (*data_size) {
1178 MHD_post_process(ms->post,data,*data_size);
1179 *data_size = 0;
1180 return MHD_YES;
1183 rc = MHD_HTTP_BAD_REQUEST;
1185 op = g_hash_table_lookup(ms->dict,"op");
1186 if (op) {
1187 if (!strcmp(op,"rep_status")) {
1188 len = snprintf(buf,sizeof(buf),"%d requests\n",
1189 get_rep_count());
1190 rc = MHD_HTTP_OK;
1192 else {
1193 len = snprintf(buf,sizeof(buf),"unknown op");
1196 else {
1197 len = snprintf(buf,sizeof(buf),"missing op");
1200 if (len >= (int)sizeof(buf)) {
1201 len = 0;
1202 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
1205 /* NB The last arg tells MHD to copy the arg and free it later. */
1206 resp = MHD_create_response_from_data(len,buf,MHD_NO,MHD_YES);
1207 if (!resp) {
1208 return MHD_NO;
1210 MHD_queue_response(conn,rc,resp);
1211 MHD_destroy_response(resp);
1213 free_ms(ms);
1214 return MHD_YES;
1217 static int
1218 proxy_bucket_post (void *cctx, struct MHD_Connection *conn, const char *url,
1219 const char *method, const char *version, const char *data,
1220 size_t *data_size, void **rctx)
1222 struct MHD_Response *resp;
1223 my_state *ms = *rctx;
1224 int rc;
1225 char *key;
1227 (void)cctx;
1228 (void)method;
1229 (void)version;
1231 DPRINTF("PROXY POST (%s, %zu)\n",url,*data_size);
1233 if (ms->state == MS_NEW) {
1234 ms->state = MS_NORMAL;
1235 ms->url = (char *)url;
1236 ms->dict = g_hash_table_new_full(
1237 g_str_hash,g_str_equal,free,free);
1238 ms->cleanup |= CLEANUP_DICT;
1239 ms->post = MHD_create_post_processor(conn,4096,
1240 post_iterator,ms->dict);
1241 ms->cleanup |= CLEANUP_POST;
1243 else if (*data_size) {
1244 MHD_post_process(ms->post,data,*data_size);
1245 *data_size = 0;
1247 else {
1248 rc = MHD_HTTP_BAD_REQUEST;
1249 key = g_hash_table_lookup(ms->dict,"key");
1250 if (key) {
1251 strncpy(ms->key,key,MAX_FIELD_LEN-1);
1252 g_hash_table_remove(ms->dict,"key");
1253 if (!g_hash_table_find(ms->dict,post_find,ms)) {
1254 g_hash_table_foreach(ms->dict,post_foreach,ms);
1255 DPRINTF("rereplicate (bucket POST)\n");
1256 recheck_replication(ms,NULL);
1257 rc = MHD_HTTP_OK;
1260 else if (!strcmp(ms->bucket,"_new")) {
1261 key = g_hash_table_lookup(ms->dict,"name");
1262 if (key != NULL) {
1263 rc = create_bucket(key,ms);
1266 else {
1267 DPRINTF("A parameter is MISSING (fail)\n");
1269 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1270 if (!resp) {
1271 fprintf(stderr,"MHD_crfd failed\n");
1272 return MHD_NO;
1274 MHD_queue_response(conn,rc,resp);
1275 MHD_destroy_response(resp);
1276 free_ms(ms);
1279 return MHD_YES;
1282 static int
1283 check_location (my_state *ms)
1285 char *loc = g_hash_table_lookup(ms->dict,"depot");
1287 if (!loc) {
1288 DPRINTF("missing loc on check for %s/%s\n",ms->bucket,ms->key);
1289 return MHD_HTTP_BAD_REQUEST;
1292 if (!meta_has_copy(ms->bucket,ms->key,loc)) {
1293 DPRINTF("did not find %s/%s at %s\n",ms->bucket,ms->key,loc);
1294 return MHD_HTTP_NOT_FOUND;
1297 /* TBD: meta_has_copy returns an etag which we should check */
1298 DPRINTF("found %s/%s at %s\n",ms->bucket,ms->key,loc);
1299 return MHD_HTTP_OK;
1302 static int
1303 register_image (my_state *ms)
1305 const char *site;
1306 const provider_t *prov;
1307 char *next;
1309 site = g_hash_table_lookup(ms->dict,"site");
1310 if (!site) {
1311 printf("site MISSING\n");
1312 return MHD_HTTP_BAD_REQUEST;
1315 next = index(site,':');
1316 if (next) {
1317 *(next++) = '\0';
1320 prov = get_provider(site);
1321 if (!prov) {
1322 DPRINTF("site %s not found\n",site);
1323 return MHD_HTTP_BAD_REQUEST;
1326 return prov->func_tbl->register_func(ms,prov,next,ms->dict);
1330 static int
1331 parts_callback (void *ctx, uint64_t pos, char *buf, int max)
1333 my_state *ms = ctx;
1334 int len;
1335 const char *accept_hdr;
1336 const char *name;
1337 const char *value;
1338 const char *host;
1340 (void)pos;
1342 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
1343 "Accept");
1344 host = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,"Host");
1346 if (!ms->gen_ctx) {
1347 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
1348 if (!ms->gen_ctx) {
1349 return -1;
1351 ms->cleanup |= CLEANUP_TMPL;
1352 ms->gen_ctx->base = host;
1353 len = tmpl_obj_header(ms->gen_ctx,ms->bucket,ms->key);
1354 if (!len) {
1355 return -1;
1357 if (len > max) {
1358 len = max;
1360 memcpy(buf,ms->gen_ctx->buf,len);
1361 return len;
1364 if (ms->gen_ctx == TMPL_CTX_DONE) {
1365 return -1;
1369 // Set up and use query for what attributes exist.
1370 for(;;) {
1371 if (!meta_attr_next(ms->aquery,&name,&value)) {
1372 break;
1374 if (is_reserved(name,reserved_attr)) {
1375 continue;
1377 len = tmpl_obj_entry(ms->gen_ctx,ms->bucket,ms->key,name);
1378 if (!len) {
1379 return -1;
1381 if (len > max) {
1382 len = max;
1384 memcpy(buf,ms->gen_ctx->buf,len);
1385 return len;
1388 len = tmpl_obj_footer(ms->gen_ctx);
1389 if (!len) {
1390 return -1;
1392 if (len > max) {
1393 len = max;
1395 memcpy(buf,ms->gen_ctx->buf,len);
1396 free(ms->gen_ctx);
1397 ms->cleanup &= ~CLEANUP_TMPL;
1398 ms->gen_ctx = TMPL_CTX_DONE;
1399 return len;
1402 static int
1403 show_parts (struct MHD_Connection *conn, my_state *ms)
1405 struct MHD_Response *resp;
1407 ms->aquery = meta_get_attrs(ms->bucket,ms->key);
1408 if (!ms->aquery) {
1409 return MHD_HTTP_NOT_FOUND;
1411 ms->cleanup |= CLEANUP_AQUERY;
1413 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
1414 65536, parts_callback, ms, simple_closer);
1415 if (!resp) {
1416 fprintf(stderr,"MHD_crfc failed\n");
1417 simple_closer(ms);
1418 return MHD_NO;
1420 MHD_queue_response(conn,MHD_HTTP_OK,resp);
1421 MHD_destroy_response(resp);
1422 return MHD_HTTP_PROCESSING;
1425 static int
1426 proxy_object_post (void *cctx, struct MHD_Connection *conn, const char *url,
1427 const char *method, const char *version, const char *data,
1428 size_t *data_size, void **rctx)
1430 struct MHD_Response *resp;
1431 my_state *ms = *rctx;
1432 int rc;
1433 char *op;
1435 (void)cctx;
1436 (void)method;
1437 (void)version;
1439 DPRINTF("PROXY POST (%s, %zu)\n",url,*data_size);
1441 if (ms->state == MS_NEW) {
1442 ms->state = MS_NORMAL;
1443 ms->url = (char *)url;
1444 ms->dict = g_hash_table_new_full(
1445 g_str_hash,g_str_equal,free,free);
1446 ms->cleanup |= CLEANUP_DICT;
1447 ms->post = MHD_create_post_processor(conn,4096,
1448 post_iterator,ms->dict);
1449 ms->cleanup |= CLEANUP_POST;
1451 else if (*data_size) {
1452 MHD_post_process(ms->post,data,*data_size);
1453 *data_size = 0;
1455 else {
1456 rc = MHD_HTTP_BAD_REQUEST;
1457 if (!g_hash_table_find(ms->dict,post_find,ms)) {
1458 op = g_hash_table_lookup(ms->dict,"op");
1459 if (op) {
1460 if (!strcmp(op,"push")) {
1461 DPRINTF("rereplicate (obj POST)\n");
1462 recheck_replication(ms,NULL);
1463 rc = MHD_HTTP_OK;
1465 else if (!strcmp(op,"check")) {
1466 rc = check_location(ms);
1468 else if (!strcmp(op,"register")) {
1469 rc = register_image(ms);
1471 else if (!strcmp(op,"parts")) {
1472 rc = show_parts(conn,ms);
1474 else {
1475 DPRINTF("unknown op %s for %s/%s\n",
1476 op, ms->bucket, ms->key);
1479 else {
1480 DPRINTF("op is MISSING (fail)\n");
1483 if (rc != MHD_HTTP_PROCESSING) {
1485 * MHD_HTTP_PROCESSING is a special response that
1486 * means a request-specific routine (e.g. show_parts)
1487 * created its own response. Therefore we shouldn't.
1489 resp = MHD_create_response_from_data(0,NULL,
1490 MHD_NO,MHD_NO);
1491 if (!resp) {
1492 fprintf(stderr,"MHD_crfd failed\n");
1493 return MHD_NO;
1495 MHD_queue_response(conn,rc,resp);
1496 MHD_destroy_response(resp);
1497 free_ms(ms);
1501 return MHD_YES;
1506 static int
1507 prov_list_generator (void *ctx, uint64_t pos, char *buf, int max)
1509 my_state *ms = ctx;
1510 int len;
1511 gpointer key;
1512 const provider_t *prov;
1513 const char *accept_hdr;
1515 (void)pos;
1517 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
1518 "Accept");
1520 if (!ms->gen_ctx) {
1521 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
1522 if (!ms->gen_ctx) {
1523 return -1;
1525 ms->cleanup |= CLEANUP_TMPL;
1526 init_prov_iter(&ms->prov_iter);
1527 len = tmpl_prov_header(ms->gen_ctx);
1528 if (!len) {
1529 return -1;
1531 if (len > max) {
1532 len = max;
1534 memcpy(buf,ms->gen_ctx->buf,len);
1535 return len;
1538 if (ms->gen_ctx == TMPL_CTX_DONE) {
1539 return -1;
1542 if (g_hash_table_iter_next(&ms->prov_iter,&key,(gpointer *)&prov)) {
1543 len = tmpl_prov_entry(ms->gen_ctx,prov->name,prov->type,
1544 prov->host, prov->port, prov->username, prov->password);
1545 if (!len) {
1546 return -1;
1548 if (len > max) {
1549 len = max;
1551 memcpy(buf,ms->gen_ctx->buf,len);
1552 return len;
1555 len = tmpl_prov_footer(ms->gen_ctx);
1556 if (!len) {
1557 return -1;
1559 if (len > max) {
1560 len = max;
1562 memcpy(buf,ms->gen_ctx->buf,len);
1563 free(ms->gen_ctx);
1564 ms->cleanup &= ~CLEANUP_TMPL;
1565 ms->gen_ctx = TMPL_CTX_DONE;
1566 return len;
1569 static int
1570 proxy_list_provs (void *cctx, struct MHD_Connection *conn, const char *url,
1571 const char *method, const char *version, const char *data,
1572 size_t *data_size, void **rctx)
1574 struct MHD_Response *resp;
1575 my_state *ms = *rctx;
1577 (void)cctx;
1578 (void)url;
1579 (void)method;
1580 (void)version;
1581 (void)data;
1582 (void)data_size;
1584 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
1585 65536, prov_list_generator, ms, simple_closer);
1586 if (!resp) {
1587 fprintf(stderr,"MHD_crfd failed\n");
1588 simple_closer(ms);
1589 return MHD_NO;
1591 MHD_queue_response(conn,MHD_HTTP_OK,resp);
1592 MHD_destroy_response(resp);
1594 return MHD_YES;
1597 static int
1598 prov_iterator (void *ctx, enum MHD_ValueKind kind, const char *key,
1599 const char *filename, const char *content_type,
1600 const char *transfer_encoding, const char *data,
1601 uint64_t off, size_t size)
1603 (void)kind;
1604 (void)filename;
1605 (void)content_type;
1606 (void)transfer_encoding;
1607 (void)off;
1609 g_hash_table_insert(ctx,strdup(key),strndup(data,size));
1610 /* TBD: check return value for strdups (none avail for insert) */
1611 return MHD_YES;
1615 static int
1616 proxy_update_prov (void *cctx, struct MHD_Connection *conn, const char *url,
1617 const char *method, const char *version, const char *data,
1618 size_t *data_size, void **rctx)
1620 struct MHD_Response *resp;
1621 my_state *ms = *rctx;
1622 int rc;
1623 char *provider;
1624 char *username;
1625 char *password;
1627 (void)cctx;
1628 (void)method;
1629 (void)version;
1631 if (ms->state == MS_NEW) {
1632 ms->state = MS_NORMAL;
1633 ms->url = (char *)url;
1634 ms->dict = g_hash_table_new_full(
1635 g_str_hash,g_str_equal,free,free);
1636 ms->cleanup |= CLEANUP_DICT;
1637 ms->post = MHD_create_post_processor(conn,4096,
1638 prov_iterator,ms->dict);
1639 ms->cleanup |= CLEANUP_POST;
1641 else if (*data_size) {
1642 MHD_post_process(ms->post,data,*data_size);
1643 *data_size = 0;
1645 else {
1646 rc = MHD_HTTP_BAD_REQUEST;
1647 provider = g_hash_table_lookup(ms->dict,"provider");
1648 username = g_hash_table_lookup(ms->dict,"username");
1649 password = g_hash_table_lookup(ms->dict,"password");
1650 if (provider && username && password) {
1651 update_provider(provider,username,password);
1652 rc = MHD_HTTP_OK;
1654 else {
1655 DPRINTF("provider/username/password MISSING\n");
1657 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1658 if (!resp) {
1659 fprintf(stderr,"MHD_crfd failed\n");
1660 return MHD_NO;
1662 MHD_queue_response(conn,rc,resp);
1663 MHD_destroy_response(resp);
1664 free_ms(ms);
1667 return MHD_YES;
1670 static int
1671 proxy_create_bucket (void *cctx, struct MHD_Connection *conn, const char *url,
1672 const char *method, const char *version, const char *data,
1673 size_t *data_size, void **rctx)
1675 struct MHD_Response *resp;
1676 my_state *ms = *rctx;
1677 int rc;
1679 (void)cctx;
1680 (void)method;
1681 (void)version;
1682 (void)data;
1683 (void)data_size;
1684 (void)url;
1686 /* curl -T moo.empty http://localhost:9090/_new by accident */
1687 rc = create_bucket(ms->bucket,ms);
1689 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1690 if (!resp) {
1691 fprintf(stderr,"MHD_crfd failed\n");
1692 return MHD_NO;
1694 MHD_queue_response(conn,rc,resp);
1695 MHD_destroy_response(resp);
1697 free_ms(ms);
1698 return MHD_YES;
1701 static const rule my_rules[] = {
1702 { /* get bucket list */
1703 "GET", URL_ROOT, proxy_api_root },
1704 { /* perform a control operation on the API root */
1705 "POST", URL_ROOT, control_api_root },
1706 { /* get object list */
1707 "GET", URL_BUCKET, proxy_list_objs },
1708 { /* create bucket */
1709 "PUT", URL_BUCKET, proxy_create_bucket },
1710 { /* get object data */
1711 "GET", URL_OBJECT, proxy_get_data },
1712 { /* get attribute data */
1713 "GET", URL_ATTR, proxy_get_attr },
1714 { /* put object data */
1715 "PUT", URL_OBJECT, proxy_put_data },
1716 { /* put attribute data */
1717 "PUT", URL_ATTR, proxy_put_attr },
1718 { /* create object and/or modify attributes */
1719 "POST", URL_BUCKET, proxy_bucket_post },
1720 { /* perform control operations on an object */
1721 "POST", URL_OBJECT, proxy_object_post },
1722 { /* query */
1723 "POST", URL_QUERY, proxy_query },
1724 { /* delete object */
1725 "DELETE", URL_OBJECT, proxy_delete },
1726 { /* delete attribute (TBD) */
1727 "DELETE", URL_ATTR, NULL },
1728 { /* get provider list */
1729 "GET", URL_PROVLIST, proxy_list_provs },
1730 { /* update a provider */
1731 "POST", URL_PROVLIST, proxy_update_prov },
1732 { NULL, 0, NULL }
1735 static url_type
1736 parse_url (const char *url, my_state *ms)
1738 unsigned short esize;
1739 unsigned short eindex;
1740 char *parts[URL_INVAL];
1742 if (strstr(url,"../")) {
1743 /* Naughty, naughty. Never a good reason to allow this. */
1744 DPRINTF("Rejecting ../ in path.\n");
1745 return URL_INVAL;
1748 eindex = URL_ROOT;
1749 parts[URL_BUCKET] = ms->bucket;
1750 parts[URL_OBJECT] = ms->key;
1751 parts[URL_ATTR] = ms->attr;
1753 for (;;) {
1754 while (*url == '/') {
1755 ++url;
1758 if (!*url) {
1759 if (eindex == URL_BUCKET) {
1760 if (!strcmp(ms->bucket,"_providers")) {
1761 eindex = URL_PROVLIST;
1764 else if (eindex == URL_OBJECT) {
1765 if (!strcmp(ms->key,"_query")) {
1766 eindex = URL_QUERY;
1769 break;
1772 if (++eindex >= URL_INVAL) {
1773 return URL_INVAL;
1775 esize = 0;
1777 while (*url && (*url != '/')) {
1778 parts[eindex][esize++] = *(url++);
1779 if (esize >= MAX_FIELD_LEN) {
1780 return URL_INVAL;
1785 return eindex;
1788 static int
1789 access_handler (void *cctx, struct MHD_Connection *conn, const char *url,
1790 const char *method, const char *version, const char *data,
1791 size_t *data_size, void **rctx)
1793 unsigned int i;
1794 url_type utype;
1795 struct MHD_Response *resp;
1796 my_state *ms = *rctx;
1798 if (ms) {
1799 return ms->handler(cctx,conn,url,method,version,
1800 data,data_size,rctx);
1803 ms = calloc(sizeof(*ms), 1);
1804 if (!ms) {
1805 return MHD_NO;
1807 ms->refcnt = 1;
1809 utype = parse_url(url,ms);
1811 for (i = 0; my_rules[i].method; ++i) {
1812 if (utype != my_rules[i].utype) {
1813 continue;
1815 if (strcmp(method,my_rules[i].method)) {
1816 continue;
1818 if (!my_rules[i].handler) {
1819 break;
1821 ms->handler = my_rules[i].handler;
1822 ms->state = MS_NEW;
1823 ms->url = NULL;
1824 ms->post = NULL;
1825 ms->conn = conn;
1826 *rctx = ms;
1827 return ms->handler(cctx,conn,url,method,version,
1828 data,data_size,rctx);
1831 /* Don't need this after all. Free before the next check. */
1832 free_ms(ms);
1834 if (!strcmp(method,"QUIT")) {
1835 (void)sem_post((sem_t *)cctx);
1836 return MHD_NO;
1839 fprintf(stderr,"bad request m=%s u=%s\n",method,url);
1841 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1842 if (!resp) {
1843 return MHD_NO;
1845 MHD_queue_response(conn,MHD_HTTP_NOT_FOUND,resp);
1846 MHD_destroy_response(resp);
1847 return MHD_YES;
1850 /* These enum values cannot possibly conflict with the option values
1851 ordinarily used by commands, including CHAR_MAX + 1, etc. Avoid
1852 CHAR_MIN - 1, as it may equal -1, the getopt end-of-options value. */
1853 enum
1855 GETOPT_HELP_CHAR = (CHAR_MIN - 2),
1856 GETOPT_VERSION_CHAR = (CHAR_MIN - 3)
1859 static const struct option my_options[] = {
1860 { "autostart", no_argument, NULL, 'a' },
1861 { "config", required_argument, NULL, 'c' },
1862 { "db", required_argument, NULL, 'd' },
1863 { "master", required_argument, NULL, 'm' },
1864 { "port", required_argument, NULL, 'p' },
1865 { "verbose", no_argument, NULL, 'v' },
1866 { "version", no_argument, NULL, GETOPT_VERSION_CHAR },
1867 { "help", no_argument, NULL, GETOPT_HELP_CHAR },
1868 { NULL, 0, NULL, '\0' }
1871 static void
1872 usage (int status)
1874 if (status != EXIT_SUCCESS)
1875 fprintf (stderr, _("Try `%s --help' for more information.\n"),
1876 program_name);
1877 else
1879 printf (_("\
1880 Usage: %s [OPTION]\n\
1882 program_name);
1883 fputs (_("\
1884 Deltacloud image-warehouse daemon.\n\
1885 A configuration file must be specified.\n\
1887 -a, --autostart start necessary back-end services\n\
1888 -c, --config=FILE config file [required]\n\
1889 -d, --db=HOST_PORT database server as ip[:port]\n\
1890 -m, --master=HOST_PORT master (upstream) server as ip[:port]\n\
1891 -p, --port=PORT alternate listen port (default 9090)\n\
1892 -v, --verbose verbose/debug output\n\
1894 --help display this help and exit\n\
1895 --version output version information and exit\n\
1896 "), stdout);
1897 printf (_("\
1899 Report %s bugs to %s.\n\
1901 program_name, PACKAGE_BUGREPORT);
1903 exit (status);
1908 main (int argc, char **argv)
1910 struct MHD_Daemon *the_daemon;
1911 sem_t the_sem;
1912 char *stctx = NULL;
1913 char *port_tmp;
1914 int autostart = 0;
1916 program_name = argv[0];
1918 for (;;) switch (getopt_long(argc,argv,"ac:d:m:p:v",my_options,NULL)) {
1919 case 'a':
1920 ++autostart;
1921 break;
1922 case 'c':
1923 cfg_file = optarg;
1924 break;
1925 case 'd':
1926 assert (optarg);
1927 db_host = strtok_r(optarg,":",&stctx);
1928 port_tmp = strtok_r(NULL,":",&stctx);
1929 if (port_tmp) {
1930 db_port = (unsigned short)strtoul(port_tmp,NULL,10);
1932 break;
1933 case 'm':
1934 assert (optarg);
1935 master_host = strtok_r(optarg,":",&stctx);
1936 port_tmp = strtok_r(NULL,":",&stctx);
1937 if (port_tmp) {
1938 master_port = (unsigned short)strtoul(port_tmp,NULL,10);
1940 break;
1941 case 'p':
1942 my_port = (unsigned short)strtoul(optarg,NULL,10);
1943 break;
1944 case 'v':
1945 ++verbose;
1946 break;
1947 case GETOPT_HELP_CHAR:
1948 usage(EXIT_SUCCESS);
1949 break;
1950 case GETOPT_VERSION_CHAR:
1951 printf ("%s version %s\n", program_name, PACKAGE_VERSION);
1952 exit (EXIT_SUCCESS);
1953 break;
1955 case -1:
1956 goto args_done;
1957 default:
1958 usage(EXIT_FAILURE);
1959 break;
1961 args_done:
1963 if (!db_port) {
1964 db_port = autostart ? AUTO_MONGOD_PORT : 27017;
1967 if (autostart && cfg_file) {
1968 error(0,0,"do not use -c and -a simultaneously");
1969 return !0;
1971 else if (autostart && !cfg_file) {
1972 me = auto_config();
1973 if (!me) {
1974 /* error printed */
1975 return !0;
1978 else if (!autostart && cfg_file) {
1979 me = parse_config(cfg_file);
1980 if (!me) {
1981 error(0,0,"could not parse %s",cfg_file);
1982 return !0;
1985 else {
1986 error(0,0,"specify at least -c or -a");
1987 usage (EXIT_FAILURE);
1990 sem_init(&the_sem,0,0);
1992 if (verbose) {
1993 printf("primary store type is %s\n",main_prov->type);
1994 if (master_host) {
1995 printf("operating as slave to %s:%u\n",
1996 master_host, master_port);
1998 printf("db is at %s:%u\n",db_host,db_port);
1999 printf("will listen on port %u\n",my_port);
2000 printf("my location is \"%s\"\n",me);
2001 if (fflush(stdout) || ferror(stdout))
2002 error(EXIT_FAILURE, 0, "write failed");
2005 backend_init();
2006 meta_init();
2007 repl_init();
2010 * Gotcha: if we don't set the connection memory limit explicitly,
2011 * the per-connection buffer for MHD will be smaller than that used
2012 * by CURL, so proxy_writefunc will never be able to do its job.
2014 the_daemon = MHD_start_daemon(MY_MHD_FLAGS,
2015 my_port, NULL, NULL, &access_handler, &the_sem,
2016 MHD_OPTION_CONNECTION_MEMORY_LIMIT, (size_t)1048576,
2017 MHD_OPTION_END);
2018 if (!the_daemon) {
2019 fprintf(stderr,"Could not create daemon.\n");
2020 return !0;
2023 sem_wait(&the_sem);
2024 return 0;