qlexer.l: rewrite not to need static var, at_eof.
[iwhd.git] / rest.c
blobd09e7c15f7c73ffe1b52a58fe062e205b0fd2ade
1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include <error.h>
19 #include <fcntl.h>
20 #include <getopt.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <semaphore.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <strings.h>
29 #include <unistd.h>
30 #include <sys/stat.h>
31 #include <assert.h>
33 #include <microhttpd.h>
34 #include <hstor.h> /* only for ARRAY_SIZE at this point */
35 #include <curl/curl.h>
36 #include <glib.h>
38 #include "iwh.h"
39 #include "meta.h"
40 #include "backend.h"
41 #include "setup.h"
42 #include "replica.h"
43 #include "template.h"
44 #include "mpipe.h"
45 #include "state_defs.h"
47 /* Define-away for now. Eventually, define to gettext. */
48 #define _(msgid) (msgid)
50 #if defined(DEBUG)
51 #define MY_MHD_FLAGS MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG
52 //#define MY_MHD_FLAGS MHD_USE_SELECT_INTERNALLY | MHD_USE_DEBUG
53 #else
54 #define MY_MHD_FLAGS MHD_USE_THREAD_PER_CONNECTION
55 #endif
57 typedef enum {
58 URL_ROOT=0, URL_BUCKET, URL_OBJECT, URL_ATTR, URL_INVAL,
59 URL_QUERY, URL_PROVLIST
60 } url_type;
62 typedef struct {
63 char *method;
64 url_type utype;
65 MHD_AccessHandlerCallback handler;
66 } rule;
68 static unsigned short my_port = MY_PORT;
69 const char *program_name;
70 char *cfg_file = NULL;
72 static char *(reserved_name[]) = { "_default", "_query", "_new", NULL };
73 static char *(reserved_attr[]) = { "bucket", "key", "date", "etag", "loc", NULL };
75 void
76 free_ms (my_state *ms)
78 if (!g_atomic_int_dec_and_test(&ms->refcnt)) {
79 return;
82 if (ms->cleanup & CLEANUP_BUF_PTR) {
83 free(ms->pipe.data_ptr);
86 if (ms->cleanup & CLEANUP_POST) {
87 MHD_destroy_post_processor(ms->post);
90 if (ms->cleanup & CLEANUP_DICT) {
91 g_hash_table_destroy(ms->dict);
94 if (ms->cleanup & CLEANUP_QUERY) {
95 meta_query_stop(ms->query);
98 if (ms->cleanup & CLEANUP_TMPL) {
99 free(ms->gen_ctx);
102 if (ms->cleanup & CLEANUP_URL) {
103 free(ms->url);
106 if (ms->cleanup & CLEANUP_AQUERY) {
107 meta_attr_stop(ms->aquery);
110 free(ms);
113 static int
114 validate_put (struct MHD_Connection *conn)
116 const char *mhdr;
118 if (!master_host) {
119 /* We're not a slave, so we don't care. */
120 return 1;
123 mhdr = MHD_lookup_connection_value(conn,MHD_HEADER_KIND,
124 "X-redhat-role");
125 return (mhdr && !strcmp(mhdr,"master"));
128 static int
129 is_reserved (const char *cand, char **resv_list)
131 int i;
133 for (i = 0; resv_list[i]; ++i) {
134 if (!strcmp(cand,resv_list[i])) {
135 return TRUE;
139 return FALSE;
142 static int
143 validate_url (const char *url)
145 char *slash = rindex(url,'/');
147 if (!slash) {
148 /* There should be at least one betwixt bucket and key. */
149 return 0;
152 return !is_reserved(slash+1,reserved_name);
155 /**********
156 * The proxy has MHD on one side and CURL on the other. The CURL side is
157 * always run in a child thread. Yes, there are both context switches
158 * and copies between the threads. Get over it. The focus here is on
159 * supporting multi-way replication on PUT, with minimal complexity. These
160 * were the two easiest libraries to use, and they both want to allocate
161 * their own buffers so we're kind of stuck with the copies unless we want
162 * to buffer whole files in memory (which would have to be virtual since
163 * they're potentialy bigger than physical) or explicitly ping them through
164 * a local filesystem. We could potentially take over scheduling from one
165 * or both to avoid some of the context switching, but those interfaces are
166 * significantly more error-prone and (at least in CURL's case) buggy.
168 * For a GET, the CURL child acts as a producer while the MHD parent acts
169 * as consumer. For a PUT, the MHD parent is the producer and the CURL
170 * child is the consumer. For GET the MHD component is invoked via a
171 * callback set up in the access handler; for PUT it's invoked via repeated
172 * calls to the access handler itself. Either way, the producer's job is
173 * to post its pointer+length to the my_state structure and then wait for
174 * all consumers to check back in saying they've finished it. This might
175 * involve multiple passes through each consumer for one pass through the
176 * single producer. When the producer is done, it does a similar handshake
177 * with the consumers. Each consumer has its own pipe_private structure,
178 * containing a pointer to the shared my_state plus a per-consumer offset
179 * into the current chunk.
181 * Attribute functions don't use CURL, so they do much simpler in-memory
182 * buffering. Queries also don't use CURL, but the MHD POST interface
183 * introduces some of its own complexity so see below for that.
184 **********/
186 static void
187 simple_closer (void *ctx)
189 my_state *ms = ctx;
191 DPRINTF("%s: cleaning up\n",__func__);
192 free_ms(ms);
195 static void
196 child_closer (void * ctx)
198 pipe_private *pp = ctx;
200 DPRINTF("in %s\n",__func__);
202 free(pp);
205 /* Invoked from MHD. */
206 static int
207 proxy_get_cons (void *ctx, uint64_t pos, char *buf, int max)
209 pipe_private *pp = ctx;
210 pipe_shared *ps = pp->shared;
211 my_state *ms = ps->owner;
212 int done;
213 void *child_res;
215 (void)pos;
217 DPRINTF("consumer asked to read %d\n",max);
219 if (pipe_cons_wait(pp)) {
220 DPRINTF("consumer offset %zu into %zu\n",
221 pp->offset, ps->data_len);
222 done = ps->data_len - pp->offset;
223 if (done > max) {
224 done = max;
226 memcpy(buf,ps->data_ptr+pp->offset,done);
227 pp->offset += done;
228 DPRINTF("consumer copied %d, new offset %zu\n",
229 done, pp->offset);
230 if (pp->offset == ps->data_len) {
231 DPRINTF("consumer finished chunk\n");
232 pipe_cons_signal(pp, 0);
235 else {
236 done = -1;
239 if (done == (-1)) {
240 child_res = NULL;
241 pthread_join(ms->backend_th,&child_res);
242 if (child_res == THREAD_FAILED) {
243 DPRINTF("GET producer failed\n");
244 /* Nothing we can do; already sent status. */
246 if (ms->from_master) {
247 pthread_join(ms->cache_th,NULL);
248 /* TBD: do something about cache failure? */
250 free_ms(ms);
253 return done;
256 static int
257 proxy_get_data (void *cctx, struct MHD_Connection *conn, const char *url,
258 const char *method, const char *version, const char *data,
259 size_t *data_size, void **rctx)
261 struct MHD_Response *resp;
262 my_state *ms = *rctx;
263 pipe_private *pp;
264 pipe_private *pp2;
265 char *my_etag;
266 const char *user_etag;
267 int rc;
269 (void)cctx;
270 (void)method;
271 (void)version;
272 (void)data;
273 (void)data_size;
275 DPRINTF("PROXY GET DATA %s\n",url);
277 ms->url = strdup(url);
278 if (!ms->url) {
279 return MHD_NO;
281 ms->cleanup |= CLEANUP_URL;
283 my_etag = meta_has_copy(ms->bucket,ms->key,me);
284 if (!my_etag) {
285 DPRINTF("falling back to local for %s/%s\n",ms->bucket,ms->key);
286 ms->from_master = 0;
288 else if (*my_etag) {
289 user_etag = MHD_lookup_connection_value(
290 conn, MHD_HEADER_KIND, "If-None-Match");
291 if (user_etag && !strcmp(user_etag,my_etag)) {
292 DPRINTF("ETag match!\n");
293 free(my_etag);
294 resp = MHD_create_response_from_data(0,NULL,
295 MHD_NO,MHD_NO);
296 MHD_queue_response(conn,MHD_HTTP_NOT_MODIFIED,resp);
297 MHD_destroy_response(resp);
298 return MHD_YES;
300 free(my_etag);
301 ms->from_master = 0;
303 else {
304 DPRINTF("%s/%s not found locally\n",ms->bucket,ms->key);
305 if (!master_host) {
306 DPRINTF(" that means it doesn't exist\n");
307 resp = MHD_create_response_from_data(0,NULL,
308 MHD_NO,MHD_NO);
309 MHD_queue_response(conn,MHD_HTTP_NOT_FOUND,resp);
310 MHD_destroy_response(resp);
311 free_ms(ms);
312 return MHD_YES;
314 DPRINTF(" will fetch from %s:%u\n", master_host,master_port);
315 ms->from_master = 1;
318 pipe_init_shared(&ms->pipe,ms,ms->from_master+1);
319 pp = pipe_init_private(&ms->pipe);
320 if (!pp) {
321 return MHD_NO;
323 ms->thunk.parent = ms;
324 ms->thunk.prov = ms->from_master ? master_prov : main_prov;
325 pthread_create(&ms->backend_th,NULL,
326 ms->thunk.prov->func_tbl->get_child_func,&ms->thunk);
327 /* TBD: check return value */
329 if (ms->from_master) {
330 pp2 = pipe_init_private(&ms->pipe);
331 if (!pp2) {
332 return MHD_NO;
334 pp2->prov = main_prov;
335 pthread_create(&ms->cache_th,NULL,
336 main_prov->func_tbl->cache_child_func,pp2);
337 /* TBD: check return value */
339 else {
340 pp2 = NULL;
343 rc = pipe_cons_wait_init(&ms->pipe);
344 ms->rc = (rc == 0) ? MHD_HTTP_OK : MHD_HTTP_INTERNAL_SERVER_ERROR;
346 resp = MHD_create_response_from_callback(
347 MHD_SIZE_UNKNOWN, 65536, proxy_get_cons, pp, child_closer);
348 if (!resp) {
349 fprintf(stderr,"MHD_crfc failed\n");
350 if (pp2) {
351 /* TBD: terminate thread */
352 free(pp2);
354 child_closer(pp);
355 return MHD_NO;
357 MHD_queue_response(conn,ms->rc,resp);
358 MHD_destroy_response(resp);
360 return MHD_YES;
363 static void
364 recheck_replication (my_state * ms, char *policy)
366 int rc;
367 int free_it = FALSE;
368 char fixed[MAX_FIELD_LEN];
370 if (is_reserved(ms->key,reserved_name)) {
371 DPRINTF("declining to replicate reserved object %s\n",ms->key);
372 return;
375 if (!policy && ms->dict) {
376 DPRINTF("using new policy for %s/%s\n",ms->bucket,ms->key);
377 policy = g_hash_table_lookup(ms->dict,"_policy");
380 if (!policy) {
381 /* If we get a policy here or below, we have to free it. */
382 free_it = TRUE;
383 DPRINTF("fetching policy for %s/%s\n",ms->bucket,ms->key);
384 rc = meta_get_value(ms->bucket,ms->key, "_policy", &policy);
387 if (!policy) {
388 DPRINTF(" inheriting policy from %s\n",ms->bucket);
389 rc = meta_get_value(ms->bucket,
390 "_default", "_policy", &policy);
393 if (policy) {
394 DPRINTF(" implementing policy %s\n",policy);
396 * Can't use ms->url here because it might be a bucket POST
397 * and in that case ms->url points to the bucket.
399 snprintf(fixed,sizeof(fixed),"%s/%s",ms->bucket,ms->key);
400 replicate(fixed,0,policy,ms);
401 if (free_it) {
402 free(policy);
405 else {
406 DPRINTF(" could not find a policy anywhere!\n");
410 static int
411 proxy_put_data (void *cctx, struct MHD_Connection *conn, const char *url,
412 const char *method, const char *version, const char *data,
413 size_t *data_size, void **rctx)
415 struct MHD_Response *resp;
416 my_state *ms = *rctx;
417 pipe_private *pp;
418 int rc;
419 char *etag = NULL;
420 void *child_res;
422 (void)cctx;
423 (void)method;
424 (void)version;
426 DPRINTF("PROXY PUT DATA %s (%zu)\n",url,*data_size);
428 if (ms->state == MS_NEW) {
429 if (!validate_put(conn) || !validate_url(url)) {
430 DPRINTF("rejecting %s\n",url);
431 resp = MHD_create_response_from_data(0,NULL,
432 MHD_NO,MHD_NO);
433 if (!resp) {
434 return MHD_NO;
436 MHD_queue_response(conn,MHD_HTTP_FORBIDDEN,resp);
437 MHD_destroy_response(resp);
438 return MHD_YES;
440 ms->state = MS_NORMAL;
441 ms->url = strdup(url);
442 if (!ms->url) {
443 return MHD_NO;
445 ms->cleanup |= CLEANUP_URL;
446 ms->size = 0;
447 pipe_init_shared(&ms->pipe,ms,1);
448 pp = pipe_init_private(&ms->pipe);
449 if (!pp) {
450 return MHD_NO;
452 pp->prov = main_prov;
453 ms->be_flags = BACKEND_GET_SIZE;
454 pthread_create(&ms->backend_th,NULL,
455 main_prov->func_tbl->put_child_func,pp);
456 /* TBD: check return value */
459 * Do the initial handshake with children. If we return from
460 * this callback without an error response, Microhttpd posts
461 * the "100 Continue" header and the client starts sending
462 * the data. We must report errors here or forever keep
463 * out peace.
465 rc = pipe_prod_wait_init(&ms->pipe);
466 if (rc < 0) {
467 DPRINTF("producer wait failed\n");
468 resp = MHD_create_response_from_data(0,NULL,
469 MHD_NO,MHD_NO);
470 if (!resp) {
471 return MHD_NO;
473 MHD_queue_response(conn,MHD_HTTP_INTERNAL_SERVER_ERROR,
474 resp);
475 MHD_destroy_response(resp);
476 } else if (rc > 0) {
478 * Note that we fail here even if 1 of N replicas fail.
479 * Might want to fix this when we start looping over
480 * pipe_init_private() above.
482 DPRINTF("producer replicas failed (%u of %u)\n",
483 rc, ms->pipe.cons_total);
484 resp = MHD_create_response_from_data(0,NULL,
485 MHD_NO,MHD_NO);
486 if (!resp) {
487 return MHD_NO;
489 MHD_queue_response(conn,MHD_HTTP_INTERNAL_SERVER_ERROR,
490 resp);
491 MHD_destroy_response(resp);
492 } else {
493 DPRINTF("producer proceeding\n");
496 else if (*data_size) {
497 pipe_prod_signal(&ms->pipe,(void *)data,*data_size);
498 ms->size += *data_size;
499 DPRINTF("producer chunk finished\n");
500 *data_size = 0;
502 else {
503 pipe_prod_finish(&ms->pipe);
504 pthread_join(ms->backend_th,&child_res);
505 if (child_res == THREAD_FAILED) {
506 DPRINTF("thread failed\n");
507 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
509 else if (ms->pipe.cons_error == ms->pipe.cons_total) {
510 DPRINTF("all %u consumers failed\n",
511 ms->pipe.cons_error);
512 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
514 else {
515 if (master_host) {
516 meta_got_copy(ms->bucket,ms->key,me);
517 etag = NULL;
519 else {
520 etag = meta_did_put(ms->bucket,ms->key,me,
521 ms->size);
523 DPRINTF("rereplicate (obj PUT)\n");
524 recheck_replication(ms,NULL);
525 rc = MHD_HTTP_OK;
527 free_ms(ms);
528 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
529 if (!resp) {
530 free(etag);
531 return MHD_NO;
533 if (etag) {
534 MHD_add_response_header(resp,"ETag",etag);
535 free(etag);
537 MHD_queue_response(conn,rc,resp);
538 MHD_destroy_response(resp);
541 return MHD_YES;
544 static int
545 proxy_get_attr (void *cctx, struct MHD_Connection *conn, const char *url,
546 const char *method, const char *version, const char *data,
547 size_t *data_size, void **rctx)
549 struct MHD_Response *resp;
550 char *fixed;
551 my_state *ms = *rctx;
552 int rc = MHD_HTTP_NOT_FOUND;
554 (void)cctx;
555 (void)method;
556 (void)version;
557 (void)data;
558 (void)data_size;
560 DPRINTF("PROXY GET ATTR %s\n",url);
562 if (meta_get_value(ms->bucket,ms->key,ms->attr,&fixed) == 0) {
563 resp = MHD_create_response_from_data(strlen(fixed),fixed,
564 MHD_YES,MHD_NO);
565 rc = MHD_HTTP_OK;
567 else {
568 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
570 if (!resp) {
571 return MHD_NO;
573 MHD_queue_response(conn,rc,resp);
574 MHD_destroy_response(resp);
576 free_ms(ms);
577 return MHD_YES;
580 static int
581 proxy_put_attr (void *cctx, struct MHD_Connection *conn, const char *url,
582 const char *method, const char *version, const char *data,
583 size_t *data_size, void **rctx)
585 struct MHD_Response *resp;
586 my_state *ms = *rctx;
587 const char *attrval;
588 int send_resp = 0;
590 (void)cctx;
591 (void)method;
592 (void)version;
594 DPRINTF("PROXY PUT ATTR %s (%zu)\n",url,*data_size);
596 if (ms->state == MS_NEW) {
597 ms->state = MS_NORMAL;
598 ms->url = strdup(url);
599 if (!ms->url) {
600 return MHD_NO;
602 ms->cleanup |= CLEANUP_URL;
603 attrval = MHD_lookup_connection_value(conn,MHD_HEADER_KIND,
604 "X-redhat-value");
605 if (attrval) {
606 meta_set_value(ms->bucket,ms->key,ms->attr,
607 (char *)attrval);
608 send_resp = 1;
611 else if (*data_size) {
612 if (ms->pipe.data_len) {
613 ms->pipe.data_len += *data_size;
614 char *p = realloc(ms->pipe.data_ptr,ms->pipe.data_len);
615 if (!p) {
616 return MHD_NO;
618 ms->pipe.data_ptr = p;
620 else {
621 ms->pipe.data_len = *data_size + 1;
622 ms->pipe.data_ptr = malloc(ms->pipe.data_len);
623 if (!ms->pipe.data_ptr) {
624 return MHD_NO;
626 ((char *)ms->pipe.data_ptr)[0] = '\0';
627 ms->cleanup |= CLEANUP_BUF_PTR;
629 (void)strncat(ms->pipe.data_ptr,data,*data_size);
630 /* TBD: check return value */
631 *data_size = 0;
633 else {
634 if (!ms->pipe.data_ptr) {
635 return MHD_NO;
637 if (is_reserved(ms->attr,reserved_attr)) {
638 resp = MHD_create_response_from_data(
639 0,NULL,MHD_NO,MHD_NO);
640 if (!resp) {
641 return MHD_NO;
643 MHD_queue_response(conn,MHD_HTTP_BAD_REQUEST,
644 resp);
645 MHD_destroy_response(resp);
646 free_ms(ms);
647 return MHD_YES;
649 meta_set_value(ms->bucket,ms->key,ms->attr,ms->pipe.data_ptr);
651 * We should always re-replicate, because the replication
652 * policy might refer to this attr.
654 DPRINTF("rereplicate (attr PUT)\n");
655 recheck_replication(ms,NULL);
656 free_ms(ms);
657 send_resp = 1;
660 if (send_resp) {
661 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
662 if (!resp) {
663 return MHD_NO;
665 MHD_queue_response(conn,MHD_HTTP_CREATED,resp);
666 MHD_destroy_response(resp);
668 * TBD: check if the attribute was a replication policy, and
669 * start/stop replication activities as appropriate.
673 return MHD_YES;
676 /**********
677 * For queries, we have to deal with MHD's post-iterator interface (not
678 * quite the same as the data-iteration even though we use it that way) on
679 * one side, and a query-iterator interface on the other. Data on both
680 * sides could be quite large, so we can't just stick them in header lines.
681 * We do still buffer the query in memory, though. Once that's done, we do
682 * very simple parsing - it will be more complicated later - and create the
683 * query iterator. That's also driven by MHD, this time though the
684 * content-callback interface, and repeatedly calls in to the metadata
685 * module to fetch one object name at a time.
686 **********/
688 static int
689 query_iterator (void *ctx, enum MHD_ValueKind kind, const char *key,
690 const char *filename, const char *content_type,
691 const char *transfer_encoding, const char *data,
692 uint64_t off, size_t size)
694 (void)ctx;
695 (void)kind;
696 (void)key;
697 (void)filename;
698 (void)content_type;
699 (void)transfer_encoding;
700 (void)data;
701 (void)off;
702 (void)size;
704 /* We actually accumulate the data in proxy_query. */
705 return MHD_YES;
708 /* MHD reader function during queries. Return -1 for EOF. */
709 static int
710 proxy_query_func (void *ctx, uint64_t pos, char *buf, int max)
712 my_state *ms = ctx;
713 int len;
714 const char *accept_hdr;
715 char *bucket;
716 char *key;
718 (void)pos;
720 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
721 "Accept");
723 if (!ms->gen_ctx) {
724 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
725 if (!ms->gen_ctx) {
726 return -1;
728 ms->cleanup |= CLEANUP_TMPL;
729 len = tmpl_list_header(ms->gen_ctx);
730 if (!len) {
731 return -1;
733 if (len > max) {
734 len = max;
736 memcpy(buf,ms->gen_ctx->buf,len);
737 return len;
740 if (ms->gen_ctx == TMPL_CTX_DONE) {
741 return -1;
744 for(;;) {
745 if (!meta_query_next(ms->query,&bucket,&key)) {
746 break;
748 if (is_reserved(key,reserved_name)) {
749 continue;
751 len = tmpl_list_entry(ms->gen_ctx,bucket,key);
752 if (!len) {
753 return -1;
755 if (len > max) {
756 len = max;
758 memcpy(buf,ms->gen_ctx->buf,len);
759 return len;
762 len = tmpl_list_footer(ms->gen_ctx);
763 if (!len) {
764 return -1;
766 if (len > max) {
767 len = max;
769 memcpy(buf,ms->gen_ctx->buf,len);
770 free(ms->gen_ctx);
771 ms->cleanup &= ~CLEANUP_TMPL;
772 ms->gen_ctx = TMPL_CTX_DONE;
773 return len;
776 static int
777 proxy_query (void *cctx, struct MHD_Connection *conn, const char *url,
778 const char *method, const char *version, const char *data,
779 size_t *data_size, void **rctx)
781 struct MHD_Response *resp;
782 my_state *ms = *rctx;
784 (void)cctx;
785 (void)method;
786 (void)version;
788 DPRINTF("PROXY QUERY %s (%zu)\n",url,*data_size);
790 if (ms->state == MS_NEW) {
791 ms->state = MS_NORMAL;
792 ms->post = MHD_create_post_processor(conn,4096,
793 query_iterator,ms);
794 ms->cleanup |= CLEANUP_POST;
796 else if (*data_size) {
797 MHD_post_process(ms->post,data,*data_size);
798 if (ms->pipe.data_len) {
799 ms->pipe.data_len += *data_size;
800 char *p = realloc(ms->pipe.data_ptr,ms->pipe.data_len);
801 if (!p) {
802 return MHD_NO;
804 ms->pipe.data_ptr = p;
806 else {
807 ms->pipe.data_len = *data_size + 1;
808 ms->pipe.data_ptr = malloc(ms->pipe.data_len);
809 if (!ms->pipe.data_ptr) {
810 return MHD_NO;
812 ((char *)ms->pipe.data_ptr)[0] = '\0';
813 ms->cleanup |= CLEANUP_BUF_PTR;
815 (void)strncat(ms->pipe.data_ptr,data,*data_size);
816 /* TBD: check return value */
817 *data_size = 0;
819 else {
820 if (!ms->pipe.data_ptr) {
821 return MHD_NO;
823 ms->query = meta_query_new(ms->bucket,NULL,ms->pipe.data_ptr);
824 ms->cleanup |= CLEANUP_QUERY;
825 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
826 65536, proxy_query_func, ms, simple_closer);
827 if (!resp) {
828 fprintf(stderr,"MHD_crfc failed\n");
829 simple_closer(ms);
830 return MHD_NO;
832 MHD_queue_response(conn,MHD_HTTP_OK,resp);
833 MHD_destroy_response(resp);
834 free_ms(ms);
837 return MHD_YES;
840 static int
841 proxy_list_objs (void *cctx, struct MHD_Connection *conn, const char *url,
842 const char *method, const char *version, const char *data,
843 size_t *data_size, void **rctx)
845 my_state *ms = *rctx;
846 struct MHD_Response *resp;
848 (void)cctx;
849 (void)url;
850 (void)method;
851 (void)version;
852 (void)data;
853 (void)data_size;
855 ms->query = meta_query_new((char *)ms->bucket,NULL,NULL);
856 ms->cleanup |= CLEANUP_QUERY;
858 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
859 65536, proxy_query_func, ms, simple_closer);
860 if (!resp) {
861 fprintf(stderr,"MHD_crfc failed\n");
862 simple_closer(ms);
863 return MHD_NO;
866 MHD_queue_response(conn,MHD_HTTP_OK,resp);
867 MHD_destroy_response(resp);
868 return MHD_YES;
871 static int
872 proxy_delete (void *cctx, struct MHD_Connection *conn, const char *url,
873 const char *method, const char *version, const char *data,
874 size_t *data_size, void **rctx)
876 my_state *ms = *rctx;
877 struct MHD_Response *resp;
878 char *copied_url;
879 char *bucket;
880 char *key;
881 char *stctx = NULL;
882 int rc;
884 (void)cctx;
885 (void)method;
886 (void)version;
887 (void)data;
888 (void)data_size;
890 DPRINTF("PROXY DELETE %s\n",url);
892 ms->thunk.parent = ms;
893 ms->thunk.prov = main_prov;
894 rc = ms->thunk.prov->func_tbl->delete_func(main_prov,
895 ms->bucket,ms->key,url);
896 if (rc == MHD_HTTP_OK) {
897 copied_url = strdup(url);
898 assert (copied_url);
899 bucket = strtok_r(copied_url,"/",&stctx);
900 key = strtok_r(NULL,"/",&stctx);
901 meta_delete(bucket,key);
902 free(copied_url);
903 replicate_delete(url,ms);
906 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
907 if (!resp) {
908 return MHD_NO;
910 MHD_queue_response(conn,rc,resp);
911 MHD_destroy_response(resp);
913 free_ms(ms);
914 return MHD_YES;
917 /* TBD: get actual bucket list */
918 typedef struct {
919 char *rel;
920 char *link;
921 } fake_bucket_t;
923 static const fake_bucket_t fake_bucket_list[] = {
924 { "bucket_factory", "_new" },
925 { "provider_list", "_providers" },
928 static int
929 root_blob_generator (void *ctx, uint64_t pos, char *buf, int max)
931 my_state *ms = ctx;
932 const fake_bucket_t *fb;
933 int len;
934 const char *accept_hdr;
935 const char *host;
936 char *bucket;
937 char *key;
939 (void)pos;
941 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
942 "Accept");
943 host = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,"Host");
945 if (!ms->gen_ctx) {
946 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
947 if (!ms->gen_ctx) {
948 return -1;
950 ms->cleanup |= CLEANUP_TMPL;
951 ms->gen_ctx->base = host;
952 len = tmpl_root_header(ms->gen_ctx,"image_warehouse","1.0");
953 if (!len) {
954 return -1;
956 if (len > max) {
957 len = max;
959 memcpy(buf,ms->gen_ctx->buf,len);
960 return len;
963 if (ms->gen_ctx == TMPL_CTX_DONE) {
964 return -1;
967 if (ms->gen_ctx->index < ARRAY_SIZE(fake_bucket_list)) {
968 fb = fake_bucket_list + ms->gen_ctx->index;
969 len = tmpl_root_entry(ms->gen_ctx,fb->rel,fb->link);
970 if (!len) {
971 return -1;
973 if (len > max) {
974 len = max;
976 memcpy(buf,ms->gen_ctx->buf,len);
977 return len;
980 if (meta_query_next(ms->query,&bucket,&key)) {
981 len = tmpl_root_entry(ms->gen_ctx,"bucket",bucket);
982 if (!len) {
983 return -1;
985 if (len > max) {
986 len = max;
988 memcpy(buf,ms->gen_ctx->buf,len);
989 return len;
992 len = tmpl_root_footer(ms->gen_ctx);
993 if (!len) {
994 return -1;
996 if (len > max) {
997 len = max;
999 memcpy(buf,ms->gen_ctx->buf,len);
1000 free(ms->gen_ctx);
1001 ms->cleanup &= ~CLEANUP_TMPL;
1002 ms->gen_ctx = TMPL_CTX_DONE;
1003 return len;
1006 static int
1007 proxy_api_root (void *cctx, struct MHD_Connection *conn, const char *url,
1008 const char *method, const char *version, const char *data,
1009 size_t *data_size, void **rctx)
1011 struct MHD_Response *resp = NULL;
1012 unsigned int rc = MHD_HTTP_OK;
1013 my_state *ms = *rctx;
1015 (void)cctx;
1016 (void)method;
1017 (void)version;
1018 (void)data;
1020 DPRINTF("PROXY API ROOT (%s, %zu)\n",url,*data_size);
1022 ms->query = meta_query_new(NULL,"_default",NULL);
1023 if (!ms->query) {
1024 free_ms(ms);
1025 return MHD_NO;
1027 ms->cleanup |= CLEANUP_QUERY;
1029 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
1030 65536, root_blob_generator, ms, simple_closer);
1031 if (!resp) {
1032 return MHD_NO;
1034 MHD_queue_response(conn,rc,resp);
1035 MHD_destroy_response(resp);
1037 return MHD_YES;
1041 static int
1042 post_iterator (void *ctx, enum MHD_ValueKind kind, const char *key,
1043 const char *filename, const char *content_type,
1044 const char *transfer_encoding, const char *data,
1045 uint64_t off, size_t size)
1047 char *old_val;
1048 size_t old_len;
1049 char *new_val;
1051 (void)kind;
1052 (void)filename;
1053 (void)content_type;
1054 (void)transfer_encoding;
1055 (void)off;
1057 printf("adding %s, size=%zu\n",key,size);
1059 // TBD: don't assume that values are null-terminated strings
1060 old_val = g_hash_table_lookup(ctx,key);
1061 if (old_val) {
1062 old_len = strlen(old_val);
1063 new_val = malloc(old_len+size+1);
1064 if (!new_val) {
1065 return MHD_NO;
1067 memcpy(new_val,old_val,old_len);
1068 memcpy(new_val+old_len,data,size);
1069 new_val[old_len+size] = '\0';
1071 else {
1072 new_val = malloc(size+1);
1073 if (!new_val) {
1074 return MHD_NO;
1076 memcpy(new_val,data,size);
1077 new_val[size] = '\0';
1080 g_hash_table_insert(ctx,strdup(key),new_val);
1081 /* TBD: check return value for strdups (none avail for insert) */
1082 return MHD_YES;
1085 /* Returns TRUE if we found an *invalid* key. */
1086 static gboolean
1087 post_find (gpointer key, gpointer value, gpointer ctx)
1089 (void)value;
1090 (void)ctx;
1092 if (!is_reserved(key,reserved_attr)) {
1093 return FALSE;
1096 DPRINTF("bad attr %s\n", (char *)key);
1097 return TRUE;
1100 static void
1101 post_foreach (gpointer key, gpointer value, gpointer ctx)
1103 my_state *ms = ctx;
1105 DPRINTF("setting %s = %s for %s/%s\n",(char *)key, (char *)value,
1106 ms->bucket,ms->key);
1107 meta_set_value(ms->bucket,ms->key,key,value);
1110 static int
1111 create_bucket (char *name, my_state *ms)
1113 int rc;
1115 if (is_reserved(name,reserved_name)) {
1116 return MHD_HTTP_BAD_REQUEST;
1119 rc = main_prov->func_tbl->bcreate_func(main_prov,name);
1120 if (rc == MHD_HTTP_OK) {
1121 if (meta_set_value(name,"_default", "_policy","0") != 0) {
1122 DPRINTF("default-policy " "create failed\n");
1123 /* Non-fatal. */
1125 DPRINTF("created bucket %s\n",name);
1127 * There's not a whole lot to do about bucket-creation
1128 * failures on replicas, other than to report them, unless
1129 * we adopt an "all or nothing" approach and unwind the
1130 * create on the primary as well. Then what if that fails?
1131 * It's just one example of the general "fewer replicas
1132 * than desired" distributed-system problem, not worth a
1133 * point solution here/now. Revisit when we have a more
1134 * general replica-repair policy/system in place.
1136 replicate_bcreate(name,ms);
1139 return rc;
1142 static int
1143 control_api_root (void *cctx, struct MHD_Connection *conn, const char *url,
1144 const char *method, const char *version, const char *data,
1145 size_t *data_size, void **rctx)
1147 struct MHD_Response *resp;
1148 my_state *ms = *rctx;
1149 int rc;
1150 char *op;
1151 char buf[80];
1152 int len;
1154 (void)cctx;
1155 (void)method;
1156 (void)version;
1158 DPRINTF("ROOT POST (%s, %zu)\n",url,*data_size);
1160 if (ms->state == MS_NEW) {
1161 ms->state = MS_NORMAL;
1162 ms->url = (char *)url;
1163 ms->dict = g_hash_table_new_full(
1164 g_str_hash,g_str_equal,free,free);
1165 ms->cleanup |= CLEANUP_DICT;
1166 ms->post = MHD_create_post_processor(conn,4096,
1167 post_iterator,ms->dict);
1168 ms->cleanup |= CLEANUP_POST;
1169 return MHD_YES;
1172 if (*data_size) {
1173 MHD_post_process(ms->post,data,*data_size);
1174 *data_size = 0;
1175 return MHD_YES;
1178 rc = MHD_HTTP_BAD_REQUEST;
1180 op = g_hash_table_lookup(ms->dict,"op");
1181 if (op) {
1182 if (!strcmp(op,"rep_status")) {
1183 len = snprintf(buf,sizeof(buf),"%d requests\n",
1184 get_rep_count());
1185 rc = MHD_HTTP_OK;
1187 else {
1188 len = snprintf(buf,sizeof(buf),"unknown op");
1191 else {
1192 len = snprintf(buf,sizeof(buf),"missing op");
1195 if (len >= (int)sizeof(buf)) {
1196 len = 0;
1197 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
1200 /* NB The last arg tells MHD to copy the arg and free it later. */
1201 resp = MHD_create_response_from_data(len,buf,MHD_NO,MHD_YES);
1202 if (!resp) {
1203 return MHD_NO;
1205 MHD_queue_response(conn,rc,resp);
1206 MHD_destroy_response(resp);
1208 free_ms(ms);
1209 return MHD_YES;
1212 static int
1213 proxy_bucket_post (void *cctx, struct MHD_Connection *conn, const char *url,
1214 const char *method, const char *version, const char *data,
1215 size_t *data_size, void **rctx)
1217 struct MHD_Response *resp;
1218 my_state *ms = *rctx;
1219 int rc;
1220 char *key;
1222 (void)cctx;
1223 (void)method;
1224 (void)version;
1226 DPRINTF("PROXY POST (%s, %zu)\n",url,*data_size);
1228 if (ms->state == MS_NEW) {
1229 ms->state = MS_NORMAL;
1230 ms->url = (char *)url;
1231 ms->dict = g_hash_table_new_full(
1232 g_str_hash,g_str_equal,free,free);
1233 ms->cleanup |= CLEANUP_DICT;
1234 ms->post = MHD_create_post_processor(conn,4096,
1235 post_iterator,ms->dict);
1236 ms->cleanup |= CLEANUP_POST;
1238 else if (*data_size) {
1239 MHD_post_process(ms->post,data,*data_size);
1240 *data_size = 0;
1242 else {
1243 rc = MHD_HTTP_BAD_REQUEST;
1244 key = g_hash_table_lookup(ms->dict,"key");
1245 if (key) {
1246 strncpy(ms->key,key,MAX_FIELD_LEN-1);
1247 g_hash_table_remove(ms->dict,"key");
1248 if (!g_hash_table_find(ms->dict,post_find,ms)) {
1249 g_hash_table_foreach(ms->dict,post_foreach,ms);
1250 DPRINTF("rereplicate (bucket POST)\n");
1251 recheck_replication(ms,NULL);
1252 rc = MHD_HTTP_OK;
1255 else if (!strcmp(ms->bucket,"_new")) {
1256 key = g_hash_table_lookup(ms->dict,"name");
1257 if (key != NULL) {
1258 rc = create_bucket(key,ms);
1261 else {
1262 DPRINTF("A parameter is MISSING (fail)\n");
1264 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1265 if (!resp) {
1266 fprintf(stderr,"MHD_crfd failed\n");
1267 return MHD_NO;
1269 MHD_queue_response(conn,rc,resp);
1270 MHD_destroy_response(resp);
1271 free_ms(ms);
1274 return MHD_YES;
1277 static int
1278 check_location (my_state *ms)
1280 char *loc = g_hash_table_lookup(ms->dict,"depot");
1282 if (!loc) {
1283 DPRINTF("missing loc on check for %s/%s\n",ms->bucket,ms->key);
1284 return MHD_HTTP_BAD_REQUEST;
1287 if (!meta_has_copy(ms->bucket,ms->key,loc)) {
1288 DPRINTF("did not find %s/%s at %s\n",ms->bucket,ms->key,loc);
1289 return MHD_HTTP_NOT_FOUND;
1292 /* TBD: meta_has_copy returns an etag which we should check */
1293 DPRINTF("found %s/%s at %s\n",ms->bucket,ms->key,loc);
1294 return MHD_HTTP_OK;
1297 static int
1298 register_image (my_state *ms)
1300 const char *site;
1301 const provider_t *prov;
1302 char *next;
1304 site = g_hash_table_lookup(ms->dict,"site");
1305 if (!site) {
1306 printf("site MISSING\n");
1307 return MHD_HTTP_BAD_REQUEST;
1310 next = index(site,':');
1311 if (next) {
1312 *(next++) = '\0';
1315 prov = get_provider(site);
1316 if (!prov) {
1317 DPRINTF("site %s not found\n",site);
1318 return MHD_HTTP_BAD_REQUEST;
1321 return prov->func_tbl->register_func(ms,prov,next,ms->dict);
1325 static int
1326 parts_callback (void *ctx, uint64_t pos, char *buf, int max)
1328 my_state *ms = ctx;
1329 int len;
1330 const char *accept_hdr;
1331 const char *name;
1332 const char *value;
1333 const char *host;
1335 (void)pos;
1337 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
1338 "Accept");
1339 host = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,"Host");
1341 if (!ms->gen_ctx) {
1342 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
1343 if (!ms->gen_ctx) {
1344 return -1;
1346 ms->cleanup |= CLEANUP_TMPL;
1347 ms->gen_ctx->base = host;
1348 len = tmpl_obj_header(ms->gen_ctx,ms->bucket,ms->key);
1349 if (!len) {
1350 return -1;
1352 if (len > max) {
1353 len = max;
1355 memcpy(buf,ms->gen_ctx->buf,len);
1356 return len;
1359 if (ms->gen_ctx == TMPL_CTX_DONE) {
1360 return -1;
1364 // Set up and use query for what attributes exist.
1365 for(;;) {
1366 if (!meta_attr_next(ms->aquery,&name,&value)) {
1367 break;
1369 if (is_reserved(name,reserved_attr)) {
1370 continue;
1372 len = tmpl_obj_entry(ms->gen_ctx,ms->bucket,ms->key,name);
1373 if (!len) {
1374 return -1;
1376 if (len > max) {
1377 len = max;
1379 memcpy(buf,ms->gen_ctx->buf,len);
1380 return len;
1383 len = tmpl_obj_footer(ms->gen_ctx);
1384 if (!len) {
1385 return -1;
1387 if (len > max) {
1388 len = max;
1390 memcpy(buf,ms->gen_ctx->buf,len);
1391 free(ms->gen_ctx);
1392 ms->cleanup &= ~CLEANUP_TMPL;
1393 ms->gen_ctx = TMPL_CTX_DONE;
1394 return len;
1397 static int
1398 show_parts (struct MHD_Connection *conn, my_state *ms)
1400 struct MHD_Response *resp;
1402 ms->aquery = meta_get_attrs(ms->bucket,ms->key);
1403 if (!ms->aquery) {
1404 return MHD_HTTP_NOT_FOUND;
1406 ms->cleanup |= CLEANUP_AQUERY;
1408 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
1409 65536, parts_callback, ms, simple_closer);
1410 if (!resp) {
1411 fprintf(stderr,"MHD_crfc failed\n");
1412 simple_closer(ms);
1413 return MHD_NO;
1415 MHD_queue_response(conn,MHD_HTTP_OK,resp);
1416 MHD_destroy_response(resp);
1417 return MHD_HTTP_PROCESSING;
1420 static int
1421 proxy_object_post (void *cctx, struct MHD_Connection *conn, const char *url,
1422 const char *method, const char *version, const char *data,
1423 size_t *data_size, void **rctx)
1425 struct MHD_Response *resp;
1426 my_state *ms = *rctx;
1427 int rc;
1428 char *op;
1430 (void)cctx;
1431 (void)method;
1432 (void)version;
1434 DPRINTF("PROXY POST (%s, %zu)\n",url,*data_size);
1436 if (ms->state == MS_NEW) {
1437 ms->state = MS_NORMAL;
1438 ms->url = (char *)url;
1439 ms->dict = g_hash_table_new_full(
1440 g_str_hash,g_str_equal,free,free);
1441 ms->cleanup |= CLEANUP_DICT;
1442 ms->post = MHD_create_post_processor(conn,4096,
1443 post_iterator,ms->dict);
1444 ms->cleanup |= CLEANUP_POST;
1446 else if (*data_size) {
1447 MHD_post_process(ms->post,data,*data_size);
1448 *data_size = 0;
1450 else {
1451 rc = MHD_HTTP_BAD_REQUEST;
1452 if (!g_hash_table_find(ms->dict,post_find,ms)) {
1453 op = g_hash_table_lookup(ms->dict,"op");
1454 if (op) {
1455 if (!strcmp(op,"push")) {
1456 DPRINTF("rereplicate (obj POST)\n");
1457 recheck_replication(ms,NULL);
1458 rc = MHD_HTTP_OK;
1460 else if (!strcmp(op,"check")) {
1461 rc = check_location(ms);
1463 else if (!strcmp(op,"register")) {
1464 rc = register_image(ms);
1466 else if (!strcmp(op,"parts")) {
1467 rc = show_parts(conn,ms);
1469 else {
1470 DPRINTF("unknown op %s for %s/%s\n",
1471 op, ms->bucket, ms->key);
1474 else {
1475 DPRINTF("op is MISSING (fail)\n");
1478 if (rc != MHD_HTTP_PROCESSING) {
1480 * MHD_HTTP_PROCESSING is a special response that
1481 * means a request-specific routine (e.g. show_parts)
1482 * created its own response. Therefore we shouldn't.
1484 resp = MHD_create_response_from_data(0,NULL,
1485 MHD_NO,MHD_NO);
1486 if (!resp) {
1487 fprintf(stderr,"MHD_crfd failed\n");
1488 return MHD_NO;
1490 MHD_queue_response(conn,rc,resp);
1491 MHD_destroy_response(resp);
1492 free_ms(ms);
1496 return MHD_YES;
1501 static int
1502 prov_list_generator (void *ctx, uint64_t pos, char *buf, int max)
1504 my_state *ms = ctx;
1505 int len;
1506 gpointer key;
1507 const provider_t *prov;
1508 const char *accept_hdr;
1510 (void)pos;
1512 accept_hdr = MHD_lookup_connection_value(ms->conn,MHD_HEADER_KIND,
1513 "Accept");
1515 if (!ms->gen_ctx) {
1516 ms->gen_ctx = tmpl_get_ctx(accept_hdr);
1517 if (!ms->gen_ctx) {
1518 return -1;
1520 ms->cleanup |= CLEANUP_TMPL;
1521 init_prov_iter(&ms->prov_iter);
1522 len = tmpl_prov_header(ms->gen_ctx);
1523 if (!len) {
1524 return -1;
1526 if (len > max) {
1527 len = max;
1529 memcpy(buf,ms->gen_ctx->buf,len);
1530 return len;
1533 if (ms->gen_ctx == TMPL_CTX_DONE) {
1534 return -1;
1537 if (g_hash_table_iter_next(&ms->prov_iter,&key,(gpointer *)&prov)) {
1538 len = tmpl_prov_entry(ms->gen_ctx,prov->name,prov->type,
1539 prov->host, prov->port, prov->username, prov->password);
1540 if (!len) {
1541 return -1;
1543 if (len > max) {
1544 len = max;
1546 memcpy(buf,ms->gen_ctx->buf,len);
1547 return len;
1550 len = tmpl_prov_footer(ms->gen_ctx);
1551 if (!len) {
1552 return -1;
1554 if (len > max) {
1555 len = max;
1557 memcpy(buf,ms->gen_ctx->buf,len);
1558 free(ms->gen_ctx);
1559 ms->cleanup &= ~CLEANUP_TMPL;
1560 ms->gen_ctx = TMPL_CTX_DONE;
1561 return len;
1564 static int
1565 proxy_list_provs (void *cctx, struct MHD_Connection *conn, const char *url,
1566 const char *method, const char *version, const char *data,
1567 size_t *data_size, void **rctx)
1569 struct MHD_Response *resp;
1570 my_state *ms = *rctx;
1572 (void)cctx;
1573 (void)url;
1574 (void)method;
1575 (void)version;
1576 (void)data;
1577 (void)data_size;
1579 resp = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN,
1580 65536, prov_list_generator, ms, simple_closer);
1581 if (!resp) {
1582 fprintf(stderr,"MHD_crfd failed\n");
1583 simple_closer(ms);
1584 return MHD_NO;
1586 MHD_queue_response(conn,MHD_HTTP_OK,resp);
1587 MHD_destroy_response(resp);
1589 return MHD_YES;
1592 static int
1593 prov_iterator (void *ctx, enum MHD_ValueKind kind, const char *key,
1594 const char *filename, const char *content_type,
1595 const char *transfer_encoding, const char *data,
1596 uint64_t off, size_t size)
1598 (void)kind;
1599 (void)filename;
1600 (void)content_type;
1601 (void)transfer_encoding;
1602 (void)off;
1604 g_hash_table_insert(ctx,strdup(key),strndup(data,size));
1605 /* TBD: check return value for strdups (none avail for insert) */
1606 return MHD_YES;
1610 static int
1611 proxy_update_prov (void *cctx, struct MHD_Connection *conn, const char *url,
1612 const char *method, const char *version, const char *data,
1613 size_t *data_size, void **rctx)
1615 struct MHD_Response *resp;
1616 my_state *ms = *rctx;
1617 int rc;
1618 char *provider;
1619 char *username;
1620 char *password;
1622 (void)cctx;
1623 (void)method;
1624 (void)version;
1626 if (ms->state == MS_NEW) {
1627 ms->state = MS_NORMAL;
1628 ms->url = (char *)url;
1629 ms->dict = g_hash_table_new_full(
1630 g_str_hash,g_str_equal,free,free);
1631 ms->cleanup |= CLEANUP_DICT;
1632 ms->post = MHD_create_post_processor(conn,4096,
1633 prov_iterator,ms->dict);
1634 ms->cleanup |= CLEANUP_POST;
1636 else if (*data_size) {
1637 MHD_post_process(ms->post,data,*data_size);
1638 *data_size = 0;
1640 else {
1641 rc = MHD_HTTP_BAD_REQUEST;
1642 provider = g_hash_table_lookup(ms->dict,"provider");
1643 username = g_hash_table_lookup(ms->dict,"username");
1644 password = g_hash_table_lookup(ms->dict,"password");
1645 if (provider && username && password) {
1646 update_provider(provider,username,password);
1647 rc = MHD_HTTP_OK;
1649 else {
1650 DPRINTF("provider/username/password MISSING\n");
1652 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1653 if (!resp) {
1654 fprintf(stderr,"MHD_crfd failed\n");
1655 return MHD_NO;
1657 MHD_queue_response(conn,rc,resp);
1658 MHD_destroy_response(resp);
1659 free_ms(ms);
1662 return MHD_YES;
1665 static int
1666 proxy_create_bucket (void *cctx, struct MHD_Connection *conn, const char *url,
1667 const char *method, const char *version, const char *data,
1668 size_t *data_size, void **rctx)
1670 struct MHD_Response *resp;
1671 my_state *ms = *rctx;
1672 int rc;
1674 (void)cctx;
1675 (void)method;
1676 (void)version;
1677 (void)data;
1678 (void)data_size;
1679 (void)url;
1681 /* curl -T moo.empty http://localhost:9090/_new by accident */
1682 rc = create_bucket(ms->bucket,ms);
1684 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1685 if (!resp) {
1686 fprintf(stderr,"MHD_crfd failed\n");
1687 return MHD_NO;
1689 MHD_queue_response(conn,rc,resp);
1690 MHD_destroy_response(resp);
1692 free_ms(ms);
1693 return MHD_YES;
1696 static const rule my_rules[] = {
1697 { /* get bucket list */
1698 "GET", URL_ROOT, proxy_api_root },
1699 { /* perform a control operation on the API root */
1700 "POST", URL_ROOT, control_api_root },
1701 { /* get object list */
1702 "GET", URL_BUCKET, proxy_list_objs },
1703 { /* create bucket */
1704 "PUT", URL_BUCKET, proxy_create_bucket },
1705 { /* get object data */
1706 "GET", URL_OBJECT, proxy_get_data },
1707 { /* get attribute data */
1708 "GET", URL_ATTR, proxy_get_attr },
1709 { /* put object data */
1710 "PUT", URL_OBJECT, proxy_put_data },
1711 { /* put attribute data */
1712 "PUT", URL_ATTR, proxy_put_attr },
1713 { /* create object and/or modify attributes */
1714 "POST", URL_BUCKET, proxy_bucket_post },
1715 { /* perform control operations on an object */
1716 "POST", URL_OBJECT, proxy_object_post },
1717 { /* query */
1718 "POST", URL_QUERY, proxy_query },
1719 { /* delete object */
1720 "DELETE", URL_OBJECT, proxy_delete },
1721 { /* delete attribute (TBD) */
1722 "DELETE", URL_ATTR, NULL },
1723 { /* get provider list */
1724 "GET", URL_PROVLIST, proxy_list_provs },
1725 { /* update a provider */
1726 "POST", URL_PROVLIST, proxy_update_prov },
1727 { NULL, 0, NULL }
1730 static url_type
1731 parse_url (const char *url, my_state *ms)
1733 unsigned short esize;
1734 unsigned short eindex;
1735 char *parts[URL_INVAL];
1737 if (strstr(url,"../")) {
1738 /* Naughty, naughty. Never a good reason to allow this. */
1739 DPRINTF("Rejecting ../ in path.\n");
1740 return URL_INVAL;
1743 eindex = URL_ROOT;
1744 parts[URL_BUCKET] = ms->bucket;
1745 parts[URL_OBJECT] = ms->key;
1746 parts[URL_ATTR] = ms->attr;
1748 for (;;) {
1749 while (*url == '/') {
1750 ++url;
1753 if (!*url) {
1754 if (eindex == URL_BUCKET) {
1755 if (!strcmp(ms->bucket,"_providers")) {
1756 eindex = URL_PROVLIST;
1759 else if (eindex == URL_OBJECT) {
1760 if (!strcmp(ms->key,"_query")) {
1761 eindex = URL_QUERY;
1764 break;
1767 if (++eindex >= URL_INVAL) {
1768 return URL_INVAL;
1770 esize = 0;
1772 while (*url && (*url != '/')) {
1773 parts[eindex][esize++] = *(url++);
1774 if (esize >= MAX_FIELD_LEN) {
1775 return URL_INVAL;
1780 return eindex;
1783 static int
1784 access_handler (void *cctx, struct MHD_Connection *conn, const char *url,
1785 const char *method, const char *version, const char *data,
1786 size_t *data_size, void **rctx)
1788 unsigned int i;
1789 url_type utype;
1790 struct MHD_Response *resp;
1791 my_state *ms = *rctx;
1793 if (ms) {
1794 return ms->handler(cctx,conn,url,method,version,
1795 data,data_size,rctx);
1798 ms = calloc(sizeof(*ms), 1);
1799 if (!ms) {
1800 return MHD_NO;
1802 ms->refcnt = 1;
1804 utype = parse_url(url,ms);
1806 for (i = 0; my_rules[i].method; ++i) {
1807 if (utype != my_rules[i].utype) {
1808 continue;
1810 if (strcmp(method,my_rules[i].method)) {
1811 continue;
1813 if (!my_rules[i].handler) {
1814 break;
1816 ms->handler = my_rules[i].handler;
1817 ms->state = MS_NEW;
1818 ms->url = NULL;
1819 ms->post = NULL;
1820 ms->conn = conn;
1821 *rctx = ms;
1822 return ms->handler(cctx,conn,url,method,version,
1823 data,data_size,rctx);
1826 /* Don't need this after all. Free before the next check. */
1827 free_ms(ms);
1829 if (!strcmp(method,"QUIT")) {
1830 (void)sem_post((sem_t *)cctx);
1831 return MHD_NO;
1834 fprintf(stderr,"bad request m=%s u=%s\n",method,url);
1836 resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO);
1837 if (!resp) {
1838 return MHD_NO;
1840 MHD_queue_response(conn,MHD_HTTP_NOT_FOUND,resp);
1841 MHD_destroy_response(resp);
1842 return MHD_YES;
1845 /* These enum values cannot possibly conflict with the option values
1846 ordinarily used by commands, including CHAR_MAX + 1, etc. Avoid
1847 CHAR_MIN - 1, as it may equal -1, the getopt end-of-options value. */
1848 enum
1850 GETOPT_HELP_CHAR = (CHAR_MIN - 2),
1851 GETOPT_VERSION_CHAR = (CHAR_MIN - 3)
1854 static const struct option my_options[] = {
1855 { "autostart", no_argument, NULL, 'a' },
1856 { "config", required_argument, NULL, 'c' },
1857 { "db", required_argument, NULL, 'd' },
1858 { "master", required_argument, NULL, 'm' },
1859 { "port", required_argument, NULL, 'p' },
1860 { "verbose", no_argument, NULL, 'v' },
1861 { "version", no_argument, NULL, GETOPT_VERSION_CHAR },
1862 { "help", no_argument, NULL, GETOPT_HELP_CHAR },
1863 { NULL, 0, NULL, '\0' }
1866 static void
1867 usage (int status)
1869 if (status != EXIT_SUCCESS)
1870 fprintf (stderr, _("Try `%s --help' for more information.\n"),
1871 program_name);
1872 else
1874 printf (_("\
1875 Usage: %s [OPTION]\n\
1877 program_name);
1878 fputs (_("\
1879 Deltacloud image-warehouse daemon.\n\
1880 A configuration file must be specified.\n\
1882 -a, --autostart start necessary back-end services\n\
1883 -c, --config=FILE config file [required]\n\
1884 -d, --db=HOST_PORT database server as ip[:port]\n\
1885 -m, --master=HOST_PORT master (upstream) server as ip[:port]\n\
1886 -p, --port=PORT alternate listen port (default 9090)\n\
1887 -v, --verbose verbose/debug output\n\
1889 --help display this help and exit\n\
1890 --version output version information and exit\n\
1891 "), stdout);
1892 printf (_("\
1894 Report %s bugs to %s.\n\
1896 program_name, PACKAGE_BUGREPORT);
1898 exit (status);
1903 main (int argc, char **argv)
1905 struct MHD_Daemon *the_daemon;
1906 sem_t the_sem;
1907 char *stctx = NULL;
1908 char *port_tmp;
1909 int autostart = 0;
1911 program_name = argv[0];
1913 for (;;) switch (getopt_long(argc,argv,"ac:d:m:p:v",my_options,NULL)) {
1914 case 'a':
1915 ++autostart;
1916 break;
1917 case 'c':
1918 cfg_file = optarg;
1919 break;
1920 case 'd':
1921 assert (optarg);
1922 db_host = strtok_r(optarg,":",&stctx);
1923 port_tmp = strtok_r(NULL,":",&stctx);
1924 if (port_tmp) {
1925 db_port = (unsigned short)strtoul(port_tmp,NULL,10);
1927 break;
1928 case 'm':
1929 assert (optarg);
1930 master_host = strtok_r(optarg,":",&stctx);
1931 port_tmp = strtok_r(NULL,":",&stctx);
1932 if (port_tmp) {
1933 master_port = (unsigned short)strtoul(port_tmp,NULL,10);
1935 break;
1936 case 'p':
1937 my_port = (unsigned short)strtoul(optarg,NULL,10);
1938 break;
1939 case 'v':
1940 ++verbose;
1941 break;
1942 case GETOPT_HELP_CHAR:
1943 usage(EXIT_SUCCESS);
1944 break;
1945 case GETOPT_VERSION_CHAR:
1946 printf ("%s version %s\n", program_name, PACKAGE_VERSION);
1947 exit (EXIT_SUCCESS);
1948 break;
1950 case -1:
1951 goto args_done;
1952 default:
1953 usage(EXIT_FAILURE);
1954 break;
1956 args_done:
1958 if (!db_port) {
1959 db_port = autostart ? AUTO_MONGOD_PORT : 27017;
1962 if (autostart && cfg_file) {
1963 error(0,0,"do not use -c and -a simultaneously");
1964 return !0;
1966 else if (autostart && !cfg_file) {
1967 me = auto_config();
1968 if (!me) {
1969 /* error printed */
1970 return !0;
1973 else if (!autostart && cfg_file) {
1974 me = parse_config(cfg_file);
1975 if (!me) {
1976 error(0,0,"could not parse %s",cfg_file);
1977 return !0;
1980 else {
1981 error(0,0,"specify at least -c or -a");
1982 usage (EXIT_FAILURE);
1985 sem_init(&the_sem,0,0);
1987 if (verbose) {
1988 printf("primary store type is %s\n",main_prov->type);
1989 if (master_host) {
1990 printf("operating as slave to %s:%u\n",
1991 master_host, master_port);
1993 printf("db is at %s:%u\n",db_host,db_port);
1994 printf("will listen on port %u\n",my_port);
1995 printf("my location is \"%s\"\n",me);
1996 if (fflush(stdout) || ferror(stdout))
1997 error(EXIT_FAILURE, 0, "write failed");
2000 backend_init();
2001 meta_init();
2002 repl_init();
2005 * Gotcha: if we don't set the connection memory limit explicitly,
2006 * the per-connection buffer for MHD will be smaller than that used
2007 * by CURL, so proxy_writefunc will never be able to do its job.
2009 the_daemon = MHD_start_daemon(MY_MHD_FLAGS,
2010 my_port, NULL, NULL, &access_handler, &the_sem,
2011 MHD_OPTION_CONNECTION_MEMORY_LIMIT, (size_t)1048576,
2012 MHD_OPTION_END);
2013 if (!the_daemon) {
2014 fprintf(stderr,"Could not create daemon.\n");
2015 return !0;
2018 sem_wait(&the_sem);
2019 return 0;