remove functions and struct members that are no longer needed
[iwhd.git] / rest.c
blobdfa84b4c3163287bf50f450c275d3fb56fa37f6b
1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include <error.h>
19 #include <fcntl.h>
20 #include <getopt.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <semaphore.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <sys/stat.h>
30 #include <assert.h>
31 #include <errno.h>
33 #include <microhttpd.h>
34 #include <hstor.h> /* only for ARRAY_SIZE at this point */
35 #include <curl/curl.h>
36 #include <glib.h>
38 #include "dirname.h"
39 #include "iwh.h"
40 #include "closeout.h"
41 #include "progname.h"
42 #include "meta.h"
43 #include "backend.h"
44 #include "setup.h"
45 #include "replica.h"
46 #include "template.h"
47 #include "mpipe.h"
48 #include "state_defs.h"
/* Define-away for now.  Eventually, define to gettext.  */
#define _(msgid) (msgid)

/*
 * Flags handed to the MHD daemon.  The expansion is parenthesized so the
 * macro can be combined with other operators (e.g. `MY_MHD_FLAGS & x`)
 * without `|` binding more loosely than the surrounding expression.
 */
#if defined(DEBUG)
#define MY_MHD_FLAGS (MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG)
//#define MY_MHD_FLAGS (MHD_USE_SELECT_INTERNALLY | MHD_USE_DEBUG)
#else
#define MY_MHD_FLAGS (MHD_USE_THREAD_PER_CONNECTION)
#endif
/*
 * Tags describing what kind of URL a request refers to; stored in the
 * `utype` member of a dispatch `rule`.  Numbering starts at zero and
 * follows declaration order.
 */
typedef enum {
	URL_ROOT = 0,
	URL_BUCKET,
	URL_OBJECT,
	URL_ATTR,
	URL_INVAL,
	URL_QUERY,
	URL_PROVLIST,
	URL_PROVIDER,
	URL_PROVIDER_SET_PRIMARY
} url_type;
65 typedef struct {
66 const char *method;
67 url_type utype;
68 MHD_AccessHandlerCallback handler;
69 } rule;
71 static unsigned short my_port = MY_PORT;
72 char *cfg_file = NULL;
/*
 * NULL-terminated lists of names with special meaning, each consulted
 * through is_reserved().
 */

/* Object/key names that may not be used for ordinary objects. */
static const char *const reserved_name[] = {
	"_default",
	"_new",
	"_policy",
	"_query",
	NULL
};

/* Attribute names that clients may not set directly. */
static const char *const reserved_attr[] = {
	"_bucket",
	"_date",
	"_etag",
	"_key",
	"_loc",
	"_size",
	NULL
};

/* Bucket names that may not be created by clients. */
static const char *const reserved_bucket_name[] = {
	"_new",
	"_providers",
	NULL
};
78 static int
79 validate_put (struct MHD_Connection *conn)
81 const char *mhdr;
83 mhdr = MHD_lookup_connection_value(conn,MHD_HEADER_KIND,
84 "X-redhat-role");
86 * This will fail most obviously in the case where we are not the
87 * master, we know we're not the master, and we don't see this
88 * header (which is set in master-to-slave replication requests).
89 * It will *also* fail, deliberately, if we do see this header when
90 * we think we're the master, as it means there's a mismatch between
91 * their config and ours. This avoids "strange" behavior in such
92 * cases, in favor of a more obvious failure.
93 * TBD: this will be less of a problem if/when we identify the
94 * master and DB via the config file instead of -m/-d.
96 if (master_host) {
97 return (mhdr && !strcmp(mhdr,"master"));
99 else {
100 return !mhdr;
/*
 * Report whether `cand` exactly matches one of the strings in
 * `resv_list`, an array terminated by a NULL entry.
 *
 * Returns 1 on a match and 0 otherwise (the same values as GLib's
 * TRUE/FALSE).
 */
static int
is_reserved (const char *cand, char const *const *resv_list)
{
	char const *const *entry;

	for (entry = resv_list; *entry != NULL; ++entry) {
		if (strcmp(cand,*entry) == 0) {
			return 1;
		}
	}

	return 0;
}
118 static int
119 validate_url (const char *url)
121 char *slash = strrchr(url,'/');
123 if (!slash) {
124 /* There should be at least one betwixt bucket and key. */
125 return 0;
128 return !is_reserved(slash+1,reserved_name);
131 /**********
132 * The proxy has MHD on one side and CURL on the other. The CURL side is
133 * always run in a child thread. Yes, there are both context switches
134 * and copies between the threads. Get over it. The focus here is on
135 * supporting multi-way replication on PUT, with minimal complexity. These
136 * were the two easiest libraries to use, and they both want to allocate
137 * their own buffers so we're kind of stuck with the copies unless we want
138 * to buffer whole files in memory (which would have to be virtual since
139 they're potentially bigger than physical) or explicitly ping them through
140 * a local filesystem. We could potentially take over scheduling from one
141 * or both to avoid some of the context switching, but those interfaces are
142 * significantly more error-prone and (at least in CURL's case) buggy.
144 * For a GET, the CURL child acts as a producer while the MHD parent acts
145 * as consumer. For a PUT, the MHD parent is the producer and the CURL
146 * child is the consumer. For GET the MHD component is invoked via a
147 * callback set up in the access handler; for PUT it's invoked via repeated
148 * calls to the access handler itself. Either way, the producer's job is
149 * to post its pointer+length to the my_state structure and then wait for
150 * all consumers to check back in saying they've finished it. This might
151 * involve multiple passes through each consumer for one pass through the
152 * single producer. When the producer is done, it does a similar handshake
153 * with the consumers. Each consumer has its own pipe_private structure,
154 * containing a pointer to the shared my_state plus a per-consumer offset
155 * into the current chunk.
157 * Attribute functions don't use CURL, so they do much simpler in-memory
158 * buffering. Queries also don't use CURL, but the MHD POST interface
159 * introduces some of its own complexity so see below for that.
160 **********/
/*
 * Cleanup callback registered with MHD for responses whose callback
 * context is the request's my_state.  It currently only logs; it does
 * not release anything reachable from `ctx`.
 */
static void
simple_closer (void *ctx)
{
	/* The context is not needed yet; silence unused warnings. */
	(void)ctx;

	DPRINTF("%s: cleaning up\n",__func__);
}
170 static void
171 child_closer (void * ctx)
173 pipe_private *pp = ctx;
175 DPRINTF("in %s\n",__func__);
177 free(pp);
180 /* Invoked from MHD. */
181 static ssize_t
182 proxy_get_cons (void *ctx, uint64_t pos, char *buf, size_t max)
184 pipe_private *pp = ctx;
185 pipe_shared *ps = pp->shared;
186 my_state *ms = ps->owner;
187 ssize_t done;
188 void *child_res;
190 (void)pos;
192 DPRINTF("consumer asked to read %zu\n",max);
194 if (pipe_cons_wait(pp)) {
195 DPRINTF("consumer offset %zu into %zu\n",
196 pp->offset, ps->data_len);
197 if (ps->data_len < pp->offset)
198 // Warn about bogus offset?
199 done = -1;
200 else {
201 done = ps->data_len - pp->offset;
202 if ((size_t) done > max) {
203 done = max;
205 memcpy(buf,(char *)(ps->data_ptr)+pp->offset,done);
206 pp->offset += done;
207 DPRINTF("consumer copied %zu, new offset %zu\n",
208 done, pp->offset);
209 if (pp->offset == ps->data_len) {
210 DPRINTF("consumer finished chunk\n");
211 pipe_cons_signal(pp, 0);
215 else {
216 done = -1;
219 if (done == (-1)) {
220 child_res = NULL;
221 pthread_join(ms->backend_th,&child_res);
222 if (child_res == THREAD_FAILED) {
223 DPRINTF("GET producer failed\n");
224 /* Nothing we can do; already sent status. */
226 if (ms->from_master) {
227 pthread_join(ms->cache_th,NULL);
228 /* TBD: do something about cache failure? */
232 return done;
235 static int
236 proxy_get_data (void *cctx, struct MHD_Connection *conn, const char *url,
237 const char *method, const char *version, const char *data,
238 size_t *data_size, void **rctx)
240 struct MHD_Response *resp;
241 my_state *ms = *rctx;
242 pipe_private *pp;
243 pipe_private *pp2;
244 char *my_etag;
245 const char *user_etag;
246 int rc;
248 (void)cctx;
249 (void)method;
250 (void)version;
251 (void)data;
252 (void)data_size;
254 DPRINTF("PROXY GET DATA %s\n",url);
256 ms->url = strdup(url);
257 if (!ms->url) {
258 return MHD_NO;
261 my_etag = meta_has_copy(ms->bucket,ms->key,me);
262 if (!my_etag) {
263 DPRINTF("falling back to local for %s/%s\n",ms->bucket,ms->key);
264 ms->from_master = 0;
266 else if (*my_etag) {
267 user_etag = MHD_lookup_connection_value(
268 conn, MHD_HEADER_KIND, "If-None-Match");
269 if (user_etag && !strcmp(user_etag,my_etag)) {
270 DPRINTF("ETag match!\n");
271 free(my_etag);
272 resp = MHD_create_response_from_data(0,NULL,
273 MHD_NO,MHD_NO);
274 MHD_queue_response(conn,MHD_HTTP_NOT_MODIFIED,resp);
275 MHD_destroy_response(resp);
276 return MHD_YES;
278 ms->from_master = 0;
280 else {
281 DPRINTF("%s/%s not found locally\n",ms->bucket,ms->key);
282 if (!master_host) {
283 DPRINTF(" that means it doesn't exist\n");
284 resp = MHD_create_response_from_data(0,NULL,
285 MHD_NO,MHD_NO);
286 MHD_queue_response(conn,MHD_HTTP_NOT_FOUND,resp);
287 MHD_destroy_response(resp);
288 return MHD_YES;
290 DPRINTF(" will fetch from %s:%u\n", master_host,master_port);
291 ms->from_master = 1;
294 pipe_init_shared(&ms->pipe,ms,ms->from_master+1);
295 pp = pipe_init_private(&ms->pipe);
296 if (!pp) {
297 return MHD_NO;
299 provider_t *main_prov = get_main_provider();
300 ms->thunk.parent = ms;
301 ms->thunk.prov = ms->from_master ? g_master_prov : main_prov;
302 pthread_create(&ms->backend_th,NULL,
303 ms->thunk.prov->func_tbl->get_child_func,&ms->thunk);
304 /* TBD: check return value */
306 if (ms->from_master) {
307 pp2 = pipe_init_private(&ms->pipe);
308 if (!pp2) {
309 return MHD_NO;
311 pp2->prov = main_prov;
312 pthread_create(&ms->cache_th,NULL,
313 main_prov->func_tbl->cache_child_func,pp2);
314 /* TBD: check return value */
316 else {
317 pp2 = NULL;
320 rc = pipe_cons_wait_init(&ms->pipe);
321 ms->rc = (rc == 0) ? MHD_HTTP_OK : MHD_HTTP_INTERNAL_SERVER_ERROR;
323 resp = MHD_create_response_from_callback(
324 MHD_SIZE_UNKNOWN, 65536, proxy_get_cons, pp, child_closer);
325 if (!resp) {
326 fprintf(stderr,"MHD_crfc failed\n");
327 if (pp2) {
328 /* TBD: terminate thread */
329 free(pp2);
331 child_closer(pp);
332 return MHD_NO;
334 MHD_queue_response(conn,ms->rc,resp);
335 MHD_destroy_response(resp);
337 return MHD_YES;
340 static void
341 recheck_replication (my_state * ms, char *policy)
343 int rc;
344 char fixed[MAX_FIELD_LEN];
346 if (is_reserved(ms->key,reserved_name)) {
347 DPRINTF("declining to replicate reserved object %s\n",ms->key);
348 return;
351 if (!policy && ms->dict) {
352 DPRINTF("using new policy for %s/%s\n",ms->bucket,ms->key);
353 policy = g_hash_table_lookup(ms->dict,"_policy");
356 if (!policy) {
357 DPRINTF("fetching policy for %s/%s\n",ms->bucket,ms->key);
358 rc = meta_get_value(ms->bucket,ms->key, "_policy", &policy);
361 if (!policy) {
362 DPRINTF(" inheriting policy from %s\n",ms->bucket);
363 rc = meta_get_value(ms->bucket,
364 "_default", "_policy", &policy);
367 if (policy) {
368 DPRINTF(" implementing policy %s\n",policy);
370 * Can't use ms->url here because it might be a bucket POST
371 * and in that case ms->url points to the bucket.
373 snprintf(fixed,sizeof(fixed),"%s/%s",ms->bucket,ms->key);
374 replicate(fixed,0,policy,ms);
376 else {
377 DPRINTF(" could not find a policy anywhere!\n");
381 static int
382 proxy_put_data (void *cctx, struct MHD_Connection *conn, const char *url,
383 const char *method, const char *version, const char *data,
384 size_t *data_size, void **rctx)
386 struct MHD_Response *resp;
387 my_state *ms = *rctx;
388 pipe_private *pp;
389 int rc;
390 char *etag = NULL;
391 void *child_res;
393 (void)cctx;
394 (void)method;
395 (void)version;
397 DPRINTF("PROXY PUT DATA %s (%zu)\n",url,*data_size);
399 if (ms->state == MS_NEW) {
400 if (!validate_put(conn) || !validate_url(url)) {
401 DPRINTF("rejecting %s\n",url);
402 resp = MHD_create_response_from_data(0,NULL,
403 MHD_NO,MHD_NO);
404 if (!resp) {
405 return MHD_NO;
407 MHD_queue_response(conn,MHD_HTTP_FORBIDDEN,resp);
408 MHD_destroy_response(resp);
409 return MHD_YES;
411 ms->state = MS_NORMAL;
412 ms->url = strdup(url);
413 if (!ms->url) {
414 return MHD_NO;
416 ms->size = 0;
417 pipe_init_shared(&ms->pipe,ms,1);
418 pp = pipe_init_private(&ms->pipe);
419 if (!pp) {
420 return MHD_NO;
422 provider_t *main_prov = get_main_provider();
423 pp->prov = main_prov;
424 ms->be_flags = BACKEND_GET_SIZE;
425 pthread_create(&ms->backend_th,NULL,
426 main_prov->func_tbl->put_child_func,pp);
427 /* TBD: check return value */
430 * Do the initial handshake with children. If we return from
431 * this callback without an error response, Microhttpd posts
432 * the "100 Continue" header and the client starts sending
433 * the data. We must report errors here or forever keep
434 * our peace.
436 rc = pipe_prod_wait_init(&ms->pipe);
437 if (rc != 0) {
438 DPRINTF("producer wait failed\n");
439 resp = MHD_create_response_from_data(0,NULL,
440 MHD_NO,MHD_NO);
441 if (!resp) {
442 return MHD_NO;
444 MHD_queue_response(conn,MHD_HTTP_INTERNAL_SERVER_ERROR,
445 resp);
446 MHD_destroy_response(resp);
447 } else if (rc > 0) {
449 * Note that we fail here even if 1 of N replicas fail.
450 * Might want to fix this when we start looping over
451 * pipe_init_private() above.
453 DPRINTF("producer replicas failed (%u of %u)\n",
454 rc, ms->pipe.cons_total);
455 resp = MHD_create_response_from_data(0,NULL,
456 MHD_NO,MHD_NO);
457 if (!resp) {
458 return MHD_NO;
460 MHD_queue_response(conn,MHD_HTTP_INTERNAL_SERVER_ERROR,
461 resp);
462 MHD_destroy_response(resp);
463 } else {
464 DPRINTF("producer proceeding\n");
467 else if (*data_size) {
468 pipe_prod_signal(&ms->pipe,(void *)data,*data_size);
469 ms->size += *data_size;
470 DPRINTF("producer chunk finished\n");
471 *data_size = 0;
473 else {
474 pipe_prod_finish(&ms->pipe);
475 pthread_join(ms->backend_th,&child_res);
476 if (child_res == THREAD_FAILED) {
477 DPRINTF("thread failed\n");
478 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
480 else if (ms->pipe.cons_error == ms->pipe.cons_total) {
481 DPRINTF("all %u consumers failed\n",
482 ms->pipe.cons_error);
483 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
485 else {
486 if (master_host) {
487 meta_got_copy(ms->bucket,ms->key,me);
488 etag = NULL;
490 else {
491 etag = meta_did_put(ms->bucket,ms->key,me,
492 ms->size);
494 DPRINTF("rereplicate (obj PUT)\n");
495 recheck_replication(ms,NULL);
496 rc = MHD_HTTP_OK;
498 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
499 if (!resp) {
500 free(etag);
501 return MHD_NO;
503 if (etag) {
504 MHD_add_response_header(resp,"ETag",etag);
506 MHD_queue_response(conn,rc,resp);
507 MHD_destroy_response(resp);
510 return MHD_YES;
513 static int
514 proxy_get_attr (void *cctx, struct MHD_Connection *conn, const char *url,
515 const char *method, const char *version, const char *data,
516 size_t *data_size, void **rctx)
518 struct MHD_Response *resp;
519 char *fixed;
520 my_state *ms = *rctx;
521 int rc = MHD_HTTP_NOT_FOUND;
523 (void)cctx;
524 (void)method;
525 (void)version;
526 (void)data;
527 (void)data_size;
529 DPRINTF("PROXY GET ATTR %s\n",url);
531 if (meta_get_value(ms->bucket,ms->key,ms->attr,&fixed) == 0) {
532 resp = MHD_create_response_from_data(strlen(fixed),fixed,
533 MHD_NO,MHD_NO);
534 rc = MHD_HTTP_OK;
536 else {
537 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
539 if (!resp) {
540 return MHD_NO;
542 MHD_queue_response(conn,rc,resp);
543 MHD_destroy_response(resp);
545 return MHD_YES;
548 static int
549 proxy_put_attr (void *cctx, struct MHD_Connection *conn, const char *url,
550 const char *method, const char *version, const char *data,
551 size_t *data_size, void **rctx)
553 struct MHD_Response *resp;
554 my_state *ms = *rctx;
555 const char *attrval;
556 int send_resp = 0;
558 (void)cctx;
559 (void)method;
560 (void)version;
562 DPRINTF("PROXY PUT ATTR %s (%zu)\n",url,*data_size);
564 if (ms->state == MS_NEW) {
565 ms->state = MS_NORMAL;
566 ms->url = strdup(url);
567 if (!ms->url) {
568 return MHD_NO;
570 attrval = MHD_lookup_connection_value(conn,MHD_HEADER_KIND,
571 "X-redhat-value");
572 if (attrval) {
573 meta_set_value(ms->bucket,ms->key,ms->attr,
574 (char *)attrval);
575 send_resp = 1;
578 else if (*data_size) {
579 if (ms->pipe.data_len) {
580 ms->pipe.data_len += *data_size;
581 char *p = realloc(ms->pipe.data_ptr,ms->pipe.data_len);
582 if (!p) {
583 return MHD_NO;
585 ms->pipe.data_ptr = p;
587 else {
588 ms->pipe.data_len = *data_size + 1;
589 ms->pipe.data_ptr = malloc(ms->pipe.data_len);
590 if (!ms->pipe.data_ptr) {
591 return MHD_NO;
593 ((char *)ms->pipe.data_ptr)[0] = '\0';
595 (void)strncat(ms->pipe.data_ptr,data,*data_size);
596 /* TBD: check return value */
597 *data_size = 0;
599 else {
600 if (!ms->pipe.data_ptr) {
601 return MHD_NO;
603 if (is_reserved(ms->attr,reserved_attr)) {
604 resp = MHD_create_response_from_data(
605 0,NULL,MHD_NO,MHD_NO);
606 if (!resp) {
607 return MHD_NO;
609 MHD_queue_response(conn,MHD_HTTP_BAD_REQUEST,
610 resp);
611 MHD_destroy_response(resp);
612 return MHD_YES;
614 meta_set_value(ms->bucket,ms->key,ms->attr,ms->pipe.data_ptr);
616 * We should always re-replicate, because the replication
617 * policy might refer to this attr.
619 DPRINTF("rereplicate (attr PUT)\n");
620 recheck_replication(ms,NULL);
621 send_resp = 1;
624 if (send_resp) {
625 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
626 if (!resp) {
627 return MHD_NO;
629 MHD_queue_response(conn,MHD_HTTP_CREATED,resp);
630 MHD_destroy_response(resp);
632 * TBD: check if the attribute was a replication policy, and
633 * start/stop replication activities as appropriate.
637 return MHD_YES;
640 /**********
641 * For queries, we have to deal with MHD's post-iterator interface (not
642 * quite the same as the data-iteration even though we use it that way) on
643 * one side, and a query-iterator interface on the other. Data on both
644 * sides could be quite large, so we can't just stick them in header lines.
645 * We do still buffer the query in memory, though. Once that's done, we do
646 * very simple parsing - it will be more complicated later - and create the
647 query iterator. That's also driven by MHD, this time through the
648 * content-callback interface, and repeatedly calls in to the metadata
649 * module to fetch one object name at a time.
650 **********/
652 static int
653 query_iterator (void *ctx, enum MHD_ValueKind kind, const char *key,
654 const char *filename, const char *content_type,
655 const char *transfer_encoding, const char *data,
656 uint64_t off, size_t size)
658 (void)ctx;
659 (void)kind;
660 (void)key;
661 (void)filename;
662 (void)content_type;
663 (void)transfer_encoding;
664 (void)data;
665 (void)off;
666 (void)size;
668 /* We actually accumulate the data in proxy_query. */
669 return MHD_YES;
672 /* MHD reader function during queries. Return -1 for EOF. */
673 static ssize_t
674 proxy_query_func (void *ctx, uint64_t pos, char *buf, size_t max)
676 my_state *ms = ctx;
677 size_t len;
678 const char *accept_hdr;
679 char *bucket;
680 char *key;
682 (void)pos;
684 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
685 "Accept");
687 if (!ms->gen_ctx) {
688 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
689 if (!ms->gen_ctx) {
690 return -1;
692 len = tmpl_list_header(ms->gen_ctx);
693 if (!len) {
694 return -1;
696 if (len > max) {
697 len = max;
699 memcpy(buf,ms->gen_ctx->buf,len);
700 return len;
703 if (ms->gen_ctx == TMPL_CTX_DONE) {
704 return -1;
707 for(;;) {
708 if (!meta_query_next(ms->query,&bucket,&key)) {
709 break;
711 if (is_reserved(key,reserved_name)) {
712 continue;
714 len = tmpl_list_entry(ms->gen_ctx,bucket,key);
715 if (!len) {
716 return -1;
718 if (len > max) {
719 len = max;
721 memcpy(buf,ms->gen_ctx->buf,len);
722 return len;
725 len = tmpl_list_footer(ms->gen_ctx);
726 if (!len) {
727 return -1;
729 if (len > max) {
730 len = max;
732 memcpy(buf,ms->gen_ctx->buf,len);
733 free(ms->gen_ctx);
734 ms->gen_ctx = TMPL_CTX_DONE;
735 return len;
738 static int
739 proxy_query (void *cctx, struct MHD_Connection *conn, const char *url,
740 const char *method, const char *version, const char *data,
741 size_t *data_size, void **rctx)
743 struct MHD_Response *resp;
744 my_state *ms = *rctx;
746 (void)cctx;
747 (void)method;
748 (void)version;
750 DPRINTF("PROXY QUERY %s (%zu)\n",url,*data_size);
752 if (ms->state == MS_NEW) {
753 ms->state = MS_NORMAL;
754 ms->post = MHD_create_post_processor(conn,4096,
755 query_iterator,ms);
757 else if (*data_size) {
758 MHD_post_process(ms->post,data,*data_size);
759 if (ms->pipe.data_len) {
760 ms->pipe.data_len += *data_size;
761 char *p = realloc(ms->pipe.data_ptr,ms->pipe.data_len);
762 if (!p) {
763 return MHD_NO;
765 ms->pipe.data_ptr = p;
767 else {
768 ms->pipe.data_len = *data_size + 1;
769 ms->pipe.data_ptr = malloc(ms->pipe.data_len);
770 if (!ms->pipe.data_ptr) {
771 return MHD_NO;
773 ((char *)ms->pipe.data_ptr)[0] = '\0';
775 (void)strncat(ms->pipe.data_ptr,data,*data_size);
776 /* TBD: check return value */
777 *data_size = 0;
779 else {
780 if (!ms->pipe.data_ptr) {
781 return MHD_NO;
783 ms->query = meta_query_new(ms->bucket,NULL,ms->pipe.data_ptr);
784 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
785 65536, proxy_query_func, ms, simple_closer);
786 if (!resp) {
787 fprintf(stderr,"MHD_crfc failed\n");
788 simple_closer(ms);
789 return MHD_NO;
791 MHD_queue_response(conn,MHD_HTTP_OK,resp);
792 MHD_destroy_response(resp);
795 return MHD_YES;
798 static int
799 proxy_list_objs (void *cctx, struct MHD_Connection *conn, const char *url,
800 const char *method, const char *version, const char *data,
801 size_t *data_size, void **rctx)
803 my_state *ms = *rctx;
804 struct MHD_Response *resp;
806 (void)cctx;
807 (void)url;
808 (void)method;
809 (void)version;
810 (void)data;
811 (void)data_size;
813 ms->query = meta_query_new((char *)ms->bucket,NULL,NULL);
815 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
816 65536, proxy_query_func, ms, simple_closer);
817 if (!resp) {
818 fprintf(stderr,"MHD_crfc failed\n");
819 simple_closer(ms);
820 return MHD_NO;
823 MHD_queue_response(conn,MHD_HTTP_OK,resp);
824 MHD_destroy_response(resp);
825 return MHD_YES;
828 static int
829 proxy_delete (void *cctx, struct MHD_Connection *conn, const char *url,
830 const char *method, const char *version, const char *data,
831 size_t *data_size, void **rctx)
833 my_state *ms = *rctx;
834 struct MHD_Response *resp;
835 char *copied_url;
836 char *bucket;
837 char *key;
838 char *stctx = NULL;
839 int rc;
841 (void)cctx;
842 (void)method;
843 (void)version;
844 (void)data;
845 (void)data_size;
847 DPRINTF("PROXY DELETE %s\n",url);
849 provider_t *main_prov = get_main_provider();
850 ms->thunk.parent = ms;
851 ms->thunk.prov = main_prov;
852 rc = ms->thunk.prov->func_tbl->delete_func(main_prov,
853 ms->bucket,ms->key,url);
854 if (rc == MHD_HTTP_OK) {
855 copied_url = strdup(url);
856 assert (copied_url);
857 bucket = strtok_r(copied_url,"/",&stctx);
858 key = strtok_r(NULL,"/",&stctx);
859 meta_delete(bucket,key);
860 free(copied_url);
861 replicate_delete(url,ms);
864 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
865 if (!resp) {
866 return MHD_NO;
868 error (0, 0, "DELETE BUCKET: rc=%d", rc);
869 MHD_queue_response(conn,rc,resp);
870 MHD_destroy_response(resp);
872 return MHD_YES;
/* TBD: get actual bucket list */

/* A fixed root-listing entry: relation name and the link it points to. */
typedef struct {
	const char *rel;
	const char *link;
} fake_bucket_t;

/*
 * Entries that root_blob_generator emits ahead of the real buckets.
 * FIXME: ensure that the `link` values here stay in sync with those
 * in reserved_bucket_name.
 */
static const fake_bucket_t fake_bucket_list[] = {
	{ .rel = "bucket_factory",	.link = "_new" },
	{ .rel = "provider_list",	.link = "_providers" },
};
888 static ssize_t
889 root_blob_generator (void *ctx, uint64_t pos, char *buf, size_t max)
891 my_state *ms = ctx;
892 const fake_bucket_t *fb;
893 size_t len;
894 const char *accept_hdr;
895 const char *host;
896 char *bucket;
897 char *key;
899 (void)pos;
901 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
902 "Accept");
903 host = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,"Host");
905 if (!ms->gen_ctx) {
906 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
907 if (!ms->gen_ctx) {
908 return -1;
910 ms->gen_ctx->base = host;
911 len = tmpl_root_header(ms->gen_ctx,"image_warehouse",VERSION);
912 if (!len) {
913 return -1;
915 if (len > max) {
916 len = max;
918 memcpy(buf,ms->gen_ctx->buf,len);
919 return len;
922 if (ms->gen_ctx == TMPL_CTX_DONE) {
923 return -1;
926 if (ms->gen_ctx->index < ARRAY_SIZE(fake_bucket_list)) {
927 fb = fake_bucket_list + ms->gen_ctx->index;
928 len = tmpl_root_entry(ms->gen_ctx,fb->rel,fb->link);
929 if (!len) {
930 return -1;
932 if (len > max) {
933 len = max;
935 memcpy(buf,ms->gen_ctx->buf,len);
936 return len;
939 if (meta_query_next(ms->query,&bucket,&key)) {
940 len = tmpl_root_entry(ms->gen_ctx,"bucket",bucket);
941 if (!len) {
942 return -1;
944 if (len > max) {
945 len = max;
947 memcpy(buf,ms->gen_ctx->buf,len);
948 return len;
951 len = tmpl_root_footer(ms->gen_ctx);
952 if (!len) {
953 return -1;
955 if (len > max) {
956 len = max;
958 memcpy(buf,ms->gen_ctx->buf,len);
959 free(ms->gen_ctx);
960 ms->gen_ctx = TMPL_CTX_DONE;
961 return len;
964 static int
965 proxy_api_root (void *cctx, struct MHD_Connection *conn, const char *url,
966 const char *method, const char *version, const char *data,
967 size_t *data_size, void **rctx)
969 struct MHD_Response *resp = NULL;
970 unsigned int rc = MHD_HTTP_OK;
971 my_state *ms = *rctx;
973 (void)cctx;
974 (void)method;
975 (void)version;
976 (void)data;
978 DPRINTF("PROXY API ROOT (%s, %zu)\n",url,*data_size);
980 ms->query = meta_query_new(NULL,"_default",NULL);
981 if (!ms->query) {
982 return MHD_NO;
984 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
985 65536, root_blob_generator, ms, simple_closer);
986 if (!resp) {
987 return MHD_NO;
989 MHD_queue_response(conn,rc,resp);
990 MHD_destroy_response(resp);
992 return MHD_YES;
996 static int
997 post_iterator (void *ctx, enum MHD_ValueKind kind, const char *key,
998 const char *filename, const char *content_type,
999 const char *transfer_encoding, const char *data,
1000 uint64_t off, size_t size)
1002 char *old_val;
1003 size_t old_len;
1004 char *new_val;
1006 (void)kind;
1007 (void)filename;
1008 (void)content_type;
1009 (void)transfer_encoding;
1010 (void)off;
1012 printf("adding %s, size=%zu\n",key,size);
1014 // TBD: don't assume that values are null-terminated strings
1015 old_val = g_hash_table_lookup(ctx,key);
1016 if (old_val) {
1017 old_len = strlen(old_val);
1018 new_val = malloc(old_len+size+1);
1019 if (!new_val) {
1020 return MHD_NO;
1022 memcpy(new_val,old_val,old_len);
1023 memcpy(new_val+old_len,data,size);
1024 new_val[old_len+size] = '\0';
1026 else {
1027 new_val = malloc(size+1);
1028 if (!new_val) {
1029 return MHD_NO;
1031 memcpy(new_val,data,size);
1032 new_val[size] = '\0';
1035 g_hash_table_insert(ctx,strdup(key),new_val);
1036 /* TBD: check return value for strdups (none avail for insert) */
1037 return MHD_YES;
1040 /* Returns TRUE if we found an *invalid* key. */
1041 static gboolean
1042 post_find (gpointer key, gpointer value, gpointer ctx)
1044 (void)value;
1045 (void)ctx;
1047 if (!is_reserved(key,reserved_attr)) {
1048 return FALSE;
1051 DPRINTF("bad attr %s\n", (char *)key);
1052 return TRUE;
1055 static void
1056 post_foreach (gpointer key, gpointer value, gpointer ctx)
1058 my_state *ms = ctx;
1060 DPRINTF("setting %s = %s for %s/%s\n",(char *)key, (char *)value,
1061 ms->bucket,ms->key);
1062 meta_set_value(ms->bucket,ms->key,key,value);
1065 static int
1066 create_bucket (char *name, my_state *ms)
1068 int rc;
1070 if (is_reserved(name, reserved_name)
1071 || is_reserved(name, reserved_bucket_name)) {
1072 return MHD_HTTP_BAD_REQUEST;
1075 provider_t *main_prov = get_main_provider();
1076 rc = main_prov->func_tbl->bcreate_func(main_prov,name);
1077 if (rc == MHD_HTTP_OK) {
1078 if (meta_set_value(name,"_default", "_policy","0") != 0) {
1079 DPRINTF("default-policy " "create failed\n");
1080 /* Non-fatal. */
1082 DPRINTF("created bucket %s\n",name);
1084 * There's not a whole lot to do about bucket-creation
1085 * failures on replicas, other than to report them, unless
1086 * we adopt an "all or nothing" approach and unwind the
1087 * create on the primary as well. Then what if that fails?
1088 * It's just one example of the general "fewer replicas
1089 * than desired" distributed-system problem, not worth a
1090 * point solution here/now. Revisit when we have a more
1091 * general replica-repair policy/system in place.
1093 replicate_bcreate(name,ms);
1096 return rc;
1099 static int
1100 control_api_root (void *cctx, struct MHD_Connection *conn, const char *url,
1101 const char *method, const char *version, const char *data,
1102 size_t *data_size, void **rctx)
1104 struct MHD_Response *resp;
1105 my_state *ms = *rctx;
1106 int rc;
1107 char *op;
1108 char buf[80];
1109 int len;
1111 (void)cctx;
1112 (void)method;
1113 (void)version;
1115 DPRINTF("ROOT POST (%s, %zu)\n",url,*data_size);
1117 if (ms->state == MS_NEW) {
1118 ms->state = MS_NORMAL;
1119 ms->url = (char *)url;
1120 ms->dict = g_hash_table_new_full(
1121 g_str_hash,g_str_equal,NULL,NULL);
1122 ms->post = MHD_create_post_processor(conn,4096,
1123 post_iterator,ms->dict);
1124 return MHD_YES;
1127 if (*data_size) {
1128 MHD_post_process(ms->post,data,*data_size);
1129 *data_size = 0;
1130 return MHD_YES;
1133 rc = MHD_HTTP_BAD_REQUEST;
1135 op = g_hash_table_lookup(ms->dict,"op");
1136 if (op) {
1137 if (!strcmp(op,"rep_status")) {
1138 len = snprintf(buf,sizeof(buf),"%d requests\n",
1139 get_rep_count());
1140 rc = MHD_HTTP_OK;
1142 else {
1143 len = snprintf(buf,sizeof(buf),"unknown op");
1146 else {
1147 len = snprintf(buf,sizeof(buf),"missing op");
1150 if (len >= (int)sizeof(buf)) {
1151 len = 0;
1152 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
1155 /* NB The last arg tells MHD to copy the arg and free it later. */
1156 resp = MHD_create_response_from_data(len,buf,MHD_NO,MHD_YES);
1157 if (!resp) {
1158 return MHD_NO;
1160 MHD_queue_response(conn,rc,resp);
1161 MHD_destroy_response(resp);
1163 return MHD_YES;
1166 static int
1167 proxy_bucket_post (void *cctx, struct MHD_Connection *conn, const char *url,
1168 const char *method, const char *version, const char *data,
1169 size_t *data_size, void **rctx)
1171 struct MHD_Response *resp;
1172 my_state *ms = *rctx;
1173 int rc;
1174 char *key;
1176 (void)cctx;
1177 (void)method;
1178 (void)version;
1180 DPRINTF("PROXY POST (%s, %zu)\n",url,*data_size);
1182 if (ms->state == MS_NEW) {
1183 ms->state = MS_NORMAL;
1184 ms->url = (char *)url;
1185 ms->dict = g_hash_table_new_full(
1186 g_str_hash,g_str_equal,NULL,NULL);
1187 ms->post = MHD_create_post_processor(conn,4096,
1188 post_iterator,ms->dict);
1190 else if (*data_size) {
1191 MHD_post_process(ms->post,data,*data_size);
1192 *data_size = 0;
1194 else {
1195 rc = MHD_HTTP_BAD_REQUEST;
1196 key = g_hash_table_lookup(ms->dict,"_key");
1197 if (key) {
1198 strncpy(ms->key,key,MAX_FIELD_LEN-1);
1199 g_hash_table_remove(ms->dict,"_key");
1200 if (!g_hash_table_find(ms->dict,post_find,ms)) {
1201 g_hash_table_foreach(ms->dict,post_foreach,ms);
1202 DPRINTF("rereplicate (bucket POST)\n");
1203 recheck_replication(ms,NULL);
1204 rc = MHD_HTTP_OK;
1207 else if (!strcmp(ms->bucket,"_new")) {
1208 key = g_hash_table_lookup(ms->dict,"name");
1209 if (key != NULL) {
1210 rc = create_bucket(key,ms);
1213 else {
1214 DPRINTF("A parameter is MISSING (fail)\n");
1216 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1217 if (!resp) {
1218 fprintf(stderr,"MHD_crfd failed\n");
1219 return MHD_NO;
1221 MHD_queue_response(conn,rc,resp);
1222 MHD_destroy_response(resp);
1225 return MHD_YES;
1228 static int
1229 check_location (my_state *ms)
1231 char *loc = g_hash_table_lookup(ms->dict,"depot");
1233 if (!loc) {
1234 DPRINTF("missing loc on check for %s/%s\n",ms->bucket,ms->key);
1235 return MHD_HTTP_BAD_REQUEST;
1238 if (!meta_has_copy(ms->bucket,ms->key,loc)) {
1239 DPRINTF("did not find %s/%s at %s\n",ms->bucket,ms->key,loc);
1240 return MHD_HTTP_NOT_FOUND;
1243 /* TBD: meta_has_copy returns an etag which we should check */
1244 DPRINTF("found %s/%s at %s\n",ms->bucket,ms->key,loc);
1245 return MHD_HTTP_OK;
1248 static int
1249 register_image (my_state *ms)
1251 const char *site;
1252 const provider_t *prov;
1253 char *next;
1255 site = g_hash_table_lookup(ms->dict,"site");
1256 if (!site) {
1257 printf("site MISSING\n");
1258 return MHD_HTTP_BAD_REQUEST;
1261 next = strchr(site,':');
1262 if (next) {
1263 *(next++) = '\0';
1266 prov = get_provider(site);
1267 if (!prov) {
1268 DPRINTF("site %s not found\n",site);
1269 return MHD_HTTP_BAD_REQUEST;
1272 return prov->func_tbl->register_func(ms,prov,next,ms->dict);
1276 static ssize_t
1277 parts_callback (void *ctx, uint64_t pos, char *buf, size_t max)
1279 my_state *ms = ctx;
1280 size_t len;
1281 const char *accept_hdr;
1282 const char *name;
1283 const char *value;
1284 const char *host;
1286 (void)pos;
1288 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
1289 "Accept");
1290 host = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,"Host");
1292 if (!ms->gen_ctx) {
1293 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
1294 if (!ms->gen_ctx) {
1295 return -1;
1297 ms->gen_ctx->base = host;
1298 len = tmpl_obj_header(ms->gen_ctx,ms->bucket,ms->key);
1299 if (!len) {
1300 return -1;
1302 if (len > max) {
1303 len = max;
1305 memcpy(buf,ms->gen_ctx->buf,len);
1306 return len;
1309 if (ms->gen_ctx == TMPL_CTX_DONE) {
1310 return -1;
1314 // Set up and use query for what attributes exist.
1315 for(;;) {
1316 if (!meta_attr_next(ms->aquery,&name,&value)) {
1317 break;
1319 if (is_reserved(name,reserved_attr)) {
1320 continue;
1322 len = tmpl_obj_entry(ms->gen_ctx,ms->bucket,ms->key,name);
1323 if (!len) {
1324 return -1;
1326 if (len > max) {
1327 len = max;
1329 memcpy(buf,ms->gen_ctx->buf,len);
1330 return len;
1333 len = tmpl_obj_footer(ms->gen_ctx);
1334 if (!len) {
1335 return -1;
1337 if (len > max) {
1338 len = max;
1340 memcpy(buf,ms->gen_ctx->buf,len);
1341 free(ms->gen_ctx);
1342 ms->gen_ctx = TMPL_CTX_DONE;
1343 return len;
1346 static int
1347 show_parts (struct MHD_Connection *conn, my_state *ms)
1349 struct MHD_Response *resp;
1351 ms->aquery = meta_get_attrs(ms->bucket,ms->key);
1352 if (!ms->aquery) {
1353 return MHD_HTTP_NOT_FOUND;
1356 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
1357 65536, parts_callback, ms, simple_closer);
1358 if (!resp) {
1359 fprintf(stderr,"MHD_crfc failed\n");
1360 simple_closer(ms);
1361 return MHD_NO;
1363 MHD_queue_response(conn,MHD_HTTP_OK,resp);
1364 MHD_destroy_response(resp);
1365 return MHD_HTTP_PROCESSING;
1368 static int
1369 proxy_object_post (void *cctx, struct MHD_Connection *conn, const char *url,
1370 const char *method, const char *version, const char *data,
1371 size_t *data_size, void **rctx)
1373 struct MHD_Response *resp;
1374 my_state *ms = *rctx;
1375 int rc;
1376 char *op;
1378 (void)cctx;
1379 (void)method;
1380 (void)version;
1382 DPRINTF("PROXY POST obj (%s, %zu)\n",url,*data_size);
1384 if (ms->state == MS_NEW) {
1385 ms->state = MS_NORMAL;
1386 ms->url = (char *)url;
1387 ms->dict = g_hash_table_new_full(
1388 g_str_hash,g_str_equal,NULL,NULL);
1389 ms->post = MHD_create_post_processor(conn,4096,
1390 post_iterator,ms->dict);
1392 else if (*data_size) {
1393 MHD_post_process(ms->post,data,*data_size);
1394 *data_size = 0;
1396 else {
1397 rc = MHD_HTTP_BAD_REQUEST;
1398 if (!g_hash_table_find(ms->dict,post_find,ms)) {
1399 op = g_hash_table_lookup(ms->dict,"op");
1400 if (op) {
1401 if (!strcmp(op,"push")) {
1402 DPRINTF("rereplicate (obj POST)\n");
1403 recheck_replication(ms,NULL);
1404 rc = MHD_HTTP_OK;
1406 else if (!strcmp(op,"check")) {
1407 rc = check_location(ms);
1409 else if (!strcmp(op,"register")) {
1410 rc = register_image(ms);
1412 else if (!strcmp(op,"parts")) {
1413 rc = show_parts(conn,ms);
1415 else {
1416 DPRINTF("unknown op %s for %s/%s\n",
1417 op, ms->bucket, ms->key);
1420 else {
1421 DPRINTF("op is MISSING (fail)\n");
1424 if (rc != MHD_HTTP_PROCESSING) {
1426 * MHD_HTTP_PROCESSING is a special response that
1427 * means a request-specific routine (e.g. show_parts)
1428 * created its own response. Therefore we shouldn't.
1430 resp = MHD_create_response_from_data(0,NULL,
1431 MHD_NO,MHD_NO);
1432 if (!resp) {
1433 fprintf(stderr,"MHD_crfd failed\n");
1434 return MHD_NO;
1436 MHD_queue_response(conn,rc,resp);
1437 MHD_destroy_response(resp);
1441 return MHD_YES;
1446 static ssize_t
1447 prov_list_generator (void *ctx, uint64_t pos, char *buf, size_t max)
1449 my_state *ms = ctx;
1450 size_t len;
1451 gpointer key;
1452 const provider_t *prov;
1453 const char *accept_hdr;
1455 (void)pos;
1457 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
1458 "Accept");
1460 if (!ms->gen_ctx) {
1461 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
1462 if (!ms->gen_ctx) {
1463 return -1;
1465 init_prov_iter(&ms->prov_iter);
1466 len = tmpl_prov_header(ms->gen_ctx);
1467 if (!len) {
1468 return -1;
1470 if (len > max) {
1471 len = max;
1473 memcpy(buf,ms->gen_ctx->buf,len);
1474 return len;
1477 if (ms->gen_ctx == TMPL_CTX_DONE) {
1478 return -1;
1481 if (g_hash_table_iter_next(&ms->prov_iter,&key,(gpointer *)&prov)) {
1482 if (prov->deleted)
1483 return 0;
1484 len = tmpl_prov_entry(ms->gen_ctx,prov->name,prov->type,
1485 prov->host, prov->port, prov->username, prov->password);
1486 if (!len) {
1487 return -1;
1489 if (len > max) {
1490 len = max;
1492 memcpy(buf,ms->gen_ctx->buf,len);
1493 return len;
1496 len = tmpl_prov_footer(ms->gen_ctx);
1497 if (!len) {
1498 return -1;
1500 if (len > max) {
1501 len = max;
1503 memcpy(buf,ms->gen_ctx->buf,len);
1504 free(ms->gen_ctx);
1505 ms->gen_ctx = TMPL_CTX_DONE;
1506 return len;
1509 static int
1510 proxy_list_provs (void *cctx, struct MHD_Connection *conn, const char *url,
1511 const char *method, const char *version, const char *data,
1512 size_t *data_size, void **rctx)
1514 struct MHD_Response *resp;
1515 my_state *ms = *rctx;
1517 (void)cctx;
1518 (void)url;
1519 (void)method;
1520 (void)version;
1521 (void)data;
1522 (void)data_size;
1524 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
1525 65536, prov_list_generator, ms, simple_closer);
1526 if (!resp) {
1527 fprintf(stderr,"MHD_crfd failed\n");
1528 simple_closer(ms);
1529 return MHD_NO;
1531 MHD_queue_response(conn,MHD_HTTP_OK,resp);
1532 MHD_destroy_response(resp);
1534 return MHD_YES;
1537 static int
1538 prov_iterator (void *ctx, enum MHD_ValueKind kind, const char *key,
1539 const char *filename, const char *content_type,
1540 const char *transfer_encoding, const char *data,
1541 uint64_t off, size_t size)
1543 (void)kind;
1544 (void)filename;
1545 (void)content_type;
1546 (void)transfer_encoding;
1547 (void)off;
1549 g_hash_table_insert(ctx,strdup(key),strndup(data,size));
1550 /* TBD: check return value for strdups (none avail for insert) */
1551 return MHD_YES;
1555 static int
1556 proxy_update_prov (void *cctx, struct MHD_Connection *conn, const char *url,
1557 const char *method, const char *version, const char *data,
1558 size_t *data_size, void **rctx)
1560 struct MHD_Response *resp;
1561 my_state *ms = *rctx;
1562 int rc;
1563 char *provider;
1564 char *username;
1565 char *password;
1567 (void)cctx;
1568 (void)method;
1569 (void)version;
1571 if (ms->state == MS_NEW) {
1572 ms->state = MS_NORMAL;
1573 ms->url = (char *)url;
1574 ms->dict = g_hash_table_new_full(
1575 g_str_hash,g_str_equal,NULL,NULL);
1576 ms->post = MHD_create_post_processor(conn,4096,
1577 prov_iterator,ms->dict);
1579 else if (*data_size) {
1580 MHD_post_process(ms->post,data,*data_size);
1581 *data_size = 0;
1583 else {
1584 rc = MHD_HTTP_BAD_REQUEST;
1585 provider = g_hash_table_lookup(ms->dict,"provider");
1586 username = g_hash_table_lookup(ms->dict,"username");
1587 password = g_hash_table_lookup(ms->dict,"password");
1588 if (provider && username && password) {
1589 update_provider(provider,username,password);
1590 rc = MHD_HTTP_OK;
1592 else {
1593 DPRINTF("provider/username/password MISSING\n");
1595 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1596 if (!resp) {
1597 fprintf(stderr,"MHD_crfd failed\n");
1598 return MHD_NO;
1600 MHD_queue_response(conn,rc,resp);
1601 MHD_destroy_response(resp);
1604 return MHD_YES;
/*
 * Return a newly allocated copy of the last path component of URL, with
 * any trailing slashes ignored.  The caller owns (and must free) the
 * result.  Returns NULL if a copy cannot be allocated.
 */
static char *
url_to_provider_name (const char *url)
{
	char *result = NULL;
	char *scratch = strdup (url);

	if (scratch != NULL) {
		/* Work on a private copy: stripping modifies the string. */
		strip_trailing_slashes (scratch);
		result = strdup (last_component (scratch));
		free (scratch);
	}

	return result;
}
1622 static int
1623 proxy_primary_prov (void *cctx, struct MHD_Connection *conn, const char *url,
1624 const char *method, const char *version, const char *data,
1625 size_t *data_size, void **rctx)
1627 (void)cctx;
1628 (void)method;
1629 (void)version;
1630 (void)data;
1632 DPRINTF("PROXY GET PRIMARY PROVIDER (%s)\n", url);
1634 my_state *ms = *rctx;
1636 // "/_providers/_primary" is the only one we accept for now.
1637 bool valid = strcmp (url, "/_providers/_primary") == 0;
1638 unsigned int rc = (valid ? MHD_HTTP_OK : MHD_HTTP_BAD_REQUEST);
1639 if (!valid)
1640 error (0, 0, "invalid request: %s", url);
1642 const char *name = get_main_provider()->name;
1643 struct MHD_Response *resp;
1644 resp = MHD_create_response_from_data(valid ? strlen (name) : 0,
1645 valid ? (void *) name : NULL,
1646 MHD_NO, MHD_NO);
1647 if (!resp) {
1648 return MHD_NO;
1650 MHD_queue_response(conn,rc,resp);
1651 MHD_destroy_response(resp);
1653 return MHD_YES;
1656 static int
1657 proxy_set_primary (void *cctx, struct MHD_Connection *conn, const char *url,
1658 const char *method, const char *version, const char *data,
1659 size_t *data_size, void **rctx)
1661 (void)cctx;
1662 (void)method;
1663 (void)version;
1664 (void)data;
1666 DPRINTF("PROXY SET PRIMARY PROVIDER (%s)\n", url);
1668 my_state *ms = *rctx;
1669 char *name = NULL;
1670 unsigned int rc = MHD_HTTP_BAD_REQUEST;
1672 /* URL is guaranteed to be of the form "/_providers/NAME/_primary"
1673 Extract NAME: */
1674 bool valid = memcmp (url, "/_providers/", strlen("/_providers/")) == 0;
1675 if (!valid) {
1676 error (0, 0, "invalid request: %s", url);
1677 goto bad_set;
1679 const char *start = url + strlen("/_providers/");
1680 const char *slash = strchr (start, '/');
1681 if (slash == NULL) {
1682 error (0, 0, "invalid request: %s", url);
1683 goto bad_set;
1685 name = strndup (start, slash - start);
1686 if (name == NULL) {
1687 error (0, errno, "failed to extract provider name: %s", url);
1688 goto bad_set;
1691 /* If it's not a provider name, you lose. */
1692 provider_t *prov = find_provider (name);
1693 if (prov) {
1694 rc = MHD_HTTP_OK;
1695 set_main_provider (prov);
1698 bad_set:
1699 free (name);
1701 struct MHD_Response *resp;
1702 resp = MHD_create_response_from_data(0, NULL, MHD_NO, MHD_NO);
1703 if (!resp) {
1704 return MHD_NO;
1706 MHD_queue_response(conn,rc,resp);
1707 MHD_destroy_response(resp);
1709 return MHD_YES;
1712 static int
1713 proxy_delete_prov (void *cctx, struct MHD_Connection *conn, const char *url,
1714 const char *method, const char *version, const char *data,
1715 size_t *data_size, void **rctx)
1717 DPRINTF("PROXY DELETE PROVIDER %s\n",url);
1718 (void)cctx;
1719 (void)method;
1720 (void)version;
1721 (void)data;
1722 (void)data_size;
1724 my_state *ms = *rctx;
1725 struct MHD_Response *resp
1726 = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1727 if (!resp) {
1728 return MHD_NO;
1731 char *prov_name = url_to_provider_name (url);
1732 provider_t *prov = find_provider (prov_name);
1734 // don't allow removal of current main provider.
1735 if (prov == get_main_provider())
1736 prov = NULL;
1738 int rc = prov ? MHD_HTTP_OK : MHD_HTTP_NOT_FOUND;
1740 DPRINTF("PROXY DELETE PROVIDER prov=%s rc=%d\n", prov_name, rc);
1742 if (prov) {
1743 prov->deleted = 1;
1746 free (prov_name);
1747 MHD_queue_response(conn,rc,resp);
1748 MHD_destroy_response(resp);
1750 return MHD_YES;
1753 static int
1754 proxy_add_prov (void *cctx, struct MHD_Connection *conn, const char *url,
1755 const char *method, const char *version, const char *data,
1756 size_t *data_size, void **rctx)
1758 DPRINTF("PROXY ADD PROVIDER %s\n",url);
1759 struct MHD_Response *resp;
1760 my_state *ms = *rctx;
1762 (void)cctx;
1763 (void)method;
1764 (void)version;
1766 if (ms->state == MS_NEW) {
1767 ms->state = MS_NORMAL;
1768 ms->url = (char *)url;
1769 ms->dict = g_hash_table_new_full(
1770 g_str_hash,g_str_equal,NULL,NULL);
1771 ms->post = MHD_create_post_processor(conn,4096,
1772 prov_iterator,ms->dict);
1774 else if (*data_size) {
1775 MHD_post_process(ms->post,data,*data_size);
1776 *data_size = 0;
1778 else {
1779 int rc = MHD_HTTP_BAD_REQUEST;
1780 char *prov_name = url_to_provider_name (url);
1781 /* We're about to insert "name -> $prov_name".
1782 Ensure there is no "name" key already there. */
1783 const char *name = g_hash_table_lookup (ms->dict, "name");
1784 if (name) {
1785 fprintf(stderr,
1786 "add_provider: do not specify name: name=%s\n",
1787 name);
1788 goto add_fail;
1791 // another reserved word: provider name
1792 // FIXME: don't hard-code it here
1793 if (strcmp (prov_name, "_primary") == 0) {
1794 fprintf(stderr,
1795 "add_provider: %s is a reserved name\n",
1796 prov_name);
1797 goto add_fail;
1800 // FIXME: unchecked strdup
1801 g_hash_table_insert(ms->dict,strdup("name"),prov_name);
1803 if (validate_provider (ms->dict)) {
1804 if (!add_provider (ms->dict)) {
1805 DPRINTF("add provider failed\n");
1806 } else {
1807 rc = MHD_HTTP_OK;
1810 else {
1811 DPRINTF("invalid provider\n");
1814 add_fail:
1815 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1816 if (!resp) {
1817 fprintf(stderr,"MHD_crfd failed\n");
1818 return MHD_NO;
1820 MHD_queue_response(conn,rc,resp);
1821 MHD_destroy_response(resp);
1824 return MHD_YES;
1827 static int
1828 proxy_create_bucket (void *cctx, struct MHD_Connection *conn, const char *url,
1829 const char *method, const char *version, const char *data,
1830 size_t *data_size, void **rctx)
1832 struct MHD_Response *resp;
1833 my_state *ms = *rctx;
1834 int rc;
1836 (void)cctx;
1837 (void)method;
1838 (void)version;
1839 (void)data;
1840 (void)data_size;
1841 (void)url;
1843 /* curl -T moo.empty http://localhost:9090/_new by accident */
1844 rc = create_bucket(ms->bucket,ms);
1846 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1847 if (!resp) {
1848 fprintf(stderr,"MHD_crfd failed\n");
1849 return MHD_NO;
1851 MHD_queue_response(conn,rc,resp);
1852 MHD_destroy_response(resp);
1854 return MHD_YES;
1857 static const rule my_rules[] = {
1858 { /* get bucket list */
1859 "GET", URL_ROOT, proxy_api_root },
1860 { /* perform a control operation on the API root */
1861 "POST", URL_ROOT, control_api_root },
1862 { /* get object list */
1863 "GET", URL_BUCKET, proxy_list_objs },
1864 { /* create bucket */
1865 "PUT", URL_BUCKET, proxy_create_bucket },
1866 { /* get object data */
1867 "GET", URL_OBJECT, proxy_get_data },
1868 { /* get attribute data */
1869 "GET", URL_ATTR, proxy_get_attr },
1870 { /* put object data */
1871 "PUT", URL_OBJECT, proxy_put_data },
1872 { /* put attribute data */
1873 "PUT", URL_ATTR, proxy_put_attr },
1874 { /* create object and/or modify attributes */
1875 "POST", URL_BUCKET, proxy_bucket_post },
1876 { /* perform control operations on an object */
1877 "POST", URL_OBJECT, proxy_object_post },
1878 { /* query */
1879 "POST", URL_QUERY, proxy_query },
1880 { /* delete object */
1881 "DELETE", URL_OBJECT, proxy_delete },
1882 { /* delete attribute (TBD) */
1883 "DELETE", URL_ATTR, NULL },
1884 { /* get provider list */
1885 "GET", URL_PROVLIST, proxy_list_provs },
1886 { /* update a provider */
1887 "POST", URL_PROVLIST, proxy_update_prov },
1888 { /* get the primary provider */
1889 "GET", URL_PROVIDER, proxy_primary_prov },
1890 { /* create a provider */
1891 "POST", URL_PROVIDER, proxy_add_prov },
1892 { /* delete a provider */
1893 "DELETE", URL_PROVIDER, proxy_delete_prov },
1894 { /* set the primary provider */
1895 "PUT", URL_PROVIDER_SET_PRIMARY, proxy_set_primary },
1896 { NULL, 0, NULL }
1899 static url_type
1900 parse_url (const char *url, my_state *ms)
1902 unsigned short esize;
1903 unsigned short eindex;
1904 char *parts[URL_INVAL];
1906 if (strstr(url,"../")) {
1907 /* Naughty, naughty. Never a good reason to allow this. */
1908 DPRINTF("Rejecting ../ in path.\n");
1909 return URL_INVAL;
1912 eindex = URL_ROOT;
1913 parts[URL_BUCKET] = ms->bucket;
1914 parts[URL_OBJECT] = ms->key;
1915 parts[URL_ATTR] = ms->attr;
1917 for (;;) {
1918 while (*url == '/') {
1919 ++url;
1922 if (!*url) {
1923 if (eindex == URL_BUCKET) {
1924 if (!strcmp(ms->bucket,"_providers")) {
1925 eindex = URL_PROVLIST;
1928 else if (eindex == URL_OBJECT) {
1929 if (!strcmp(ms->key,"_query")) {
1930 eindex = URL_QUERY;
1933 break;
1936 if (++eindex >= URL_INVAL) {
1937 return URL_INVAL;
1939 esize = 0;
1941 while (*url && (*url != '/')) {
1942 parts[eindex][esize++] = *(url++);
1943 if (esize >= MAX_FIELD_LEN) {
1944 return URL_INVAL;
1949 if (eindex == URL_OBJECT
1950 && !strcmp (parts[URL_BUCKET], "_providers"))
1951 eindex = URL_PROVIDER;
1952 else if (eindex == URL_ATTR
1953 && !strcmp (parts[URL_BUCKET], "_providers")
1954 && !strcmp (parts[URL_ATTR], "_primary"))
1955 eindex = URL_PROVIDER_SET_PRIMARY;
1957 DPRINTF("parse_url: %d: %s %s %s", eindex, parts[URL_BUCKET],
1958 parts[URL_OBJECT], parts[URL_ATTR]);
1959 return eindex;
1962 static int
1963 access_handler (void *cctx, struct MHD_Connection *conn, const char *url,
1964 const char *method, const char *version, const char *data,
1965 size_t *data_size, void **rctx)
1967 unsigned int i;
1968 url_type utype;
1969 struct MHD_Response *resp;
1970 my_state *ms = *rctx;
1972 if (ms) {
1973 return ms->handler(cctx,conn,url,method,version,
1974 data,data_size,rctx);
1977 ms = calloc(sizeof(*ms), 1);
1978 if (!ms) {
1979 return MHD_NO;
1982 utype = parse_url(url,ms);
1984 for (i = 0; my_rules[i].method; ++i) {
1985 if (utype != my_rules[i].utype) {
1986 continue;
1988 if (strcmp(method,my_rules[i].method)) {
1989 continue;
1991 if (!my_rules[i].handler) {
1992 break;
1994 ms->handler = my_rules[i].handler;
1995 ms->state = MS_NEW;
1996 ms->url = NULL;
1997 ms->post = NULL;
1998 ms->conn = conn;
1999 *rctx = ms;
2000 return ms->handler(cctx,conn,url,method,version,
2001 data,data_size,rctx);
2004 if (!strcmp(method,"QUIT")) {
2005 (void)sem_post((sem_t *)cctx);
2006 return MHD_NO;
2009 fprintf(stderr,"bad request m=%s u=%s\n",method,url);
2011 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
2012 if (!resp) {
2013 return MHD_NO;
2015 MHD_queue_response(conn,MHD_HTTP_NOT_FOUND,resp);
2016 MHD_destroy_response(resp);
2017 return MHD_YES;
/* Codes returned by getopt_long for the long-only options --help and
   --version.  They lie below CHAR_MIN, so they cannot collide with any
   option character, nor with values such as CHAR_MAX + 1 that commands
   ordinarily use.  CHAR_MIN - 1 is deliberately left unused because it
   may equal -1, the getopt end-of-options value.  */
enum
{
  GETOPT_VERSION_CHAR = (CHAR_MIN - 3),
  GETOPT_HELP_CHAR = (CHAR_MIN - 2)
};
2029 static const struct option my_options[] = {
2030 { "autostart", no_argument, NULL, 'a' },
2031 { "config", required_argument, NULL, 'c' },
2032 { "db", required_argument, NULL, 'd' },
2033 { "master", required_argument, NULL, 'm' },
2034 { "port", required_argument, NULL, 'p' },
2035 { "verbose", no_argument, NULL, 'v' },
2036 { "version", no_argument, NULL, GETOPT_VERSION_CHAR },
2037 { "help", no_argument, NULL, GETOPT_HELP_CHAR },
2038 { NULL, 0, NULL, '\0' }
/*
 * Print usage information and exit with STATUS (never returns).
 *
 * When STATUS is not EXIT_SUCCESS, only a one-line "Try --help" hint is
 * written to stderr.  Otherwise the usage line, the option summary and
 * the bug-report address are written to stdout.  All message text is
 * passed through the _() translation macro.
 */
2041 static void ATTRIBUTE_NORETURN
2042 usage (int status)
2044 if (status != EXIT_SUCCESS)
2045 fprintf (stderr, _("Try `%s --help' for more information.\n"),
2046 program_name);
2047 else
2049 printf (_("\
2050 Usage: %s [OPTION]\n\
2052 program_name);
2053 fputs (_("\
2054 Deltacloud image-warehouse daemon.\n\
2055 A configuration file must be specified.\n\
2057 -a, --autostart start necessary back-end services\n\
2058 -c, --config=FILE config file [required]\n\
2059 -d, --db=HOST_PORT database server as ip[:port]\n\
2060 -m, --master=HOST_PORT master (upstream) server as ip[:port]\n\
2061 -p, --port=PORT alternate listen port (default 9090)\n\
2062 -v, --verbose verbose/debug output\n\
2064 --help display this help and exit\n\
2065 --version output version information and exit\n\
2066 "), stdout);
2067 printf (_("\
2069 Report %s bugs to %s.\n\
2071 program_name, PACKAGE_BUGREPORT);
2073 exit (status);
2078 main (int argc, char **argv)
2080 struct MHD_Daemon *the_daemon;
2081 sem_t the_sem;
2082 char *stctx = NULL;
2083 char *port_tmp;
2084 bool autostart = false;
2086 set_program_name (argv[0]);
2087 atexit (close_stdout);
2088 GC_INIT ();
2090 for (;;) switch (getopt_long(argc,argv,"ac:d:m:p:v",my_options,NULL)) {
2091 case 'a':
2092 autostart = true;
2093 break;
2094 case 'c':
2095 cfg_file = optarg;
2096 break;
2097 case 'd':
2098 assert (optarg);
2099 db_host = strtok_r(optarg,":",&stctx);
2100 port_tmp = strtok_r(NULL,":",&stctx);
2101 if (port_tmp) {
2102 db_port = (unsigned short)strtoul(port_tmp,NULL,10);
2104 break;
2105 case 'm':
2106 assert (optarg);
2107 master_host = strtok_r(optarg,":",&stctx);
2108 port_tmp = strtok_r(NULL,":",&stctx);
2109 if (port_tmp) {
2110 master_port = (unsigned short)strtoul(port_tmp,NULL,10);
2112 break;
2113 case 'p':
2114 my_port = (unsigned short)strtoul(optarg,NULL,10);
2115 break;
2116 case 'v':
2117 ++verbose;
2118 break;
2119 case GETOPT_HELP_CHAR:
2120 usage(EXIT_SUCCESS);
2121 break;
2122 case GETOPT_VERSION_CHAR:
2123 printf ("%s version %s\n", program_name, PACKAGE_VERSION);
2124 exit (EXIT_SUCCESS);
2125 break;
2127 case -1:
2128 goto args_done;
2129 default:
2130 usage(EXIT_FAILURE);
2131 break;
2133 args_done:
2135 if (!db_port) {
2136 db_port = autostart ? AUTO_MONGOD_PORT : 27017;
2139 if (autostart && cfg_file) {
2140 error(0,0,"do not use -c and -a simultaneously");
2141 return !0;
2143 else if (autostart && !cfg_file) {
2144 me = auto_config();
2145 if (!me) {
2146 /* error printed */
2147 return !0;
2150 else if (!autostart && cfg_file) {
2151 me = parse_config(cfg_file);
2152 if (!me) {
2153 error(0,0,"could not parse %s",cfg_file);
2154 return !0;
2157 else {
2158 error(0,0,"specify at least -c or -a");
2159 usage (EXIT_FAILURE);
2162 sem_init(&the_sem,0,0);
2164 if (verbose) {
2165 provider_t *main_prov = get_main_provider();
2166 printf("primary store type is %s\n",main_prov->type);
2167 if (master_host) {
2168 printf("operating as slave to %s:%u\n",
2169 master_host, master_port);
2171 printf("db is at %s:%u\n",db_host,db_port);
2172 printf("will listen on port %u\n",my_port);
2173 printf("my location is \"%s\"\n",me);
2174 if (fflush(stdout) || ferror(stdout))
2175 error(EXIT_FAILURE, 0, "write failed");
2178 backend_init();
2179 meta_init();
2180 repl_init();
2183 * Gotcha: if we don't set the connection memory limit explicitly,
2184 * the per-connection buffer for MHD will be smaller than that used
2185 * by CURL, so proxy_writefunc will never be able to do its job.
2187 the_daemon = MHD_start_daemon(MY_MHD_FLAGS,
2188 my_port, NULL, NULL, &access_handler, &the_sem,
2189 MHD_OPTION_CONNECTION_MEMORY_LIMIT, (size_t)1048576,
2190 MHD_OPTION_END);
2191 if (!the_daemon) {
2192 fprintf(stderr,"Could not create daemon.\n");
2193 return !0;
2196 sem_wait(&the_sem);
2197 return 0;