avoid printf format abuse; use -Wformat-security; report errno more
[iwhd.git] / proxy.c
blobba34b8e0a954da7fc6c24ed83d94326dd59b04b2
1 /* Copyright (C) 2010 Free Software Foundation, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include <errno.h>
19 #include <error.h>
20 #include <fcntl.h>
21 #include <getopt.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <semaphore.h>
25 #include <stdint.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <strings.h>
30 #include <unistd.h>
31 #include <assert.h>
33 #include <microhttpd.h>
34 #include <curl/curl.h>
35 #include <hstor.h>
36 #include <jansson.h>
38 #include "iwh.h"
39 #include "proxy.h"
40 #include "query.h"
41 #include "meta.h"
44 * A config consists of a JSON array of objects, where each object includes:
46 * name string
47 * type string "s3" or "cf" or "http" (case insensitive)
48 * host string
49 * port integer
50 * key string S3 key or (optional TBD) HTTP user
51 * secret string S3 secret or (optional TBD) HTTP password
53 * The above fields are all used to implement replication once we've decided to
54 * do it. There may be other fields as well, to help us make that decision.
55 * For example, there might be one or more fields to describe geographic
56 * location, an array of supported image-format names, etc. This information
57 * is deliberately left "schema-free" so that users may add whatever fields
58 * they like to both the config and to replication attributes on objects.
61 /* Sizes for internal string buffers. */
62 #define ADDR_SIZE 1024
63 #define SVC_ACC_SIZE 128
64 #define HEADER_SIZE 64
66 extern backend_func_tbl bad_func_tbl;
67 extern backend_func_tbl s3_func_tbl;
68 extern backend_func_tbl curl_func_tbl;
69 extern backend_func_tbl fs_func_tbl;
71 typedef enum {
72 REPL_PUT, /* store an object */
73 REPL_ODELETE, /* delete an object */
74 REPL_BCREATE, /* create a bucket */
75 /* TBD: bucket deletion, others? */
76 } repl_t;
78 typedef struct _repl_item {
79 struct _repl_item *next;
80 repl_t type;
81 char *path;
82 unsigned int server;
83 size_t size;
84 int pipes[2];
85 } repl_item;
87 typedef struct {
88 char *cur_bucket;
89 char *cur_key;
90 json_t *cur_server;
91 } query_ctx_t;
93 repl_item *queue_head = NULL;
94 repl_item *queue_tail = NULL;
95 pthread_mutex_t queue_lock;
96 sem_t queue_sema;
97 json_t *config = NULL;
99 int
100 validate_server (unsigned int i)
102 json_t *server;
103 json_t *elem;
104 const char *name;
105 const char *type;
106 enum { NEED_NONE, NEED_SERVER, NEED_ALL } needs = NEED_ALL;
108 server = json_array_get(config,i);
109 if (!json_is_object(server)) {
110 error(0,0,"config elem %u: missing object\n",i);
111 return 0;
114 elem = json_object_get(server,"name");
115 if (!json_is_string(elem)) {
116 error(0,0,"config elem %u: missing name\n",i);
117 return 0;
119 name = json_string_value(elem);
121 elem = json_object_get(server,"type");
122 if (!json_is_string(elem)) {
123 error(0,0,"config elem %u (%s): missing type\n",i,name);
124 return 0;
126 type = json_string_value(elem);
128 if (!strcasecmp(type,"s3") || !strcasecmp(type,"cf")) {
129 needs = NEED_ALL;
131 else if (!strcasecmp(type,"http")) {
132 needs = NEED_SERVER;
134 else if (!strcasecmp(type,"fs")) {
135 needs = NEED_NONE;
137 else {
138 error(0,0,"config elem %u (%s): bad type\n",i,name);
139 return 0;
142 if (needs != NEED_NONE) {
143 elem = json_object_get(server,"host");
144 if (!json_is_string(elem)) {
145 error(0,0,"config elem %u (%s): missing host\n",
146 i,name);
147 return 0;
149 elem = json_object_get(server,"port");
150 if (!json_is_integer(elem)) {
151 error(0,0,"config elem %u (%s): missing port\n",
152 i,name);
153 return 0;
157 if (needs == NEED_ALL) {
158 elem = json_object_get(server,"key");
159 if (!json_is_string(elem)) {
160 error(0,0,"config elem %u (%s): missing S3 key\n",
161 i, name);
162 return 0;
164 elem = json_object_get(server,"secret");
165 if (!json_is_string(elem)) {
166 error(0,0,
167 "config elem %u (%s): missing S3 secret\n",
168 i, name);
169 return 0;
173 return 1;
176 /* We've already validated, so minimal checking here. */
177 char *
178 set_config (void)
180 json_t *server;
181 const char *type;
183 server = json_array_get(config,0);
184 type = json_string_value(json_object_get(server,"type"));
185 if (strcasecmp(type,"fs")) {
186 proxy_host = json_string_value(
187 json_object_get(server,"host"));
188 proxy_port = json_integer_value(
189 json_object_get(server,"port"));
190 if (!strcasecmp(type,"s3")) {
191 s3mode = 1;
192 proxy_key = json_string_value(
193 json_object_get(server,"key"));
194 proxy_secret = json_string_value(
195 json_object_get(server,"secret"));
197 else {
198 s3mode = 0;
202 return (char *)json_string_value(json_object_get(server,"name"));
205 char *
206 parse_config (void)
208 json_error_t err;
209 unsigned int nservers;
210 unsigned int i;
212 if (access(cfg_file,R_OK) < 0) {
213 error(0,errno,"failed to open %s for reading", cfg_file);
214 return NULL;
217 config = json_load_file(cfg_file,&err);
218 if (!config) {
219 error(0,0,"JSON error on line %d: %s\n",err.line,err.text);
220 return NULL;
223 if (json_typeof(config) != JSON_ARRAY) {
224 error(0,0,"config should be a JSON array\n");
225 goto err;
228 nservers = json_array_size(config);
229 if (!nservers) {
230 goto err;
233 for (i = 0; i < nservers; ++i) {
234 if (!validate_server(i)) {
235 goto err;
239 /* Everything looks OK. */
240 printf("%u replication servers defined\n",nservers-1);
241 return set_config();
243 err:
244 json_decref(config);
245 config = NULL;
246 return 0;
249 size_t
250 junk_writer (void *ptr, size_t size, size_t nmemb, void *stream)
252 size_t n;
254 n = fwrite(ptr,size,nmemb,stream);
255 fflush(stream);
256 DPRINTF("in %s(%zu,%zu) => %zu\n",__func__,size,nmemb,n);
258 return n;
261 void *
262 proxy_repl_prod_fs (void *ctx)
264 repl_item *item = ctx;
265 int ifd;
266 int ofd;
267 char buf[1<<16];
268 ssize_t ibytes;
269 ssize_t obytes;
270 ssize_t offset;
272 DPRINTF("replicating from %s (FS)\n",item->path);
274 ifd = open(item->path,O_RDONLY);
275 if (ifd < 0) {
276 error(0,errno,"ifd open");
277 return THREAD_FAILED;
279 ofd = item->pipes[1];
281 for (;;) {
282 ibytes = read(ifd,buf,sizeof(buf));
283 if (ibytes <= 0) {
284 if (ibytes < 0) {
285 error(0,errno,"%s: read failed", item->path);
287 else {
288 DPRINTF("EOF on ifd\n");
290 break;
292 offset = 0;
293 do {
294 obytes = write(ofd,buf+offset,ibytes);
295 if (obytes <= 0) {
296 if (obytes < 0) {
297 error(0,errno,"ofd write");
299 else {
300 DPRINTF("zero-length write on ofd\n");
302 break;
304 ibytes -= obytes;
305 offset += obytes;
306 } while (ibytes > 0);
309 close(ifd);
310 close(ofd);
312 DPRINTF("%s returning\n",__func__);
313 close(item->pipes[1]);
314 return NULL;
317 void *
318 proxy_repl_prod (void *ctx)
320 repl_item *item = ctx;
321 FILE *fp = fdopen(item->pipes[1],"w");
322 char addr[ADDR_SIZE];
323 CURL *curl;
324 char svc_acc[SVC_ACC_SIZE];
325 struct hstor_client *hstor;
326 char *bucket;
327 char *key;
328 char *stctx;
329 char *myurl;
330 int chars;
332 chars = snprintf(addr,ADDR_SIZE,
333 "http://%s:%u/%s",proxy_host,proxy_port,item->path);
334 if (chars >= ADDR_SIZE) {
335 error(0,0,"path too long in %s\n",__func__);
336 goto done;
338 DPRINTF("replicating from %s\n",addr);
340 if (s3mode) {
341 chars = snprintf(svc_acc,SVC_ACC_SIZE,"%s:%u",
342 proxy_host,proxy_port);
343 if (chars >= SVC_ACC_SIZE) {
344 error(0,0,"svc_acc too long in %s\n",__func__);
345 goto done;
347 hstor = hstor_new(svc_acc,proxy_host,
348 proxy_key,proxy_secret);
349 /* Blech. Can't conflict with consumer, though. */
350 myurl = strdup(item->path);
351 assert (myurl);
352 bucket = strtok_r(myurl,"/",&stctx);
353 key = strtok_r(NULL,"/",&stctx);
354 hstor_get(hstor,bucket,key,
355 junk_writer,fp,0);
356 hstor_free(hstor);
357 free(myurl);
359 else {
360 curl = curl_easy_init();
361 curl_easy_setopt(curl,CURLOPT_URL,addr);
362 curl_easy_setopt(curl,CURLOPT_WRITEFUNCTION,junk_writer);
363 curl_easy_setopt(curl,CURLOPT_WRITEDATA,fp);
364 DPRINTF("%s calling curl_easy_perform\n",__func__);
365 curl_easy_perform(curl);
366 curl_easy_cleanup(curl);
369 done:
370 DPRINTF("%s returning\n",__func__);
371 /* Closing should signal to the consumer that we're finished. */
372 close(item->pipes[1]);
373 return NULL;
376 size_t
377 junk_reader (void *ptr, size_t size, size_t nmemb, void *stream)
379 size_t n;
381 n = fread(ptr,size,nmemb,stream);
382 printf("in %s(%zu,%zu) => %zu\n",__func__,size,nmemb,n);
383 return n;
386 size_t
387 cf_writer (void *ptr ATTRIBUTE_UNUSED, size_t size, size_t nmemb,
388 void *stream ATTRIBUTE_UNUSED)
390 return size * nmemb;
393 size_t
394 cf_header (void *ptr, size_t size, size_t nmemb, void *stream)
396 char *next;
397 char *sctx;
398 json_t *server = (json_t *)stream;
400 next = strtok_r(ptr,":",&sctx);
401 if (next) {
402 if (!strcasecmp(next,"X-Storage-Url")) {
403 next = strtok_r(NULL," \n\r",&sctx);
404 if (next) {
405 DPRINTF("got CF URL %s\n",next);
406 /* NB: after this, original "host" is gone. */
407 json_object_set_new(server,"host",
408 json_string(next));
411 else if (!strcasecmp(next,"X-Storage-Token")) {
412 next = strtok_r(NULL," \n\r",&sctx);
413 if (next) {
414 DPRINTF("got CF token %s\n",next);
415 json_object_set_new(server,"token",
416 json_string(next));
420 return size * nmemb;
423 const char *
424 get_cloudfiles_token (json_t *server, const char *host, unsigned int port,
425 const char * user, const char * key)
427 CURL *curl;
428 char addr[ADDR_SIZE];
429 char auth_user[HEADER_SIZE];
430 char auth_key[HEADER_SIZE];
431 json_t *token_obj;
432 struct curl_slist *slist;
433 int chars;
435 token_obj = json_object_get(server,"token");
436 if (token_obj) {
437 return json_string_value(token_obj);
440 chars = snprintf(addr,ADDR_SIZE,"https://%s:%u/v1.0",host,port);
441 if (chars >= ADDR_SIZE) {
442 error(0,0,"API URL too long in %s\n",__func__);
443 return NULL;
446 chars = snprintf(auth_user,HEADER_SIZE,"X-Auth-User: %s",user);
447 if (chars >= HEADER_SIZE) {
448 error(0,0,"auth_user too long in %s\n",__func__);
449 return NULL;
452 chars = snprintf(auth_key,HEADER_SIZE,"X-Auth-Key: %s",key);
453 if (chars >= HEADER_SIZE) {
454 error(0,0,"auth_key too long in %s\n",__func__);
455 return NULL;
458 curl = curl_easy_init();
459 curl_easy_setopt(curl,CURLOPT_URL,addr);
460 curl_easy_setopt(curl,CURLOPT_WRITEFUNCTION,cf_writer);
461 curl_easy_setopt(curl,CURLOPT_HEADERFUNCTION,cf_header);
462 curl_easy_setopt(curl,CURLOPT_WRITEHEADER,server);
463 slist = curl_slist_append(NULL,auth_user);
464 slist = curl_slist_append(slist,auth_key);
465 curl_easy_setopt(curl,CURLOPT_HTTPHEADER,slist);
466 curl_easy_perform(curl);
467 curl_easy_cleanup(curl);
468 curl_slist_free_all(slist);
470 token_obj = json_object_get(server,"token");
471 return token_obj ? json_string_value(token_obj) : NULL;
474 void *
475 proxy_repl_cons (void *ctx)
477 repl_item *item = ctx;
478 FILE *fp = fdopen(item->pipes[0],"r");
479 char addr[ADDR_SIZE];
480 CURL *curl;
481 json_t *server;
482 char svc_acc[SVC_ACC_SIZE];
483 char auth_hdr[HEADER_SIZE];
484 struct hstor_client *hstor;
485 char *bucket;
486 char *key;
487 char *stctx;
488 const char *s_host;
489 unsigned int s_port;
490 const char *s_key;
491 const char *s_secret;
492 const char *s_type;
493 const char *s_name;
494 struct curl_slist *slist;
495 char *myurl;
496 int chars;
498 server = json_array_get(config,item->server);
499 s_host = json_string_value(json_object_get(server,"host"));
500 s_port = json_integer_value(json_object_get(server,"port"));
501 s_key = json_string_value(json_object_get(server,"key"));
502 s_secret = json_string_value(json_object_get(server,"secret"));
503 s_type = json_string_value(json_object_get(server,"type"));
504 s_name = json_string_value(json_object_get(server,"name"));
506 myurl = strdup(item->path);
507 assert (myurl);
508 bucket = strtok_r(myurl,"/",&stctx);
509 key = strtok_r(NULL,"/",&stctx);
511 if (!strcasecmp(s_type,"s3")) {
512 DPRINTF("replicating %zu to %s/%s (S3)\n",item->size,s_host,
513 item->path);
514 chars = snprintf(svc_acc,SVC_ACC_SIZE,"%s:%u",s_host,s_port);
515 if (chars >= SVC_ACC_SIZE) {
516 error(0,0,"svc_acc too long in %s\n",__func__);
517 return THREAD_FAILED;
519 hstor = hstor_new(svc_acc,s_host,s_key,s_secret);
520 /* Blech. Can't conflict with producer, though. */
521 hstor_put(hstor,bucket,key,
522 junk_reader,item->size,fp,NULL);
523 hstor_free(hstor);
525 else {
526 const char *token_str = NULL;
527 if (!strcasecmp(s_type,"cf")) {
528 token_str = get_cloudfiles_token(server,s_host,s_port,
529 s_key, s_secret);
530 if (!token_str) {
531 DPRINTF("could not get CF token\n");
532 return THREAD_FAILED;
534 /* Re-fetch as this might have changed. */
535 s_host = json_string_value(json_object_get(server,
536 "host"));
537 chars = snprintf(addr,ADDR_SIZE,"%s/%s",
538 s_host,item->path);
539 if (chars >= ADDR_SIZE) {
540 error(0,0,"CF path too long in %s\n",__func__);
541 return THREAD_FAILED;
543 DPRINTF("replicating %zu to %s (CF)\n",item->size,
544 addr);
546 else {
547 chars = snprintf(addr,ADDR_SIZE,"http://%s:%u/%s",
548 s_host,s_port,item->path);
549 if (chars >= ADDR_SIZE) {
550 error(0,0,"HTTP path too long in %s\n",
551 __func__);
552 return THREAD_FAILED;
554 DPRINTF("replicating %zu to %s (repod)\n",item->size,
555 addr);
557 curl = curl_easy_init();
558 curl_easy_setopt(curl,CURLOPT_URL,addr);
559 curl_easy_setopt(curl,CURLOPT_UPLOAD,1);
560 curl_easy_setopt(curl,CURLOPT_INFILESIZE_LARGE,
561 (curl_off_t)item->size);
562 curl_easy_setopt(curl,CURLOPT_READFUNCTION,junk_reader);
563 if (!strcasecmp(s_type,"cf")) {
564 chars = snprintf(auth_hdr,HEADER_SIZE,
565 "X-Auth-Token: %s",token_str);
566 if (chars >= HEADER_SIZE) {
567 error(0,0,"auth_token too long in %s\n",
568 __func__);
569 return THREAD_FAILED;
571 slist = curl_slist_append(NULL,auth_hdr);
573 * Rackspace doesn't clearly document that you'll get
574 * 412 (Precondition Failed) if you omit this.
576 slist = curl_slist_append(slist,
577 "Content-Type: binary/octet-stream");
579 else {
580 slist = curl_slist_append(NULL,"X-redhat-role: master");
582 curl_easy_setopt(curl,CURLOPT_HTTPHEADER,slist);
583 curl_easy_setopt(curl,CURLOPT_READDATA,fp);
584 DPRINTF("%s calling curl_easy_perform\n",__func__);
585 curl_easy_perform(curl);
586 curl_easy_cleanup(curl);
587 curl_slist_free_all(slist);
590 DPRINTF("%s returning\n",__func__);
591 close(item->pipes[0]);
592 meta_got_copy(bucket,key,s_name);
593 free(myurl);
594 return NULL;
597 void
598 repl_worker_del (const repl_item *item)
600 json_t *server;
601 const char *s_host;
602 unsigned int s_port;
603 const char *s_key;
604 const char *s_secret;
605 const char *s_type;
606 char svc_acc[SVC_ACC_SIZE];
607 struct hstor_client *hstor;
608 char addr[ADDR_SIZE];
609 CURL *curl;
610 char *bucket;
611 char *key;
612 char *stctx;
613 int chars;
615 server = json_array_get(config,item->server);
616 s_host = json_string_value(json_object_get(server,"host"));
617 s_port = json_integer_value(json_object_get(server,"port"));
618 s_key = json_string_value(json_object_get(server,"key"));
619 s_secret = json_string_value(json_object_get(server,"secret"));
620 s_type = json_string_value(json_object_get(server,"type"));
622 if (!strcasecmp(s_type,"s3")) {
623 DPRINTF("%s replicating delete of %s on %s:%u (S3)\n",__func__,
624 item->path, s_host, s_port);
625 chars = snprintf(svc_acc,SVC_ACC_SIZE,"%s:%u",s_host,s_port);
626 if (chars >= SVC_ACC_SIZE) {
627 error(0,0,"svc_acc too long in %s\n",__func__);
628 return;
630 /* TBD: check return */
631 hstor = hstor_new(svc_acc,s_host,s_key,s_secret);
632 assert (item->path);
633 bucket = strtok_r(item->path,"/",&stctx);
634 key = strtok_r(NULL,"/",&stctx);
635 (void)hstor_del(hstor,bucket,key);
636 hstor_free(hstor);
638 else {
639 DPRINTF("%s replicating delete of %s on %s:%u (HTTP)\n",
640 __func__, item->path, s_host, s_port);
641 chars = snprintf(addr,ADDR_SIZE,"http://%s:%d%s",
642 s_host,s_port,item->path);
643 if (chars >= ADDR_SIZE) {
644 error(0,0,"path too long in %s\n",__func__);
645 return;
647 curl = curl_easy_init();
648 curl_easy_setopt(curl,CURLOPT_URL,addr);
649 curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE");
650 curl_easy_perform(curl);
651 curl_easy_cleanup(curl);
654 DPRINTF("%s returning\n",__func__);
657 void
658 repl_worker_bcreate (repl_item *item)
660 json_t *server;
661 const char *s_host;
662 unsigned int s_port;
663 const char *s_key;
664 const char *s_secret;
665 const char *s_type;
666 char svc_acc[SVC_ACC_SIZE];
667 struct hstor_client *hstor;
668 char addr[ADDR_SIZE];
669 CURL *curl;
670 int chars;
672 server = json_array_get(config,item->server);
673 s_host = json_string_value(json_object_get(server,"host"));
674 s_port = json_integer_value(json_object_get(server,"port"));
675 s_key = json_string_value(json_object_get(server,"key"));
676 s_secret = json_string_value(json_object_get(server,"secret"));
677 s_type = json_string_value(json_object_get(server,"type"));
679 if (!strcasecmp(s_type,"s3")) {
680 DPRINTF("%s replicating create of bucket %s on %s:%u (S3)\n",
681 __func__, item->path, s_host, s_port);
682 chars = snprintf(svc_acc,SVC_ACC_SIZE,"%s:%u",s_host,s_port);
683 if (chars >= SVC_ACC_SIZE) {
684 error(0,0,"svc_acc too long in %s\n",__func__);
685 return;
687 /* TBD: check return */
688 hstor = hstor_new(svc_acc,s_host,s_key,s_secret);
689 assert (item->path);
690 if (!hstor_add_bucket(hstor,item->path)) {
691 error(0,0,"bucket create failed for %s\n",
692 item->path);
694 hstor_free(hstor);
696 else {
697 DPRINTF("%s replicating create of bucket %s on %s:%u (HTTP)\n",
698 __func__, item->path, s_host, s_port);
699 chars = snprintf(addr,ADDR_SIZE,"http://%s:%d/%s",
700 s_host,s_port,item->path);
701 if (chars >= ADDR_SIZE) {
702 error(0,0,"path too long in %s\n",__func__);
703 return;
705 curl = curl_easy_init();
706 curl_easy_setopt(curl,CURLOPT_URL,addr);
707 curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"PUT");
708 curl_easy_perform(curl);
709 curl_easy_cleanup(curl);
712 DPRINTF("%s returning\n",__func__);
715 void *
716 repl_worker (void *notused ATTRIBUTE_UNUSED)
718 repl_item *item;
719 pthread_t cons;
720 pthread_t prod;
722 for (;;) {
723 sem_wait(&queue_sema);
724 pthread_mutex_lock(&queue_lock);
725 item = queue_head;
726 queue_head = item->next;
727 if (!queue_head) {
728 queue_tail = NULL;
730 pthread_mutex_unlock(&queue_lock);
732 switch (item->type) {
733 case REPL_PUT:
734 if (pipe(item->pipes) >= 0) {
735 if (proxy_host) {
736 pthread_create(&prod,NULL,
737 proxy_repl_prod,item);
739 else {
740 pthread_create(&prod,NULL,
741 proxy_repl_prod_fs,item);
743 pthread_create(&cons,NULL,proxy_repl_cons,item);
744 pthread_join(prod,NULL);
745 pthread_join(cons,NULL);
747 else {
748 error(0,errno,"pipe");
750 break;
751 case REPL_ODELETE:
752 repl_worker_del(item);
753 break;
754 case REPL_BCREATE:
755 repl_worker_bcreate(item);
756 break;
757 default:
758 error(0,0,"bad repl type %d (url=%s) skipped\n",
759 item->type, item->path);
761 free(item->path);
762 free(item);
766 void
767 repl_init (void)
769 pthread_t tid;
771 sem_init(&queue_sema,0,0);
772 pthread_mutex_init(&queue_lock,NULL);
773 pthread_create(&tid,NULL,repl_worker,NULL);
776 char *
777 repl_oget (void *ctx, const char *id)
779 query_ctx_t *qctx = ctx;
780 char *cur_value;
782 (void)meta_get_value(qctx->cur_bucket,qctx->cur_key,id,&cur_value);
784 return cur_value;
787 char *
788 repl_sget (void *ctx, const char *id)
790 query_ctx_t *qctx = ctx;
791 json_t *elem = json_object_get(qctx->cur_server,id);
793 return elem ? (char *)json_string_value(elem) : NULL;
796 void
797 replicate (const char *url, size_t size, const char *policy)
799 unsigned int i;
800 repl_item *item;
801 value_t *expr;
802 int res;
803 char *url2;
804 char *stctx;
805 query_ctx_t qctx;
806 getter_t oget;
807 getter_t sget;
809 url2 = strdup(url);
810 if (!url2) {
811 error(0,0,"could not parse url %s\n",url);
812 return;
814 qctx.cur_bucket = strtok_r(url2,"/",&stctx);
815 qctx.cur_key = strtok_r(NULL,"/",&stctx);
817 if (!size) {
818 size = meta_get_size(qctx.cur_bucket,qctx.cur_key);
819 DPRINTF("fetched size %zu for %s\n",size,url);
822 if (policy) {
823 DPRINTF("--- policy = %s\n",policy);
824 expr = parse(policy);
826 else {
827 expr = NULL;
830 oget.func = repl_oget;
831 oget.ctx = &qctx;
832 sget.func = repl_sget;
833 sget.ctx = &qctx;
835 for (i = 1; i < json_array_size(config); ++i) {
836 if (expr) {
837 qctx.cur_server = json_array_get(config,i);
838 res = eval(expr,&oget,&sget);
840 else {
841 res = 0;
843 if (res <= 0) {
844 DPRINTF("skipping %u for %s\n",i,url);
845 continue;
847 DPRINTF("REPLICATING %s to %u\n",url,i);
848 item = malloc(sizeof(*item));
849 if (!item) {
850 error(0,errno,"could not create repl_item for %s\n",
851 url);
852 break;
854 item->type = REPL_PUT;
855 item->path = strdup(url);
856 if (!item->path) {
857 error(0,errno,"could not create repl_item for %s\n",
858 url);
859 break;
861 item->server = i;
862 item->size = size;
863 pthread_mutex_lock(&queue_lock);
864 if (queue_tail) {
865 item->next = queue_tail->next;
866 queue_tail->next = item;
868 else {
869 item->next = NULL;
870 queue_head = item;
872 queue_tail = item;
873 pthread_mutex_unlock(&queue_lock);
874 sem_post(&queue_sema);
877 if (expr) {
878 free_value(expr);
880 free(url2);
883 void
884 replicate_namespace_action (const char *name, repl_t action)
886 unsigned int i;
887 repl_item *item;
889 for (i = 1; i < json_array_size(config); ++i) {
890 DPRINTF("replicating delete(%s) on %u\n",name,i);
891 item = malloc(sizeof(*item));
892 if (!item) {
893 error(0,errno,"could not create repl_item for %s\n",
894 name);
895 return;
897 item->type = action;
898 item->path = strdup(name);
899 if (!item->path) {
900 free(item);
901 return;
903 item->server = i;
904 pthread_mutex_lock(&queue_lock);
905 if (queue_tail) {
906 item->next = queue_tail->next;
907 queue_tail->next = item;
909 else {
910 item->next = NULL;
911 queue_head = item;
913 queue_tail = item;
914 pthread_mutex_unlock(&queue_lock);
915 sem_post(&queue_sema);
919 void
920 replicate_delete (const char *name)
922 replicate_namespace_action(name,REPL_ODELETE);
925 void
926 replicate_bcreate (const char *name)
928 replicate_namespace_action(name,REPL_BCREATE);
932 get_provider (int i, provider_t *out)
934 json_t *server;
936 server = json_array_get(config,i);
937 if (!server) {
938 DPRINTF("no such entry %d\n",i);
939 return 0;
942 out->index = i;
943 out->name = json_string_value(json_object_get(server,"name"));
944 out->type = json_string_value(json_object_get(server,"type"));
945 out->host = json_string_value(json_object_get(server,"host"));
946 out->port = json_integer_value(json_object_get(server,"port"));
947 /* TBD: change key/secret field names to username/password */
948 out->username = json_string_value(json_object_get(server,"key"));
949 out->password = json_string_value(json_object_get(server,"secret"));
951 /* Use empty strings instead of NULL. */
952 if (!out->username) out->username = "";
953 if (!out->password) out->password = "";
955 /* TBD: do this a cleaner way. */
956 if (!strcasecmp(out->type,"s3")) {
957 out->func_tbl = &s3_func_tbl;
959 else if (!strcasecmp(out->type,"http")) {
960 out->func_tbl = &curl_func_tbl;
962 else if (!strcasecmp(out->type,"fs")) {
963 out->func_tbl = &fs_func_tbl;
965 else {
966 out->func_tbl = &bad_func_tbl;
969 return 1;
972 char *
973 get_provider_value (int i, const char *fname)
975 json_t *server;
976 json_t *field;
978 server = json_array_get(config,i);
979 if (!server) {
980 DPRINTF("no such entry %d\n",i);
981 return NULL;
984 field = json_object_get(server,fname);
985 if (!field) {
986 DPRINTF("no such field %s on server %d\n",fname,i);
987 return NULL;
990 return (char *)json_string_value(field);
993 void
994 update_provider (const char *provider, const char *username, const char *password)
996 int i;
997 json_t *server;
998 const char *s_name;
1000 DPRINTF("updating %s username=%s password=%s\n",
1001 provider, username, password);
1003 for (i = 0; ; ++i) {
1004 server = json_array_get(config,i);
1005 if (!server) {
1006 DPRINTF(" could not find provider %s\n",provider);
1007 break;
1009 s_name = json_string_value(json_object_get(server,"name"));
1010 DPRINTF(" checking %s\n",s_name);
1011 if (s_name && !strcmp(s_name,provider)) {
1012 json_object_set(server,"key",json_string(username));
1013 json_object_set(server,"secret",json_string(password));
1014 break;
1019 /* Part of our API to the query module. */
1020 char *
1021 follow_link (char *object, char *key)
1023 (void)object;
1024 (void)key;
1026 return "no_such_object";