build: make the "rpm" rule work once again
[iwhd.git] / rest.c
blobea8b289f2c22b8cab02e28c1f44ff755119de6a4
1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include <error.h>
19 #include <fcntl.h>
20 #include <getopt.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <semaphore.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <sys/stat.h>
30 #include <assert.h>
32 #include <microhttpd.h>
33 #include <hstor.h> /* only for ARRAY_SIZE at this point */
34 #include <curl/curl.h>
35 #include <glib.h>
37 #include "iwh.h"
38 #include "closeout.h"
39 #include "progname.h"
40 #include "meta.h"
41 #include "backend.h"
42 #include "setup.h"
43 #include "replica.h"
44 #include "template.h"
45 #include "mpipe.h"
46 #include "state_defs.h"
48 /* Define-away for now. Eventually, define to gettext. */
49 #define _(msgid) (msgid)
51 #if defined(DEBUG)
52 #define MY_MHD_FLAGS MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG
53 //#define MY_MHD_FLAGS MHD_USE_SELECT_INTERNALLY | MHD_USE_DEBUG
54 #else
55 #define MY_MHD_FLAGS MHD_USE_THREAD_PER_CONNECTION
56 #endif
58 typedef enum {
59 URL_ROOT=0, URL_BUCKET, URL_OBJECT, URL_ATTR, URL_INVAL,
60 URL_QUERY, URL_PROVLIST
61 } url_type;
63 typedef struct {
64 const char *method;
65 url_type utype;
66 MHD_AccessHandlerCallback handler;
67 } rule;
69 static unsigned short my_port = MY_PORT;
70 char *cfg_file = NULL;
72 static const char *const (reserved_name[]) = {"_default", "_new", "_policy", "_query", NULL};
73 static const char *const (reserved_attr[]) = {"_bucket", "_date", "_etag", "_key", "_loc", "_size", NULL};
74 static const char *const (reserved_bucket_name[]) = {"_new", "_providers", NULL};
76 void
77 free_ms (my_state *ms)
79 if (!g_atomic_int_dec_and_test(&ms->refcnt)) {
80 return;
83 if (ms->cleanup & CLEANUP_BUF_PTR) {
84 free(ms->pipe.data_ptr);
87 if (ms->cleanup & CLEANUP_POST) {
88 MHD_destroy_post_processor(ms->post);
91 if (ms->cleanup & CLEANUP_DICT) {
92 g_hash_table_destroy(ms->dict);
95 if (ms->cleanup & CLEANUP_QUERY) {
96 meta_query_stop(ms->query);
99 if (ms->cleanup & CLEANUP_TMPL) {
100 free(ms->gen_ctx);
103 if (ms->cleanup & CLEANUP_URL) {
104 free(ms->url);
107 if (ms->cleanup & CLEANUP_AQUERY) {
108 meta_attr_stop(ms->aquery);
111 free(ms);
114 static int
115 validate_put (struct MHD_Connection *conn)
117 const char *mhdr;
119 mhdr = MHD_lookup_connection_value(conn,MHD_HEADER_KIND,
120 "X-redhat-role");
122 * This will fail most obviously in the case where we are not the
123 * master, we know we're not the master, and we don't see this
124 * header (which is set in master-to-slave replication requests).
125 * It will *also* fail, deliberately, if we do see this header when
126 * we think we're the master, as it means there's a mismatch between
127 * their config and ours. This avoids "strange" behavior in such
128 * cases, in favor of a more obvious failure.
129 * TBD: this will be less of a problem if/when we identify the
130 * master and DB via the config file instead of -m/-d.
132 if (master_host) {
133 return (mhdr && !strcmp(mhdr,"master"));
135 else {
136 return !mhdr;
140 static int
141 is_reserved (const char *cand, char const *const *resv_list)
143 int i;
145 for (i = 0; resv_list[i]; ++i) {
146 if (!strcmp(cand,resv_list[i])) {
147 return TRUE;
151 return FALSE;
154 static int
155 validate_url (const char *url)
157 char *slash = strrchr(url,'/');
159 if (!slash) {
160 /* There should be at least one betwixt bucket and key. */
161 return 0;
164 return !is_reserved(slash+1,reserved_name);
167 /**********
168 * The proxy has MHD on one side and CURL on the other. The CURL side is
169 * always run in a child thread. Yes, there are both context switches
170 * and copies between the threads. Get over it. The focus here is on
171 * supporting multi-way replication on PUT, with minimal complexity. These
172 * were the two easiest libraries to use, and they both want to allocate
173 * their own buffers so we're kind of stuck with the copies unless we want
174 * to buffer whole files in memory (which would have to be virtual since
175 * they're potentialy bigger than physical) or explicitly ping them through
176 * a local filesystem. We could potentially take over scheduling from one
177 * or both to avoid some of the context switching, but those interfaces are
178 * significantly more error-prone and (at least in CURL's case) buggy.
180 * For a GET, the CURL child acts as a producer while the MHD parent acts
181 * as consumer. For a PUT, the MHD parent is the producer and the CURL
182 * child is the consumer. For GET the MHD component is invoked via a
183 * callback set up in the access handler; for PUT it's invoked via repeated
184 * calls to the access handler itself. Either way, the producer's job is
185 * to post its pointer+length to the my_state structure and then wait for
186 * all consumers to check back in saying they've finished it. This might
187 * involve multiple passes through each consumer for one pass through the
188 * single producer. When the producer is done, it does a similar handshake
189 * with the consumers. Each consumer has its own pipe_private structure,
190 * containing a pointer to the shared my_state plus a per-consumer offset
191 * into the current chunk.
193 * Attribute functions don't use CURL, so they do much simpler in-memory
194 * buffering. Queries also don't use CURL, but the MHD POST interface
195 * introduces some of its own complexity so see below for that.
196 **********/
198 static void
199 simple_closer (void *ctx)
201 my_state *ms = ctx;
203 DPRINTF("%s: cleaning up\n",__func__);
204 free_ms(ms);
207 static void
208 child_closer (void * ctx)
210 pipe_private *pp = ctx;
212 DPRINTF("in %s\n",__func__);
214 free(pp);
217 /* Invoked from MHD. */
218 static ssize_t
219 proxy_get_cons (void *ctx, uint64_t pos, char *buf, size_t max)
221 pipe_private *pp = ctx;
222 pipe_shared *ps = pp->shared;
223 my_state *ms = ps->owner;
224 ssize_t done;
225 void *child_res;
227 (void)pos;
229 DPRINTF("consumer asked to read %zu\n",max);
231 if (pipe_cons_wait(pp)) {
232 DPRINTF("consumer offset %zu into %zu\n",
233 pp->offset, ps->data_len);
234 if (ps->data_len < pp->offset)
235 // Warn about bogus offset?
236 done = -1;
237 else {
238 done = ps->data_len - pp->offset;
239 if ((size_t) done > max) {
240 done = max;
242 memcpy(buf,(char *)(ps->data_ptr)+pp->offset,done);
243 pp->offset += done;
244 DPRINTF("consumer copied %zu, new offset %zu\n",
245 done, pp->offset);
246 if (pp->offset == ps->data_len) {
247 DPRINTF("consumer finished chunk\n");
248 pipe_cons_signal(pp, 0);
252 else {
253 done = -1;
256 if (done == (-1)) {
257 child_res = NULL;
258 pthread_join(ms->backend_th,&child_res);
259 if (child_res == THREAD_FAILED) {
260 DPRINTF("GET producer failed\n");
261 /* Nothing we can do; already sent status. */
263 if (ms->from_master) {
264 pthread_join(ms->cache_th,NULL);
265 /* TBD: do something about cache failure? */
267 free_ms(ms);
270 return done;
273 static int
274 proxy_get_data (void *cctx, struct MHD_Connection *conn, const char *url,
275 const char *method, const char *version, const char *data,
276 size_t *data_size, void **rctx)
278 struct MHD_Response *resp;
279 my_state *ms = *rctx;
280 pipe_private *pp;
281 pipe_private *pp2;
282 char *my_etag;
283 const char *user_etag;
284 int rc;
286 (void)cctx;
287 (void)method;
288 (void)version;
289 (void)data;
290 (void)data_size;
292 DPRINTF("PROXY GET DATA %s\n",url);
294 ms->url = strdup(url);
295 if (!ms->url) {
296 return MHD_NO;
298 ms->cleanup |= CLEANUP_URL;
300 my_etag = meta_has_copy(ms->bucket,ms->key,me);
301 if (!my_etag) {
302 DPRINTF("falling back to local for %s/%s\n",ms->bucket,ms->key);
303 ms->from_master = 0;
305 else if (*my_etag) {
306 user_etag = MHD_lookup_connection_value(
307 conn, MHD_HEADER_KIND, "If-None-Match");
308 if (user_etag && !strcmp(user_etag,my_etag)) {
309 DPRINTF("ETag match!\n");
310 free(my_etag);
311 resp = MHD_create_response_from_data(0,NULL,
312 MHD_NO,MHD_NO);
313 MHD_queue_response(conn,MHD_HTTP_NOT_MODIFIED,resp);
314 MHD_destroy_response(resp);
315 return MHD_YES;
317 free(my_etag);
318 ms->from_master = 0;
320 else {
321 DPRINTF("%s/%s not found locally\n",ms->bucket,ms->key);
322 if (!master_host) {
323 DPRINTF(" that means it doesn't exist\n");
324 resp = MHD_create_response_from_data(0,NULL,
325 MHD_NO,MHD_NO);
326 MHD_queue_response(conn,MHD_HTTP_NOT_FOUND,resp);
327 MHD_destroy_response(resp);
328 free_ms(ms);
329 return MHD_YES;
331 DPRINTF(" will fetch from %s:%u\n", master_host,master_port);
332 ms->from_master = 1;
335 pipe_init_shared(&ms->pipe,ms,ms->from_master+1);
336 pp = pipe_init_private(&ms->pipe);
337 if (!pp) {
338 return MHD_NO;
340 ms->thunk.parent = ms;
341 ms->thunk.prov = ms->from_master ? master_prov : main_prov;
342 pthread_create(&ms->backend_th,NULL,
343 ms->thunk.prov->func_tbl->get_child_func,&ms->thunk);
344 /* TBD: check return value */
346 if (ms->from_master) {
347 pp2 = pipe_init_private(&ms->pipe);
348 if (!pp2) {
349 return MHD_NO;
351 pp2->prov = main_prov;
352 pthread_create(&ms->cache_th,NULL,
353 main_prov->func_tbl->cache_child_func,pp2);
354 /* TBD: check return value */
356 else {
357 pp2 = NULL;
360 rc = pipe_cons_wait_init(&ms->pipe);
361 ms->rc = (rc == 0) ? MHD_HTTP_OK : MHD_HTTP_INTERNAL_SERVER_ERROR;
363 resp = MHD_create_response_from_callback(
364 MHD_SIZE_UNKNOWN, 65536, proxy_get_cons, pp, child_closer);
365 if (!resp) {
366 fprintf(stderr,"MHD_crfc failed\n");
367 if (pp2) {
368 /* TBD: terminate thread */
369 free(pp2);
371 child_closer(pp);
372 return MHD_NO;
374 MHD_queue_response(conn,ms->rc,resp);
375 MHD_destroy_response(resp);
377 return MHD_YES;
380 static void
381 recheck_replication (my_state * ms, char *policy)
383 int rc;
384 int free_it = FALSE;
385 char fixed[MAX_FIELD_LEN];
387 if (is_reserved(ms->key,reserved_name)) {
388 DPRINTF("declining to replicate reserved object %s\n",ms->key);
389 return;
392 if (!policy && ms->dict) {
393 DPRINTF("using new policy for %s/%s\n",ms->bucket,ms->key);
394 policy = g_hash_table_lookup(ms->dict,"_policy");
397 if (!policy) {
398 /* If we get a policy here or below, we have to free it. */
399 free_it = TRUE;
400 DPRINTF("fetching policy for %s/%s\n",ms->bucket,ms->key);
401 rc = meta_get_value(ms->bucket,ms->key, "_policy", &policy);
404 if (!policy) {
405 DPRINTF(" inheriting policy from %s\n",ms->bucket);
406 rc = meta_get_value(ms->bucket,
407 "_default", "_policy", &policy);
410 if (policy) {
411 DPRINTF(" implementing policy %s\n",policy);
413 * Can't use ms->url here because it might be a bucket POST
414 * and in that case ms->url points to the bucket.
416 snprintf(fixed,sizeof(fixed),"%s/%s",ms->bucket,ms->key);
417 replicate(fixed,0,policy,ms);
418 if (free_it) {
419 free(policy);
422 else {
423 DPRINTF(" could not find a policy anywhere!\n");
427 static int
428 proxy_put_data (void *cctx, struct MHD_Connection *conn, const char *url,
429 const char *method, const char *version, const char *data,
430 size_t *data_size, void **rctx)
432 struct MHD_Response *resp;
433 my_state *ms = *rctx;
434 pipe_private *pp;
435 int rc;
436 char *etag = NULL;
437 void *child_res;
439 (void)cctx;
440 (void)method;
441 (void)version;
443 DPRINTF("PROXY PUT DATA %s (%zu)\n",url,*data_size);
445 if (ms->state == MS_NEW) {
446 if (!validate_put(conn) || !validate_url(url)) {
447 DPRINTF("rejecting %s\n",url);
448 resp = MHD_create_response_from_data(0,NULL,
449 MHD_NO,MHD_NO);
450 if (!resp) {
451 return MHD_NO;
453 MHD_queue_response(conn,MHD_HTTP_FORBIDDEN,resp);
454 MHD_destroy_response(resp);
455 free_ms(ms);
456 return MHD_YES;
458 ms->state = MS_NORMAL;
459 ms->url = strdup(url);
460 if (!ms->url) {
461 return MHD_NO;
463 ms->cleanup |= CLEANUP_URL;
464 ms->size = 0;
465 pipe_init_shared(&ms->pipe,ms,1);
466 pp = pipe_init_private(&ms->pipe);
467 if (!pp) {
468 return MHD_NO;
470 pp->prov = main_prov;
471 ms->be_flags = BACKEND_GET_SIZE;
472 pthread_create(&ms->backend_th,NULL,
473 main_prov->func_tbl->put_child_func,pp);
474 /* TBD: check return value */
477 * Do the initial handshake with children. If we return from
478 * this callback without an error response, Microhttpd posts
479 * the "100 Continue" header and the client starts sending
480 * the data. We must report errors here or forever keep
481 * our peace.
483 rc = pipe_prod_wait_init(&ms->pipe);
484 if (rc != 0) {
485 DPRINTF("producer wait failed\n");
486 resp = MHD_create_response_from_data(0,NULL,
487 MHD_NO,MHD_NO);
488 if (!resp) {
489 return MHD_NO;
491 MHD_queue_response(conn,MHD_HTTP_INTERNAL_SERVER_ERROR,
492 resp);
493 MHD_destroy_response(resp);
494 } else if (rc > 0) {
496 * Note that we fail here even if 1 of N replicas fail.
497 * Might want to fix this when we start looping over
498 * pipe_init_private() above.
500 DPRINTF("producer replicas failed (%u of %u)\n",
501 rc, ms->pipe.cons_total);
502 resp = MHD_create_response_from_data(0,NULL,
503 MHD_NO,MHD_NO);
504 if (!resp) {
505 return MHD_NO;
507 MHD_queue_response(conn,MHD_HTTP_INTERNAL_SERVER_ERROR,
508 resp);
509 MHD_destroy_response(resp);
510 } else {
511 DPRINTF("producer proceeding\n");
514 else if (*data_size) {
515 pipe_prod_signal(&ms->pipe,(void *)data,*data_size);
516 ms->size += *data_size;
517 DPRINTF("producer chunk finished\n");
518 *data_size = 0;
520 else {
521 pipe_prod_finish(&ms->pipe);
522 pthread_join(ms->backend_th,&child_res);
523 if (child_res == THREAD_FAILED) {
524 DPRINTF("thread failed\n");
525 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
527 else if (ms->pipe.cons_error == ms->pipe.cons_total) {
528 DPRINTF("all %u consumers failed\n",
529 ms->pipe.cons_error);
530 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
532 else {
533 if (master_host) {
534 meta_got_copy(ms->bucket,ms->key,me);
535 etag = NULL;
537 else {
538 etag = meta_did_put(ms->bucket,ms->key,me,
539 ms->size);
541 DPRINTF("rereplicate (obj PUT)\n");
542 recheck_replication(ms,NULL);
543 rc = MHD_HTTP_OK;
545 free_ms(ms);
546 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
547 if (!resp) {
548 free(etag);
549 return MHD_NO;
551 if (etag) {
552 MHD_add_response_header(resp,"ETag",etag);
553 free(etag);
555 MHD_queue_response(conn,rc,resp);
556 MHD_destroy_response(resp);
559 return MHD_YES;
562 static int
563 proxy_get_attr (void *cctx, struct MHD_Connection *conn, const char *url,
564 const char *method, const char *version, const char *data,
565 size_t *data_size, void **rctx)
567 struct MHD_Response *resp;
568 char *fixed;
569 my_state *ms = *rctx;
570 int rc = MHD_HTTP_NOT_FOUND;
572 (void)cctx;
573 (void)method;
574 (void)version;
575 (void)data;
576 (void)data_size;
578 DPRINTF("PROXY GET ATTR %s\n",url);
580 if (meta_get_value(ms->bucket,ms->key,ms->attr,&fixed) == 0) {
581 resp = MHD_create_response_from_data(strlen(fixed),fixed,
582 MHD_YES,MHD_NO);
583 rc = MHD_HTTP_OK;
585 else {
586 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
588 if (!resp) {
589 return MHD_NO;
591 MHD_queue_response(conn,rc,resp);
592 MHD_destroy_response(resp);
594 free_ms(ms);
595 return MHD_YES;
598 static int
599 proxy_put_attr (void *cctx, struct MHD_Connection *conn, const char *url,
600 const char *method, const char *version, const char *data,
601 size_t *data_size, void **rctx)
603 struct MHD_Response *resp;
604 my_state *ms = *rctx;
605 const char *attrval;
606 int send_resp = 0;
608 (void)cctx;
609 (void)method;
610 (void)version;
612 DPRINTF("PROXY PUT ATTR %s (%zu)\n",url,*data_size);
614 if (ms->state == MS_NEW) {
615 ms->state = MS_NORMAL;
616 ms->url = strdup(url);
617 if (!ms->url) {
618 return MHD_NO;
620 ms->cleanup |= CLEANUP_URL;
621 attrval = MHD_lookup_connection_value(conn,MHD_HEADER_KIND,
622 "X-redhat-value");
623 if (attrval) {
624 meta_set_value(ms->bucket,ms->key,ms->attr,
625 (char *)attrval);
626 send_resp = 1;
629 else if (*data_size) {
630 if (ms->pipe.data_len) {
631 ms->pipe.data_len += *data_size;
632 char *p = realloc(ms->pipe.data_ptr,ms->pipe.data_len);
633 if (!p) {
634 return MHD_NO;
636 ms->pipe.data_ptr = p;
638 else {
639 ms->pipe.data_len = *data_size + 1;
640 ms->pipe.data_ptr = malloc(ms->pipe.data_len);
641 if (!ms->pipe.data_ptr) {
642 return MHD_NO;
644 ((char *)ms->pipe.data_ptr)[0] = '\0';
645 ms->cleanup |= CLEANUP_BUF_PTR;
647 (void)strncat(ms->pipe.data_ptr,data,*data_size);
648 /* TBD: check return value */
649 *data_size = 0;
651 else {
652 if (!ms->pipe.data_ptr) {
653 return MHD_NO;
655 if (is_reserved(ms->attr,reserved_attr)) {
656 resp = MHD_create_response_from_data(
657 0,NULL,MHD_NO,MHD_NO);
658 if (!resp) {
659 return MHD_NO;
661 MHD_queue_response(conn,MHD_HTTP_BAD_REQUEST,
662 resp);
663 MHD_destroy_response(resp);
664 free_ms(ms);
665 return MHD_YES;
667 meta_set_value(ms->bucket,ms->key,ms->attr,ms->pipe.data_ptr);
668 /* This might get stomped by replication. */
669 if (ms->cleanup & CLEANUP_BUF_PTR) {
670 free(ms->pipe.data_ptr);
671 ms->cleanup &= ~CLEANUP_BUF_PTR;
674 * We should always re-replicate, because the replication
675 * policy might refer to this attr.
677 DPRINTF("rereplicate (attr PUT)\n");
678 recheck_replication(ms,NULL);
679 free_ms(ms);
680 send_resp = 1;
683 if (send_resp) {
684 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
685 if (!resp) {
686 return MHD_NO;
688 MHD_queue_response(conn,MHD_HTTP_CREATED,resp);
689 MHD_destroy_response(resp);
691 * TBD: check if the attribute was a replication policy, and
692 * start/stop replication activities as appropriate.
696 return MHD_YES;
699 /**********
700 * For queries, we have to deal with MHD's post-iterator interface (not
701 * quite the same as the data-iteration even though we use it that way) on
702 * one side, and a query-iterator interface on the other. Data on both
703 * sides could be quite large, so we can't just stick them in header lines.
704 * We do still buffer the query in memory, though. Once that's done, we do
705 * very simple parsing - it will be more complicated later - and create the
706 * query iterator. That's also driven by MHD, this time though the
707 * content-callback interface, and repeatedly calls in to the metadata
708 * module to fetch one object name at a time.
709 **********/
711 static int
712 query_iterator (void *ctx, enum MHD_ValueKind kind, const char *key,
713 const char *filename, const char *content_type,
714 const char *transfer_encoding, const char *data,
715 uint64_t off, size_t size)
717 (void)ctx;
718 (void)kind;
719 (void)key;
720 (void)filename;
721 (void)content_type;
722 (void)transfer_encoding;
723 (void)data;
724 (void)off;
725 (void)size;
727 /* We actually accumulate the data in proxy_query. */
728 return MHD_YES;
731 /* MHD reader function during queries. Return -1 for EOF. */
732 static ssize_t
733 proxy_query_func (void *ctx, uint64_t pos, char *buf, size_t max)
735 my_state *ms = ctx;
736 size_t len;
737 const char *accept_hdr;
738 char *bucket;
739 char *key;
741 (void)pos;
743 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
744 "Accept");
746 if (!ms->gen_ctx) {
747 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
748 if (!ms->gen_ctx) {
749 return -1;
751 ms->cleanup |= CLEANUP_TMPL;
752 len = tmpl_list_header(ms->gen_ctx);
753 if (!len) {
754 return -1;
756 if (len > max) {
757 len = max;
759 memcpy(buf,ms->gen_ctx->buf,len);
760 return len;
763 if (ms->gen_ctx == TMPL_CTX_DONE) {
764 return -1;
767 for(;;) {
768 if (!meta_query_next(ms->query,&bucket,&key)) {
769 break;
771 if (is_reserved(key,reserved_name)) {
772 continue;
774 len = tmpl_list_entry(ms->gen_ctx,bucket,key);
775 if (!len) {
776 return -1;
778 if (len > max) {
779 len = max;
781 memcpy(buf,ms->gen_ctx->buf,len);
782 return len;
785 len = tmpl_list_footer(ms->gen_ctx);
786 if (!len) {
787 return -1;
789 if (len > max) {
790 len = max;
792 memcpy(buf,ms->gen_ctx->buf,len);
793 free(ms->gen_ctx);
794 ms->cleanup &= ~CLEANUP_TMPL;
795 ms->gen_ctx = TMPL_CTX_DONE;
796 return len;
799 static int
800 proxy_query (void *cctx, struct MHD_Connection *conn, const char *url,
801 const char *method, const char *version, const char *data,
802 size_t *data_size, void **rctx)
804 struct MHD_Response *resp;
805 my_state *ms = *rctx;
807 (void)cctx;
808 (void)method;
809 (void)version;
811 DPRINTF("PROXY QUERY %s (%zu)\n",url,*data_size);
813 if (ms->state == MS_NEW) {
814 ms->state = MS_NORMAL;
815 ms->post = MHD_create_post_processor(conn,4096,
816 query_iterator,ms);
817 ms->cleanup |= CLEANUP_POST;
819 else if (*data_size) {
820 MHD_post_process(ms->post,data,*data_size);
821 if (ms->pipe.data_len) {
822 ms->pipe.data_len += *data_size;
823 char *p = realloc(ms->pipe.data_ptr,ms->pipe.data_len);
824 if (!p) {
825 return MHD_NO;
827 ms->pipe.data_ptr = p;
829 else {
830 ms->pipe.data_len = *data_size + 1;
831 ms->pipe.data_ptr = malloc(ms->pipe.data_len);
832 if (!ms->pipe.data_ptr) {
833 return MHD_NO;
835 ((char *)ms->pipe.data_ptr)[0] = '\0';
836 ms->cleanup |= CLEANUP_BUF_PTR;
838 (void)strncat(ms->pipe.data_ptr,data,*data_size);
839 /* TBD: check return value */
840 *data_size = 0;
842 else {
843 if (!ms->pipe.data_ptr) {
844 return MHD_NO;
846 ms->query = meta_query_new(ms->bucket,NULL,ms->pipe.data_ptr);
847 ms->cleanup |= CLEANUP_QUERY;
848 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
849 65536, proxy_query_func, ms, simple_closer);
850 if (!resp) {
851 fprintf(stderr,"MHD_crfc failed\n");
852 simple_closer(ms);
853 return MHD_NO;
855 MHD_queue_response(conn,MHD_HTTP_OK,resp);
856 MHD_destroy_response(resp);
857 free_ms(ms);
860 return MHD_YES;
863 static int
864 proxy_list_objs (void *cctx, struct MHD_Connection *conn, const char *url,
865 const char *method, const char *version, const char *data,
866 size_t *data_size, void **rctx)
868 my_state *ms = *rctx;
869 struct MHD_Response *resp;
871 (void)cctx;
872 (void)url;
873 (void)method;
874 (void)version;
875 (void)data;
876 (void)data_size;
878 ms->query = meta_query_new((char *)ms->bucket,NULL,NULL);
879 ms->cleanup |= CLEANUP_QUERY;
881 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
882 65536, proxy_query_func, ms, simple_closer);
883 if (!resp) {
884 fprintf(stderr,"MHD_crfc failed\n");
885 simple_closer(ms);
886 return MHD_NO;
889 MHD_queue_response(conn,MHD_HTTP_OK,resp);
890 MHD_destroy_response(resp);
891 return MHD_YES;
894 static int
895 proxy_delete (void *cctx, struct MHD_Connection *conn, const char *url,
896 const char *method, const char *version, const char *data,
897 size_t *data_size, void **rctx)
899 my_state *ms = *rctx;
900 struct MHD_Response *resp;
901 char *copied_url;
902 char *bucket;
903 char *key;
904 char *stctx = NULL;
905 int rc;
907 (void)cctx;
908 (void)method;
909 (void)version;
910 (void)data;
911 (void)data_size;
913 DPRINTF("PROXY DELETE %s\n",url);
915 ms->thunk.parent = ms;
916 ms->thunk.prov = main_prov;
917 rc = ms->thunk.prov->func_tbl->delete_func(main_prov,
918 ms->bucket,ms->key,url);
919 if (rc == MHD_HTTP_OK) {
920 copied_url = strdup(url);
921 assert (copied_url);
922 bucket = strtok_r(copied_url,"/",&stctx);
923 key = strtok_r(NULL,"/",&stctx);
924 meta_delete(bucket,key);
925 free(copied_url);
926 replicate_delete(url,ms);
929 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
930 if (!resp) {
931 free_ms(ms);
932 return MHD_NO;
934 MHD_queue_response(conn,rc,resp);
935 MHD_destroy_response(resp);
937 free_ms(ms);
938 return MHD_YES;
941 /* TBD: get actual bucket list */
942 typedef struct {
943 const char *rel;
944 const char *link;
945 } fake_bucket_t;
947 /* FIXME: ensure that the RHS values here stay in sync with those
948 in reserved_bucket_name. */
949 static const fake_bucket_t fake_bucket_list[] = {
950 { "bucket_factory", "_new" },
951 { "provider_list", "_providers" },
954 static ssize_t
955 root_blob_generator (void *ctx, uint64_t pos, char *buf, size_t max)
957 my_state *ms = ctx;
958 const fake_bucket_t *fb;
959 size_t len;
960 const char *accept_hdr;
961 const char *host;
962 char *bucket;
963 char *key;
965 (void)pos;
967 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
968 "Accept");
969 host = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,"Host");
971 if (!ms->gen_ctx) {
972 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
973 if (!ms->gen_ctx) {
974 return -1;
976 ms->cleanup |= CLEANUP_TMPL;
977 ms->gen_ctx->base = host;
978 len = tmpl_root_header(ms->gen_ctx,"image_warehouse",VERSION);
979 if (!len) {
980 return -1;
982 if (len > max) {
983 len = max;
985 memcpy(buf,ms->gen_ctx->buf,len);
986 return len;
989 if (ms->gen_ctx == TMPL_CTX_DONE) {
990 return -1;
993 if (ms->gen_ctx->index < ARRAY_SIZE(fake_bucket_list)) {
994 fb = fake_bucket_list + ms->gen_ctx->index;
995 len = tmpl_root_entry(ms->gen_ctx,fb->rel,fb->link);
996 if (!len) {
997 return -1;
999 if (len > max) {
1000 len = max;
1002 memcpy(buf,ms->gen_ctx->buf,len);
1003 return len;
1006 if (meta_query_next(ms->query,&bucket,&key)) {
1007 len = tmpl_root_entry(ms->gen_ctx,"bucket",bucket);
1008 if (!len) {
1009 return -1;
1011 if (len > max) {
1012 len = max;
1014 memcpy(buf,ms->gen_ctx->buf,len);
1015 return len;
1018 len = tmpl_root_footer(ms->gen_ctx);
1019 if (!len) {
1020 return -1;
1022 if (len > max) {
1023 len = max;
1025 memcpy(buf,ms->gen_ctx->buf,len);
1026 free(ms->gen_ctx);
1027 ms->cleanup &= ~CLEANUP_TMPL;
1028 ms->gen_ctx = TMPL_CTX_DONE;
1029 return len;
1032 static int
1033 proxy_api_root (void *cctx, struct MHD_Connection *conn, const char *url,
1034 const char *method, const char *version, const char *data,
1035 size_t *data_size, void **rctx)
1037 struct MHD_Response *resp = NULL;
1038 unsigned int rc = MHD_HTTP_OK;
1039 my_state *ms = *rctx;
1041 (void)cctx;
1042 (void)method;
1043 (void)version;
1044 (void)data;
1046 DPRINTF("PROXY API ROOT (%s, %zu)\n",url,*data_size);
1048 ms->query = meta_query_new(NULL,"_default",NULL);
1049 if (!ms->query) {
1050 free_ms(ms);
1051 return MHD_NO;
1053 ms->cleanup |= CLEANUP_QUERY;
1055 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
1056 65536, root_blob_generator, ms, simple_closer);
1057 if (!resp) {
1058 return MHD_NO;
1060 MHD_queue_response(conn,rc,resp);
1061 MHD_destroy_response(resp);
1063 return MHD_YES;
1067 static int
1068 post_iterator (void *ctx, enum MHD_ValueKind kind, const char *key,
1069 const char *filename, const char *content_type,
1070 const char *transfer_encoding, const char *data,
1071 uint64_t off, size_t size)
1073 char *old_val;
1074 size_t old_len;
1075 char *new_val;
1077 (void)kind;
1078 (void)filename;
1079 (void)content_type;
1080 (void)transfer_encoding;
1081 (void)off;
1083 printf("adding %s, size=%zu\n",key,size);
1085 // TBD: don't assume that values are null-terminated strings
1086 old_val = g_hash_table_lookup(ctx,key);
1087 if (old_val) {
1088 old_len = strlen(old_val);
1089 new_val = malloc(old_len+size+1);
1090 if (!new_val) {
1091 return MHD_NO;
1093 memcpy(new_val,old_val,old_len);
1094 memcpy(new_val+old_len,data,size);
1095 new_val[old_len+size] = '\0';
1097 else {
1098 new_val = malloc(size+1);
1099 if (!new_val) {
1100 return MHD_NO;
1102 memcpy(new_val,data,size);
1103 new_val[size] = '\0';
1106 g_hash_table_insert(ctx,strdup(key),new_val);
1107 /* TBD: check return value for strdups (none avail for insert) */
1108 return MHD_YES;
1111 /* Returns TRUE if we found an *invalid* key. */
1112 static gboolean
1113 post_find (gpointer key, gpointer value, gpointer ctx)
1115 (void)value;
1116 (void)ctx;
1118 if (!is_reserved(key,reserved_attr)) {
1119 return FALSE;
1122 DPRINTF("bad attr %s\n", (char *)key);
1123 return TRUE;
1126 static void
1127 post_foreach (gpointer key, gpointer value, gpointer ctx)
1129 my_state *ms = ctx;
1131 DPRINTF("setting %s = %s for %s/%s\n",(char *)key, (char *)value,
1132 ms->bucket,ms->key);
1133 meta_set_value(ms->bucket,ms->key,key,value);
1136 static int
1137 create_bucket (char *name, my_state *ms)
1139 int rc;
1141 if (is_reserved(name, reserved_name)
1142 || is_reserved(name, reserved_bucket_name)) {
1143 return MHD_HTTP_BAD_REQUEST;
1146 rc = main_prov->func_tbl->bcreate_func(main_prov,name);
1147 if (rc == MHD_HTTP_OK) {
1148 if (meta_set_value(name,"_default", "_policy","0") != 0) {
1149 DPRINTF("default-policy " "create failed\n");
1150 /* Non-fatal. */
1152 DPRINTF("created bucket %s\n",name);
1154 * There's not a whole lot to do about bucket-creation
1155 * failures on replicas, other than to report them, unless
1156 * we adopt an "all or nothing" approach and unwind the
1157 * create on the primary as well. Then what if that fails?
1158 * It's just one example of the general "fewer replicas
1159 * than desired" distributed-system problem, not worth a
1160 * point solution here/now. Revisit when we have a more
1161 * general replica-repair policy/system in place.
1163 replicate_bcreate(name,ms);
1166 return rc;
1169 static int
1170 control_api_root (void *cctx, struct MHD_Connection *conn, const char *url,
1171 const char *method, const char *version, const char *data,
1172 size_t *data_size, void **rctx)
1174 struct MHD_Response *resp;
1175 my_state *ms = *rctx;
1176 int rc;
1177 char *op;
1178 char buf[80];
1179 int len;
1181 (void)cctx;
1182 (void)method;
1183 (void)version;
1185 DPRINTF("ROOT POST (%s, %zu)\n",url,*data_size);
1187 if (ms->state == MS_NEW) {
1188 ms->state = MS_NORMAL;
1189 ms->url = (char *)url;
1190 ms->dict = g_hash_table_new_full(
1191 g_str_hash,g_str_equal,free,free);
1192 ms->cleanup |= CLEANUP_DICT;
1193 ms->post = MHD_create_post_processor(conn,4096,
1194 post_iterator,ms->dict);
1195 ms->cleanup |= CLEANUP_POST;
1196 return MHD_YES;
1199 if (*data_size) {
1200 MHD_post_process(ms->post,data,*data_size);
1201 *data_size = 0;
1202 return MHD_YES;
1205 rc = MHD_HTTP_BAD_REQUEST;
1207 op = g_hash_table_lookup(ms->dict,"op");
1208 if (op) {
1209 if (!strcmp(op,"rep_status")) {
1210 len = snprintf(buf,sizeof(buf),"%d requests\n",
1211 get_rep_count());
1212 rc = MHD_HTTP_OK;
1214 else {
1215 len = snprintf(buf,sizeof(buf),"unknown op");
1218 else {
1219 len = snprintf(buf,sizeof(buf),"missing op");
1222 if (len >= (int)sizeof(buf)) {
1223 len = 0;
1224 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
1227 /* NB The last arg tells MHD to copy the arg and free it later. */
1228 resp = MHD_create_response_from_data(len,buf,MHD_NO,MHD_YES);
1229 if (!resp) {
1230 return MHD_NO;
1232 MHD_queue_response(conn,rc,resp);
1233 MHD_destroy_response(resp);
1235 free_ms(ms);
1236 return MHD_YES;
1239 static int
1240 proxy_bucket_post (void *cctx, struct MHD_Connection *conn, const char *url,
1241 const char *method, const char *version, const char *data,
1242 size_t *data_size, void **rctx)
1244 struct MHD_Response *resp;
1245 my_state *ms = *rctx;
1246 int rc;
1247 char *key;
1249 (void)cctx;
1250 (void)method;
1251 (void)version;
1253 DPRINTF("PROXY POST (%s, %zu)\n",url,*data_size);
1255 if (ms->state == MS_NEW) {
1256 ms->state = MS_NORMAL;
1257 ms->url = (char *)url;
1258 ms->dict = g_hash_table_new_full(
1259 g_str_hash,g_str_equal,free,free);
1260 ms->cleanup |= CLEANUP_DICT;
1261 ms->post = MHD_create_post_processor(conn,4096,
1262 post_iterator,ms->dict);
1263 ms->cleanup |= CLEANUP_POST;
1265 else if (*data_size) {
1266 MHD_post_process(ms->post,data,*data_size);
1267 *data_size = 0;
1269 else {
1270 rc = MHD_HTTP_BAD_REQUEST;
1271 key = g_hash_table_lookup(ms->dict,"_key");
1272 if (key) {
1273 strncpy(ms->key,key,MAX_FIELD_LEN-1);
1274 g_hash_table_remove(ms->dict,"_key");
1275 if (!g_hash_table_find(ms->dict,post_find,ms)) {
1276 g_hash_table_foreach(ms->dict,post_foreach,ms);
1277 DPRINTF("rereplicate (bucket POST)\n");
1278 recheck_replication(ms,NULL);
1279 rc = MHD_HTTP_OK;
1282 else if (!strcmp(ms->bucket,"_new")) {
1283 key = g_hash_table_lookup(ms->dict,"name");
1284 if (key != NULL) {
1285 rc = create_bucket(key,ms);
1288 else {
1289 DPRINTF("A parameter is MISSING (fail)\n");
1291 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1292 if (!resp) {
1293 fprintf(stderr,"MHD_crfd failed\n");
1294 return MHD_NO;
1296 MHD_queue_response(conn,rc,resp);
1297 MHD_destroy_response(resp);
1298 free_ms(ms);
1301 return MHD_YES;
1304 static int
1305 check_location (my_state *ms)
1307 char *loc = g_hash_table_lookup(ms->dict,"depot");
1309 if (!loc) {
1310 DPRINTF("missing loc on check for %s/%s\n",ms->bucket,ms->key);
1311 return MHD_HTTP_BAD_REQUEST;
1314 if (!meta_has_copy(ms->bucket,ms->key,loc)) {
1315 DPRINTF("did not find %s/%s at %s\n",ms->bucket,ms->key,loc);
1316 return MHD_HTTP_NOT_FOUND;
1319 /* TBD: meta_has_copy returns an etag which we should check */
1320 DPRINTF("found %s/%s at %s\n",ms->bucket,ms->key,loc);
1321 return MHD_HTTP_OK;
1324 static int
1325 register_image (my_state *ms)
1327 const char *site;
1328 const provider_t *prov;
1329 char *next;
1331 site = g_hash_table_lookup(ms->dict,"site");
1332 if (!site) {
1333 printf("site MISSING\n");
1334 return MHD_HTTP_BAD_REQUEST;
1337 next = strchr(site,':');
1338 if (next) {
1339 *(next++) = '\0';
1342 prov = get_provider(site);
1343 if (!prov) {
1344 DPRINTF("site %s not found\n",site);
1345 return MHD_HTTP_BAD_REQUEST;
1348 return prov->func_tbl->register_func(ms,prov,next,ms->dict);
1352 static ssize_t
1353 parts_callback (void *ctx, uint64_t pos, char *buf, size_t max)
1355 my_state *ms = ctx;
1356 size_t len;
1357 const char *accept_hdr;
1358 const char *name;
1359 const char *value;
1360 const char *host;
1362 (void)pos;
1364 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
1365 "Accept");
1366 host = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,"Host");
1368 if (!ms->gen_ctx) {
1369 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
1370 if (!ms->gen_ctx) {
1371 return -1;
1373 ms->cleanup |= CLEANUP_TMPL;
1374 ms->gen_ctx->base = host;
1375 len = tmpl_obj_header(ms->gen_ctx,ms->bucket,ms->key);
1376 if (!len) {
1377 return -1;
1379 if (len > max) {
1380 len = max;
1382 memcpy(buf,ms->gen_ctx->buf,len);
1383 return len;
1386 if (ms->gen_ctx == TMPL_CTX_DONE) {
1387 return -1;
1391 // Set up and use query for what attributes exist.
1392 for(;;) {
1393 if (!meta_attr_next(ms->aquery,&name,&value)) {
1394 break;
1396 if (is_reserved(name,reserved_attr)) {
1397 continue;
1399 len = tmpl_obj_entry(ms->gen_ctx,ms->bucket,ms->key,name);
1400 if (!len) {
1401 return -1;
1403 if (len > max) {
1404 len = max;
1406 memcpy(buf,ms->gen_ctx->buf,len);
1407 return len;
1410 len = tmpl_obj_footer(ms->gen_ctx);
1411 if (!len) {
1412 return -1;
1414 if (len > max) {
1415 len = max;
1417 memcpy(buf,ms->gen_ctx->buf,len);
1418 free(ms->gen_ctx);
1419 ms->cleanup &= ~CLEANUP_TMPL;
1420 ms->gen_ctx = TMPL_CTX_DONE;
1421 return len;
1424 static int
1425 show_parts (struct MHD_Connection *conn, my_state *ms)
1427 struct MHD_Response *resp;
1429 ms->aquery = meta_get_attrs(ms->bucket,ms->key);
1430 if (!ms->aquery) {
1431 return MHD_HTTP_NOT_FOUND;
1433 ms->cleanup |= CLEANUP_AQUERY;
1435 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
1436 65536, parts_callback, ms, simple_closer);
1437 if (!resp) {
1438 fprintf(stderr,"MHD_crfc failed\n");
1439 simple_closer(ms);
1440 return MHD_NO;
1442 MHD_queue_response(conn,MHD_HTTP_OK,resp);
1443 MHD_destroy_response(resp);
1444 return MHD_HTTP_PROCESSING;
1447 static int
1448 proxy_object_post (void *cctx, struct MHD_Connection *conn, const char *url,
1449 const char *method, const char *version, const char *data,
1450 size_t *data_size, void **rctx)
1452 struct MHD_Response *resp;
1453 my_state *ms = *rctx;
1454 int rc;
1455 char *op;
1457 (void)cctx;
1458 (void)method;
1459 (void)version;
1461 DPRINTF("PROXY POST (%s, %zu)\n",url,*data_size);
1463 if (ms->state == MS_NEW) {
1464 ms->state = MS_NORMAL;
1465 ms->url = (char *)url;
1466 ms->dict = g_hash_table_new_full(
1467 g_str_hash,g_str_equal,free,free);
1468 ms->cleanup |= CLEANUP_DICT;
1469 ms->post = MHD_create_post_processor(conn,4096,
1470 post_iterator,ms->dict);
1471 ms->cleanup |= CLEANUP_POST;
1473 else if (*data_size) {
1474 MHD_post_process(ms->post,data,*data_size);
1475 *data_size = 0;
1477 else {
1478 rc = MHD_HTTP_BAD_REQUEST;
1479 if (!g_hash_table_find(ms->dict,post_find,ms)) {
1480 op = g_hash_table_lookup(ms->dict,"op");
1481 if (op) {
1482 if (!strcmp(op,"push")) {
1483 DPRINTF("rereplicate (obj POST)\n");
1484 recheck_replication(ms,NULL);
1485 rc = MHD_HTTP_OK;
1487 else if (!strcmp(op,"check")) {
1488 rc = check_location(ms);
1490 else if (!strcmp(op,"register")) {
1491 rc = register_image(ms);
1493 else if (!strcmp(op,"parts")) {
1494 rc = show_parts(conn,ms);
1496 else {
1497 DPRINTF("unknown op %s for %s/%s\n",
1498 op, ms->bucket, ms->key);
1501 else {
1502 DPRINTF("op is MISSING (fail)\n");
1505 if (rc != MHD_HTTP_PROCESSING) {
1507 * MHD_HTTP_PROCESSING is a special response that
1508 * means a request-specific routine (e.g. show_parts)
1509 * created its own response. Therefore we shouldn't.
1511 resp = MHD_create_response_from_data(0,NULL,
1512 MHD_NO,MHD_NO);
1513 if (!resp) {
1514 fprintf(stderr,"MHD_crfd failed\n");
1515 free_ms(ms);
1516 return MHD_NO;
1518 MHD_queue_response(conn,rc,resp);
1519 MHD_destroy_response(resp);
1520 free_ms(ms);
1524 return MHD_YES;
1529 static ssize_t
1530 prov_list_generator (void *ctx, uint64_t pos, char *buf, size_t max)
1532 my_state *ms = ctx;
1533 size_t len;
1534 gpointer key;
1535 const provider_t *prov;
1536 const char *accept_hdr;
1538 (void)pos;
1540 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
1541 "Accept");
1543 if (!ms->gen_ctx) {
1544 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
1545 if (!ms->gen_ctx) {
1546 return -1;
1548 ms->cleanup |= CLEANUP_TMPL;
1549 init_prov_iter(&ms->prov_iter);
1550 len = tmpl_prov_header(ms->gen_ctx);
1551 if (!len) {
1552 return -1;
1554 if (len > max) {
1555 len = max;
1557 memcpy(buf,ms->gen_ctx->buf,len);
1558 return len;
1561 if (ms->gen_ctx == TMPL_CTX_DONE) {
1562 return -1;
1565 if (g_hash_table_iter_next(&ms->prov_iter,&key,(gpointer *)&prov)) {
1566 len = tmpl_prov_entry(ms->gen_ctx,prov->name,prov->type,
1567 prov->host, prov->port, prov->username, prov->password);
1568 if (!len) {
1569 return -1;
1571 if (len > max) {
1572 len = max;
1574 memcpy(buf,ms->gen_ctx->buf,len);
1575 return len;
1578 len = tmpl_prov_footer(ms->gen_ctx);
1579 if (!len) {
1580 return -1;
1582 if (len > max) {
1583 len = max;
1585 memcpy(buf,ms->gen_ctx->buf,len);
1586 free(ms->gen_ctx);
1587 ms->cleanup &= ~CLEANUP_TMPL;
1588 ms->gen_ctx = TMPL_CTX_DONE;
1589 return len;
1592 static int
1593 proxy_list_provs (void *cctx, struct MHD_Connection *conn, const char *url,
1594 const char *method, const char *version, const char *data,
1595 size_t *data_size, void **rctx)
1597 struct MHD_Response *resp;
1598 my_state *ms = *rctx;
1600 (void)cctx;
1601 (void)url;
1602 (void)method;
1603 (void)version;
1604 (void)data;
1605 (void)data_size;
1607 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
1608 65536, prov_list_generator, ms, simple_closer);
1609 if (!resp) {
1610 fprintf(stderr,"MHD_crfd failed\n");
1611 simple_closer(ms);
1612 return MHD_NO;
1614 MHD_queue_response(conn,MHD_HTTP_OK,resp);
1615 MHD_destroy_response(resp);
1617 return MHD_YES;
1620 static int
1621 prov_iterator (void *ctx, enum MHD_ValueKind kind, const char *key,
1622 const char *filename, const char *content_type,
1623 const char *transfer_encoding, const char *data,
1624 uint64_t off, size_t size)
1626 (void)kind;
1627 (void)filename;
1628 (void)content_type;
1629 (void)transfer_encoding;
1630 (void)off;
1632 g_hash_table_insert(ctx,strdup(key),strndup(data,size));
1633 /* TBD: check return value for strdups (none avail for insert) */
1634 return MHD_YES;
1638 static int
1639 proxy_update_prov (void *cctx, struct MHD_Connection *conn, const char *url,
1640 const char *method, const char *version, const char *data,
1641 size_t *data_size, void **rctx)
1643 struct MHD_Response *resp;
1644 my_state *ms = *rctx;
1645 int rc;
1646 char *provider;
1647 char *username;
1648 char *password;
1650 (void)cctx;
1651 (void)method;
1652 (void)version;
1654 if (ms->state == MS_NEW) {
1655 ms->state = MS_NORMAL;
1656 ms->url = (char *)url;
1657 ms->dict = g_hash_table_new_full(
1658 g_str_hash,g_str_equal,free,free);
1659 ms->cleanup |= CLEANUP_DICT;
1660 ms->post = MHD_create_post_processor(conn,4096,
1661 prov_iterator,ms->dict);
1662 ms->cleanup |= CLEANUP_POST;
1664 else if (*data_size) {
1665 MHD_post_process(ms->post,data,*data_size);
1666 *data_size = 0;
1668 else {
1669 rc = MHD_HTTP_BAD_REQUEST;
1670 provider = g_hash_table_lookup(ms->dict,"provider");
1671 username = g_hash_table_lookup(ms->dict,"username");
1672 password = g_hash_table_lookup(ms->dict,"password");
1673 if (provider && username && password) {
1674 update_provider(provider,username,password);
1675 rc = MHD_HTTP_OK;
1677 else {
1678 DPRINTF("provider/username/password MISSING\n");
1680 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1681 if (!resp) {
1682 fprintf(stderr,"MHD_crfd failed\n");
1683 free_ms(ms);
1684 return MHD_NO;
1686 MHD_queue_response(conn,rc,resp);
1687 MHD_destroy_response(resp);
1688 free_ms(ms);
1691 return MHD_YES;
1694 static int
1695 proxy_create_bucket (void *cctx, struct MHD_Connection *conn, const char *url,
1696 const char *method, const char *version, const char *data,
1697 size_t *data_size, void **rctx)
1699 struct MHD_Response *resp;
1700 my_state *ms = *rctx;
1701 int rc;
1703 (void)cctx;
1704 (void)method;
1705 (void)version;
1706 (void)data;
1707 (void)data_size;
1708 (void)url;
1710 /* curl -T moo.empty http://localhost:9090/_new by accident */
1711 rc = create_bucket(ms->bucket,ms);
1713 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1714 if (!resp) {
1715 fprintf(stderr,"MHD_crfd failed\n");
1716 return MHD_NO;
1718 MHD_queue_response(conn,rc,resp);
1719 MHD_destroy_response(resp);
1721 free_ms(ms);
1722 return MHD_YES;
1725 static const rule my_rules[] = {
1726 { /* get bucket list */
1727 "GET", URL_ROOT, proxy_api_root },
1728 { /* perform a control operation on the API root */
1729 "POST", URL_ROOT, control_api_root },
1730 { /* get object list */
1731 "GET", URL_BUCKET, proxy_list_objs },
1732 { /* create bucket */
1733 "PUT", URL_BUCKET, proxy_create_bucket },
1734 { /* get object data */
1735 "GET", URL_OBJECT, proxy_get_data },
1736 { /* get attribute data */
1737 "GET", URL_ATTR, proxy_get_attr },
1738 { /* put object data */
1739 "PUT", URL_OBJECT, proxy_put_data },
1740 { /* put attribute data */
1741 "PUT", URL_ATTR, proxy_put_attr },
1742 { /* create object and/or modify attributes */
1743 "POST", URL_BUCKET, proxy_bucket_post },
1744 { /* perform control operations on an object */
1745 "POST", URL_OBJECT, proxy_object_post },
1746 { /* query */
1747 "POST", URL_QUERY, proxy_query },
1748 { /* delete object */
1749 "DELETE", URL_OBJECT, proxy_delete },
1750 { /* delete attribute (TBD) */
1751 "DELETE", URL_ATTR, NULL },
1752 { /* get provider list */
1753 "GET", URL_PROVLIST, proxy_list_provs },
1754 { /* update a provider */
1755 "POST", URL_PROVLIST, proxy_update_prov },
1756 { NULL, 0, NULL }
1759 static url_type
1760 parse_url (const char *url, my_state *ms)
1762 unsigned short esize;
1763 unsigned short eindex;
1764 char *parts[URL_INVAL];
1766 if (strstr(url,"../")) {
1767 /* Naughty, naughty. Never a good reason to allow this. */
1768 DPRINTF("Rejecting ../ in path.\n");
1769 return URL_INVAL;
1772 eindex = URL_ROOT;
1773 parts[URL_BUCKET] = ms->bucket;
1774 parts[URL_OBJECT] = ms->key;
1775 parts[URL_ATTR] = ms->attr;
1777 for (;;) {
1778 while (*url == '/') {
1779 ++url;
1782 if (!*url) {
1783 if (eindex == URL_BUCKET) {
1784 if (!strcmp(ms->bucket,"_providers")) {
1785 eindex = URL_PROVLIST;
1788 else if (eindex == URL_OBJECT) {
1789 if (!strcmp(ms->key,"_query")) {
1790 eindex = URL_QUERY;
1793 break;
1796 if (++eindex >= URL_INVAL) {
1797 return URL_INVAL;
1799 esize = 0;
1801 while (*url && (*url != '/')) {
1802 parts[eindex][esize++] = *(url++);
1803 if (esize >= MAX_FIELD_LEN) {
1804 return URL_INVAL;
1809 return eindex;
1812 static int
1813 access_handler (void *cctx, struct MHD_Connection *conn, const char *url,
1814 const char *method, const char *version, const char *data,
1815 size_t *data_size, void **rctx)
1817 unsigned int i;
1818 url_type utype;
1819 struct MHD_Response *resp;
1820 my_state *ms = *rctx;
1822 if (ms) {
1823 return ms->handler(cctx,conn,url,method,version,
1824 data,data_size,rctx);
1827 ms = calloc(sizeof(*ms), 1);
1828 if (!ms) {
1829 return MHD_NO;
1831 ms->refcnt = 1;
1833 utype = parse_url(url,ms);
1835 for (i = 0; my_rules[i].method; ++i) {
1836 if (utype != my_rules[i].utype) {
1837 continue;
1839 if (strcmp(method,my_rules[i].method)) {
1840 continue;
1842 if (!my_rules[i].handler) {
1843 break;
1845 ms->handler = my_rules[i].handler;
1846 ms->state = MS_NEW;
1847 ms->url = NULL;
1848 ms->post = NULL;
1849 ms->conn = conn;
1850 *rctx = ms;
1851 return ms->handler(cctx,conn,url,method,version,
1852 data,data_size,rctx);
1855 /* Don't need this after all. Free before the next check. */
1856 free_ms(ms);
1858 if (!strcmp(method,"QUIT")) {
1859 (void)sem_post((sem_t *)cctx);
1860 return MHD_NO;
1863 fprintf(stderr,"bad request m=%s u=%s\n",method,url);
1865 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1866 if (!resp) {
1867 return MHD_NO;
1869 MHD_queue_response(conn,MHD_HTTP_NOT_FOUND,resp);
1870 MHD_destroy_response(resp);
1871 return MHD_YES;
1874 /* These enum values cannot possibly conflict with the option values
1875 ordinarily used by commands, including CHAR_MAX + 1, etc. Avoid
1876 CHAR_MIN - 1, as it may equal -1, the getopt end-of-options value. */
1877 enum
1879 GETOPT_HELP_CHAR = (CHAR_MIN - 2),
1880 GETOPT_VERSION_CHAR = (CHAR_MIN - 3)
1883 static const struct option my_options[] = {
1884 { "autostart", no_argument, NULL, 'a' },
1885 { "config", required_argument, NULL, 'c' },
1886 { "db", required_argument, NULL, 'd' },
1887 { "master", required_argument, NULL, 'm' },
1888 { "port", required_argument, NULL, 'p' },
1889 { "verbose", no_argument, NULL, 'v' },
1890 { "version", no_argument, NULL, GETOPT_VERSION_CHAR },
1891 { "help", no_argument, NULL, GETOPT_HELP_CHAR },
1892 { NULL, 0, NULL, '\0' }
1895 static void ATTRIBUTE_NORETURN
1896 usage (int status)
1898 if (status != EXIT_SUCCESS)
1899 fprintf (stderr, _("Try `%s --help' for more information.\n"),
1900 program_name);
1901 else
1903 printf (_("\
1904 Usage: %s [OPTION]\n\
1906 program_name);
1907 fputs (_("\
1908 Deltacloud image-warehouse daemon.\n\
1909 A configuration file must be specified.\n\
1911 -a, --autostart start necessary back-end services\n\
1912 -c, --config=FILE config file [required]\n\
1913 -d, --db=HOST_PORT database server as ip[:port]\n\
1914 -m, --master=HOST_PORT master (upstream) server as ip[:port]\n\
1915 -p, --port=PORT alternate listen port (default 9090)\n\
1916 -v, --verbose verbose/debug output\n\
1918 --help display this help and exit\n\
1919 --version output version information and exit\n\
1920 "), stdout);
1921 printf (_("\
1923 Report %s bugs to %s.\n\
1925 program_name, PACKAGE_BUGREPORT);
1927 exit (status);
1932 main (int argc, char **argv)
1934 struct MHD_Daemon *the_daemon;
1935 sem_t the_sem;
1936 char *stctx = NULL;
1937 char *port_tmp;
1938 bool autostart = false;
1940 set_program_name (argv[0]);
1941 atexit (close_stdout);
1943 for (;;) switch (getopt_long(argc,argv,"ac:d:m:p:v",my_options,NULL)) {
1944 case 'a':
1945 autostart = true;
1946 break;
1947 case 'c':
1948 cfg_file = optarg;
1949 break;
1950 case 'd':
1951 assert (optarg);
1952 db_host = strtok_r(optarg,":",&stctx);
1953 port_tmp = strtok_r(NULL,":",&stctx);
1954 if (port_tmp) {
1955 db_port = (unsigned short)strtoul(port_tmp,NULL,10);
1957 break;
1958 case 'm':
1959 assert (optarg);
1960 master_host = strtok_r(optarg,":",&stctx);
1961 port_tmp = strtok_r(NULL,":",&stctx);
1962 if (port_tmp) {
1963 master_port = (unsigned short)strtoul(port_tmp,NULL,10);
1965 break;
1966 case 'p':
1967 my_port = (unsigned short)strtoul(optarg,NULL,10);
1968 break;
1969 case 'v':
1970 ++verbose;
1971 break;
1972 case GETOPT_HELP_CHAR:
1973 usage(EXIT_SUCCESS);
1974 break;
1975 case GETOPT_VERSION_CHAR:
1976 printf ("%s version %s\n", program_name, PACKAGE_VERSION);
1977 exit (EXIT_SUCCESS);
1978 break;
1980 case -1:
1981 goto args_done;
1982 default:
1983 usage(EXIT_FAILURE);
1984 break;
1986 args_done:
1988 if (!db_port) {
1989 db_port = autostart ? AUTO_MONGOD_PORT : 27017;
1992 if (autostart && cfg_file) {
1993 error(0,0,"do not use -c and -a simultaneously");
1994 return !0;
1996 else if (autostart && !cfg_file) {
1997 me = auto_config();
1998 if (!me) {
1999 /* error printed */
2000 return !0;
2003 else if (!autostart && cfg_file) {
2004 me = parse_config(cfg_file);
2005 if (!me) {
2006 error(0,0,"could not parse %s",cfg_file);
2007 return !0;
2010 else {
2011 error(0,0,"specify at least -c or -a");
2012 usage (EXIT_FAILURE);
2015 sem_init(&the_sem,0,0);
2017 if (verbose) {
2018 printf("primary store type is %s\n",main_prov->type);
2019 if (master_host) {
2020 printf("operating as slave to %s:%u\n",
2021 master_host, master_port);
2023 printf("db is at %s:%u\n",db_host,db_port);
2024 printf("will listen on port %u\n",my_port);
2025 printf("my location is \"%s\"\n",me);
2026 if (fflush(stdout) || ferror(stdout))
2027 error(EXIT_FAILURE, 0, "write failed");
2030 backend_init();
2031 meta_init();
2032 repl_init();
2035 * Gotcha: if we don't set the connection memory limit explicitly,
2036 * the per-connection buffer for MHD will be smaller than that used
2037 * by CURL, so proxy_writefunc will never be able to do its job.
2039 the_daemon = MHD_start_daemon(MY_MHD_FLAGS,
2040 my_port, NULL, NULL, &access_handler, &the_sem,
2041 MHD_OPTION_CONNECTION_MEMORY_LIMIT, (size_t)1048576,
2042 MHD_OPTION_END);
2043 if (!the_daemon) {
2044 fprintf(stderr,"Could not create daemon.\n");
2045 return !0;
2048 sem_wait(&the_sem);
2049 return 0;