Fix ms leaks when object not found, deleted.
[iwhd.git] / proxy.c
blobb2f3a6bf719f8205e71782cc9f6247ee83a2ce64
1 /* Copyright (C) 2010 Free Software Foundation, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include <errno.h>
19 #include <error.h>
20 #include <fcntl.h>
21 #include <getopt.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <semaphore.h>
25 #include <stdint.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <strings.h>
30 #include <unistd.h>
31 #include <assert.h>
33 #include <microhttpd.h>
34 #include <curl/curl.h>
35 #include <hstor.h>
36 #include <jansson.h>
38 #include "iwh.h"
39 #include "proxy.h"
40 #include "query.h"
41 #include "meta.h"
44 * A config consists of a JSON array of objects, where each object includes:
46 * name string
47 * type string "s3" or "cf" or "http" (case insensitive)
48 * host string
49 * port integer
50 * key string S3 key or (optional TBD) HTTP user
51 * secret string S3 secret or (optional TBD) HTTP password
53 * The above fields are all used to implement replication once we've decided to
54 * do it. There may be other fields as well, to help us make that decision.
55 * For example, there might be one or more fields to describe geographic
56 * location, an array of supported image-format names, etc. This information
57 * is deliberately left "schema-free" so that users may add whatever fields
58 * they like to both the config and to replication attributes on objects.
61 /* Sizes for internal string buffers. */
62 #define ADDR_SIZE 1024
63 #define SVC_ACC_SIZE 128
64 #define HEADER_SIZE 64
66 /* Bitfield for things to check in validate_server */
67 #define NEED_NONE 0
68 #define NEED_SERVER 0x00000001
69 #define NEED_CREDS 0x00000002
70 #define NEED_PATH 0x00000004
71 #define NEED_ALL ~0
73 extern backend_func_tbl bad_func_tbl;
74 extern backend_func_tbl s3_func_tbl;
75 extern backend_func_tbl curl_func_tbl;
76 extern backend_func_tbl fs_func_tbl;
78 typedef enum {
79 REPL_PUT, /* store an object */
80 REPL_ODELETE, /* delete an object */
81 REPL_BCREATE, /* create a bucket */
82 /* TBD: bucket deletion, others? */
83 } repl_t;
85 typedef struct _repl_item {
86 struct _repl_item *next;
87 repl_t type;
88 char *path;
89 unsigned int server;
90 size_t size;
91 int pipes[2];
92 } repl_item;
94 typedef struct {
95 char *cur_bucket;
96 char *cur_key;
97 json_t *cur_server;
98 } query_ctx_t;
100 repl_item *queue_head = NULL;
101 repl_item *queue_tail = NULL;
102 pthread_mutex_t queue_lock;
103 sem_t queue_sema;
104 json_t *config = NULL;
107 validate_server (unsigned int i)
109 json_t *server;
110 json_t *elem;
111 const char *name;
112 const char *type;
113 unsigned int needs = NEED_ALL;
115 server = json_array_get(config,i);
116 if (!json_is_object(server)) {
117 error(0,0,"config elem %u: missing object",i);
118 return 0;
121 elem = json_object_get(server,"name");
122 if (!json_is_string(elem)) {
123 error(0,0,"config elem %u: missing name",i);
124 return 0;
126 name = json_string_value(elem);
128 elem = json_object_get(server,"type");
129 if (!json_is_string(elem)) {
130 error(0,0,"config elem %u (%s): missing type",i,name);
131 return 0;
133 type = json_string_value(elem);
135 if (!strcasecmp(type,"s3") || !strcasecmp(type,"cf")) {
136 needs = NEED_SERVER | NEED_CREDS;
138 else if (!strcasecmp(type,"http")) {
139 needs = NEED_SERVER;
141 else if (!strcasecmp(type,"fs")) {
142 needs = NEED_PATH;
144 else {
145 error(0,0,"config elem %u (%s): bad type",i,name);
146 return 0;
149 if (needs & NEED_SERVER) {
150 elem = json_object_get(server,"host");
151 if (!json_is_string(elem)) {
152 error(0,0,"config elem %u (%s): missing host",
153 i,name);
154 return 0;
156 elem = json_object_get(server,"port");
157 if (!json_is_integer(elem)) {
158 error(0,0,"config elem %u (%s): missing port",
159 i,name);
160 return 0;
164 if (needs & NEED_CREDS) {
165 elem = json_object_get(server,"key");
166 if (!json_is_string(elem)) {
167 error(0,0,"config elem %u (%s): missing key",
168 i, name);
169 return 0;
171 elem = json_object_get(server,"secret");
172 if (!json_is_string(elem)) {
173 error(0,0, "config elem %u (%s): missing secret",
174 i, name);
175 return 0;
179 if (needs & NEED_PATH) {
180 elem = json_object_get(server,"path");
181 if (!json_is_string(elem)) {
182 error(0,0,"config elem %u (%s): missing path",
183 i, name);
184 return 0;
188 return 1;
191 /* We've already validated, so minimal checking here. */
192 char *
193 set_config (void)
195 json_t *server;
196 const char *type;
198 server = json_array_get(config,0);
199 type = json_string_value(json_object_get(server,"type"));
200 if (strcasecmp(type,"fs")) {
201 proxy_host = json_string_value(
202 json_object_get(server,"host"));
203 proxy_port = json_integer_value(
204 json_object_get(server,"port"));
205 if (!strcasecmp(type,"s3")) {
206 s3mode = 1;
207 proxy_key = json_string_value(
208 json_object_get(server,"key"));
209 proxy_secret = json_string_value(
210 json_object_get(server,"secret"));
212 else {
213 s3mode = 0;
216 else {
217 local_path = json_string_value(
218 json_object_get(server,"path"));
221 return (char *)json_string_value(json_object_get(server,"name"));
224 char *
225 parse_config (void)
227 json_error_t err;
228 unsigned int nservers;
229 unsigned int i;
231 if (access(cfg_file,R_OK) < 0) {
232 error(0,errno,"failed to open %s for reading", cfg_file);
233 return NULL;
236 config = json_load_file(cfg_file,&err);
237 if (!config) {
238 error(0,0,"JSON error on line %d: %s",err.line,err.text);
239 return NULL;
242 if (json_typeof(config) != JSON_ARRAY) {
243 error(0,0,"config should be a JSON array");
244 goto err;
247 nservers = json_array_size(config);
248 if (!nservers) {
249 goto err;
252 for (i = 0; i < nservers; ++i) {
253 if (!validate_server(i)) {
254 goto err;
258 /* Everything looks OK. */
259 printf("%u replication servers defined\n",nservers-1);
260 return set_config();
262 err:
263 json_decref(config);
264 config = NULL;
265 return 0;
268 size_t
269 junk_writer (void *ptr, size_t size, size_t nmemb, void *stream)
271 size_t n;
273 n = fwrite(ptr,size,nmemb,stream);
274 fflush(stream);
275 DPRINTF("in %s(%zu,%zu) => %zu\n",__func__,size,nmemb,n);
277 return n;
280 void *
281 proxy_repl_prod_fs (void *ctx)
283 repl_item *item = ctx;
284 int ifd;
285 int ofd;
286 char buf[1<<16];
287 ssize_t ibytes;
288 ssize_t obytes;
289 ssize_t offset;
291 DPRINTF("replicating from %s (FS)\n",item->path);
293 ifd = open(item->path,O_RDONLY);
294 if (ifd < 0) {
295 error(0,errno,"ifd open");
296 return THREAD_FAILED;
298 ofd = item->pipes[1];
300 for (;;) {
301 ibytes = read(ifd,buf,sizeof(buf));
302 if (ibytes <= 0) {
303 if (ibytes < 0) {
304 error(0,errno,"%s: read failed", item->path);
306 else {
307 DPRINTF("EOF on ifd\n");
309 break;
311 offset = 0;
312 do {
313 obytes = write(ofd,buf+offset,ibytes);
314 if (obytes <= 0) {
315 if (obytes < 0) {
316 error(0,errno,"ofd write");
318 else {
319 DPRINTF("zero-length write on ofd\n");
321 break;
323 ibytes -= obytes;
324 offset += obytes;
325 } while (ibytes > 0);
328 close(ifd);
329 close(ofd);
331 DPRINTF("%s returning\n",__func__);
332 close(item->pipes[1]);
333 return NULL;
336 void *
337 proxy_repl_prod (void *ctx)
339 repl_item *item = ctx;
340 FILE *fp = fdopen(item->pipes[1],"w");
341 char addr[ADDR_SIZE];
342 CURL *curl;
343 char svc_acc[SVC_ACC_SIZE];
344 struct hstor_client *hstor;
345 char *bucket;
346 char *key;
347 char *stctx;
348 char *myurl;
349 int chars;
351 if (fp == NULL) {
352 error(0, errno, "%s: fdopen failed", __func__);
353 return NULL;
356 chars = snprintf(addr,ADDR_SIZE,
357 "http://%s:%u/%s",proxy_host,proxy_port,item->path);
358 if (chars >= ADDR_SIZE) {
359 error(0,0,"path too long in %s",__func__);
360 goto done;
362 DPRINTF("replicating from %s\n",addr);
364 if (s3mode) {
365 chars = snprintf(svc_acc,SVC_ACC_SIZE,"%s:%u",
366 proxy_host,proxy_port);
367 if (chars >= SVC_ACC_SIZE) {
368 error(0,0,"svc_acc too long in %s",__func__);
369 goto done;
371 hstor = hstor_new(svc_acc,proxy_host,
372 proxy_key,proxy_secret);
373 /* Blech. Can't conflict with consumer, though. */
374 myurl = strdup(item->path);
375 assert (myurl);
376 bucket = strtok_r(myurl,"/",&stctx);
377 key = strtok_r(NULL,"/",&stctx);
378 hstor_get(hstor,bucket,key,
379 junk_writer,fp,0);
380 hstor_free(hstor);
381 free(myurl);
383 else {
384 curl = curl_easy_init();
385 curl_easy_setopt(curl,CURLOPT_URL,addr);
386 curl_easy_setopt(curl,CURLOPT_WRITEFUNCTION,junk_writer);
387 curl_easy_setopt(curl,CURLOPT_WRITEDATA,fp);
388 DPRINTF("%s calling curl_easy_perform\n",__func__);
389 curl_easy_perform(curl);
390 curl_easy_cleanup(curl);
393 done:
394 DPRINTF("%s returning\n",__func__);
395 /* Closing should signal to the consumer that we're finished. */
396 fclose(fp);
397 return NULL;
400 size_t
401 junk_reader (void *ptr, size_t size, size_t nmemb, void *stream)
403 size_t n;
405 n = fread(ptr,size,nmemb,stream);
406 printf("in %s(%zu,%zu) => %zu\n",__func__,size,nmemb,n);
407 return n;
410 size_t
411 cf_writer (void *ptr ATTRIBUTE_UNUSED, size_t size, size_t nmemb,
412 void *stream ATTRIBUTE_UNUSED)
414 return size * nmemb;
417 size_t
418 cf_header (void *ptr, size_t size, size_t nmemb, void *stream)
420 char *next;
421 char *sctx;
422 json_t *server = (json_t *)stream;
424 next = strtok_r(ptr,":",&sctx);
425 if (next) {
426 if (!strcasecmp(next,"X-Storage-Url")) {
427 next = strtok_r(NULL," \n\r",&sctx);
428 if (next) {
429 DPRINTF("got CF URL %s\n",next);
430 /* NB: after this, original "host" is gone. */
431 json_object_set_new(server,"host",
432 json_string(next));
435 else if (!strcasecmp(next,"X-Storage-Token")) {
436 next = strtok_r(NULL," \n\r",&sctx);
437 if (next) {
438 DPRINTF("got CF token %s\n",next);
439 json_object_set_new(server,"token",
440 json_string(next));
444 return size * nmemb;
447 const char *
448 get_cloudfiles_token (json_t *server, const char *host, unsigned int port,
449 const char * user, const char * key)
451 CURL *curl;
452 char addr[ADDR_SIZE];
453 char auth_user[HEADER_SIZE];
454 char auth_key[HEADER_SIZE];
455 json_t *token_obj;
456 struct curl_slist *slist;
457 int chars;
459 token_obj = json_object_get(server,"token");
460 if (token_obj) {
461 return json_string_value(token_obj);
464 chars = snprintf(addr,ADDR_SIZE,"https://%s:%u/v1.0",host,port);
465 if (chars >= ADDR_SIZE) {
466 error(0,0,"API URL too long in %s",__func__);
467 return NULL;
470 chars = snprintf(auth_user,HEADER_SIZE,"X-Auth-User: %s",user);
471 if (chars >= HEADER_SIZE) {
472 error(0,0,"auth_user too long in %s",__func__);
473 return NULL;
476 chars = snprintf(auth_key,HEADER_SIZE,"X-Auth-Key: %s",key);
477 if (chars >= HEADER_SIZE) {
478 error(0,0,"auth_key too long in %s",__func__);
479 return NULL;
482 curl = curl_easy_init();
483 curl_easy_setopt(curl,CURLOPT_URL,addr);
484 curl_easy_setopt(curl,CURLOPT_WRITEFUNCTION,cf_writer);
485 curl_easy_setopt(curl,CURLOPT_HEADERFUNCTION,cf_header);
486 curl_easy_setopt(curl,CURLOPT_WRITEHEADER,server);
487 slist = curl_slist_append(NULL,auth_user);
488 slist = curl_slist_append(slist,auth_key);
489 curl_easy_setopt(curl,CURLOPT_HTTPHEADER,slist);
490 curl_easy_perform(curl);
491 curl_easy_cleanup(curl);
492 curl_slist_free_all(slist);
494 token_obj = json_object_get(server,"token");
495 return token_obj ? json_string_value(token_obj) : NULL;
498 void *
499 proxy_repl_cons (void *ctx)
501 repl_item *item = ctx;
502 FILE *fp = fdopen(item->pipes[0],"r");
503 char addr[ADDR_SIZE];
504 CURL *curl;
505 json_t *server;
506 char svc_acc[SVC_ACC_SIZE];
507 char auth_hdr[HEADER_SIZE];
508 struct hstor_client *hstor;
509 char *bucket;
510 char *key;
511 char *stctx;
512 const char *s_host;
513 unsigned int s_port;
514 const char *s_key;
515 const char *s_secret;
516 const char *s_type;
517 const char *s_name;
518 struct curl_slist *slist;
519 char *myurl;
520 int chars;
522 if (fp == NULL) {
523 error(0, errno, "%s: fdopen failed", __func__);
524 return THREAD_FAILED;
527 server = json_array_get(config,item->server);
528 s_host = json_string_value(json_object_get(server,"host"));
529 s_port = json_integer_value(json_object_get(server,"port"));
530 s_key = json_string_value(json_object_get(server,"key"));
531 s_secret = json_string_value(json_object_get(server,"secret"));
532 s_type = json_string_value(json_object_get(server,"type"));
533 s_name = json_string_value(json_object_get(server,"name"));
535 myurl = strdup(item->path);
536 assert (myurl);
537 bucket = strtok_r(myurl,"/",&stctx);
538 key = strtok_r(NULL,"/",&stctx);
540 if (!strcasecmp(s_type,"s3")) {
541 DPRINTF("replicating %zu to %s/%s (S3)\n",item->size,s_host,
542 item->path);
543 chars = snprintf(svc_acc,SVC_ACC_SIZE,"%s:%u",s_host,s_port);
544 if (chars >= SVC_ACC_SIZE) {
545 error(0,0,"svc_acc too long in %s",__func__);
546 return THREAD_FAILED;
548 hstor = hstor_new(svc_acc,s_host,s_key,s_secret);
549 /* Blech. Can't conflict with producer, though. */
550 hstor_put(hstor,bucket,key,
551 junk_reader,item->size,fp,NULL);
552 hstor_free(hstor);
554 else {
555 const char *token_str = NULL;
556 if (!strcasecmp(s_type,"cf")) {
557 token_str = get_cloudfiles_token(server,s_host,s_port,
558 s_key, s_secret);
559 if (!token_str) {
560 DPRINTF("could not get CF token\n");
561 return THREAD_FAILED;
563 /* Re-fetch as this might have changed. */
564 s_host = json_string_value(json_object_get(server,
565 "host"));
566 chars = snprintf(addr,ADDR_SIZE,"%s/%s",
567 s_host,item->path);
568 if (chars >= ADDR_SIZE) {
569 error(0,0,"CF path too long in %s",__func__);
570 return THREAD_FAILED;
572 DPRINTF("replicating %zu to %s (CF)\n",item->size,
573 addr);
575 else {
576 chars = snprintf(addr,ADDR_SIZE,"http://%s:%u/%s",
577 s_host,s_port,item->path);
578 if (chars >= ADDR_SIZE) {
579 error(0,0,"HTTP path too long in %s",
580 __func__);
581 return THREAD_FAILED;
583 DPRINTF("replicating %zu to %s (repod)\n",item->size,
584 addr);
586 curl = curl_easy_init();
587 curl_easy_setopt(curl,CURLOPT_URL,addr);
588 curl_easy_setopt(curl,CURLOPT_UPLOAD,1);
589 curl_easy_setopt(curl,CURLOPT_INFILESIZE_LARGE,
590 (curl_off_t)item->size);
591 curl_easy_setopt(curl,CURLOPT_READFUNCTION,junk_reader);
592 if (!strcasecmp(s_type,"cf")) {
593 chars = snprintf(auth_hdr,HEADER_SIZE,
594 "X-Auth-Token: %s",token_str);
595 if (chars >= HEADER_SIZE) {
596 error(0,0,"auth_token too long in %s",
597 __func__);
598 return THREAD_FAILED;
600 slist = curl_slist_append(NULL,auth_hdr);
602 * Rackspace doesn't clearly document that you'll get
603 * 412 (Precondition Failed) if you omit this.
605 slist = curl_slist_append(slist,
606 "Content-Type: binary/octet-stream");
608 else {
609 slist = curl_slist_append(NULL,"X-redhat-role: master");
611 curl_easy_setopt(curl,CURLOPT_HTTPHEADER,slist);
612 curl_easy_setopt(curl,CURLOPT_READDATA,fp);
613 DPRINTF("%s calling curl_easy_perform\n",__func__);
614 curl_easy_perform(curl);
615 curl_easy_cleanup(curl);
616 curl_slist_free_all(slist);
619 DPRINTF("%s returning\n",__func__);
620 fclose(fp);
621 meta_got_copy(bucket,key,s_name);
622 free(myurl);
623 return NULL;
626 void
627 repl_worker_del (const repl_item *item)
629 json_t *server;
630 const char *s_host;
631 unsigned int s_port;
632 const char *s_key;
633 const char *s_secret;
634 const char *s_type;
635 char svc_acc[SVC_ACC_SIZE];
636 struct hstor_client *hstor;
637 char addr[ADDR_SIZE];
638 CURL *curl;
639 char *bucket;
640 char *key;
641 char *stctx;
642 int chars;
644 server = json_array_get(config,item->server);
645 s_host = json_string_value(json_object_get(server,"host"));
646 s_port = json_integer_value(json_object_get(server,"port"));
647 s_key = json_string_value(json_object_get(server,"key"));
648 s_secret = json_string_value(json_object_get(server,"secret"));
649 s_type = json_string_value(json_object_get(server,"type"));
651 if (!strcasecmp(s_type,"s3")) {
652 DPRINTF("%s replicating delete of %s on %s:%u (S3)\n",__func__,
653 item->path, s_host, s_port);
654 chars = snprintf(svc_acc,SVC_ACC_SIZE,"%s:%u",s_host,s_port);
655 if (chars >= SVC_ACC_SIZE) {
656 error(0,0,"svc_acc too long in %s",__func__);
657 return;
659 /* TBD: check return */
660 hstor = hstor_new(svc_acc,s_host,s_key,s_secret);
661 assert (item->path);
662 bucket = strtok_r(item->path,"/",&stctx);
663 key = strtok_r(NULL,"/",&stctx);
664 (void)hstor_del(hstor,bucket,key);
665 hstor_free(hstor);
667 else {
668 DPRINTF("%s replicating delete of %s on %s:%u (HTTP)\n",
669 __func__, item->path, s_host, s_port);
670 chars = snprintf(addr,ADDR_SIZE,"http://%s:%d%s",
671 s_host,s_port,item->path);
672 if (chars >= ADDR_SIZE) {
673 error(0,0,"path too long in %s",__func__);
674 return;
676 curl = curl_easy_init();
677 curl_easy_setopt(curl,CURLOPT_URL,addr);
678 curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE");
679 curl_easy_perform(curl);
680 curl_easy_cleanup(curl);
683 DPRINTF("%s returning\n",__func__);
686 void
687 repl_worker_bcreate (repl_item *item)
689 json_t *server;
690 const char *s_host;
691 unsigned int s_port;
692 const char *s_key;
693 const char *s_secret;
694 const char *s_type;
695 char svc_acc[SVC_ACC_SIZE];
696 struct hstor_client *hstor;
697 char addr[ADDR_SIZE];
698 CURL *curl;
699 int chars;
701 server = json_array_get(config,item->server);
702 s_host = json_string_value(json_object_get(server,"host"));
703 s_port = json_integer_value(json_object_get(server,"port"));
704 s_key = json_string_value(json_object_get(server,"key"));
705 s_secret = json_string_value(json_object_get(server,"secret"));
706 s_type = json_string_value(json_object_get(server,"type"));
708 if (!strcasecmp(s_type,"s3")) {
709 DPRINTF("%s replicating create of bucket %s on %s:%u (S3)\n",
710 __func__, item->path, s_host, s_port);
711 chars = snprintf(svc_acc,SVC_ACC_SIZE,"%s:%u",s_host,s_port);
712 if (chars >= SVC_ACC_SIZE) {
713 error(0,0,"svc_acc too long in %s",__func__);
714 return;
716 /* TBD: check return */
717 hstor = hstor_new(svc_acc,s_host,s_key,s_secret);
718 assert (item->path);
719 if (!hstor_add_bucket(hstor,item->path)) {
720 error(0,0,"bucket create failed for %s",
721 item->path);
723 hstor_free(hstor);
725 else {
726 DPRINTF("%s replicating create of bucket %s on %s:%u (HTTP)\n",
727 __func__, item->path, s_host, s_port);
728 chars = snprintf(addr,ADDR_SIZE,"http://%s:%d/%s",
729 s_host,s_port,item->path);
730 if (chars >= ADDR_SIZE) {
731 error(0,0,"path too long in %s",__func__);
732 return;
734 curl = curl_easy_init();
735 curl_easy_setopt(curl,CURLOPT_URL,addr);
736 curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"PUT");
737 curl_easy_perform(curl);
738 curl_easy_cleanup(curl);
741 DPRINTF("%s returning\n",__func__);
744 void *
745 repl_worker (void *notused ATTRIBUTE_UNUSED)
747 repl_item *item;
748 pthread_t cons;
749 pthread_t prod;
751 for (;;) {
752 sem_wait(&queue_sema);
753 pthread_mutex_lock(&queue_lock);
754 item = queue_head;
755 queue_head = item->next;
756 if (!queue_head) {
757 queue_tail = NULL;
759 pthread_mutex_unlock(&queue_lock);
761 switch (item->type) {
762 case REPL_PUT:
763 if (pipe(item->pipes) >= 0) {
764 if (proxy_host) {
765 pthread_create(&prod,NULL,
766 proxy_repl_prod,item);
768 else {
769 pthread_create(&prod,NULL,
770 proxy_repl_prod_fs,item);
772 pthread_create(&cons,NULL,proxy_repl_cons,item);
773 pthread_join(prod,NULL);
774 pthread_join(cons,NULL);
776 else {
777 error(0,errno,"pipe");
779 break;
780 case REPL_ODELETE:
781 repl_worker_del(item);
782 break;
783 case REPL_BCREATE:
784 repl_worker_bcreate(item);
785 break;
786 default:
787 error(0,0,"bad repl type %d (url=%s) skipped",
788 item->type, item->path);
790 free(item->path);
791 free(item);
795 void
796 repl_init (void)
798 pthread_t tid;
800 sem_init(&queue_sema,0,0);
801 pthread_mutex_init(&queue_lock,NULL);
802 pthread_create(&tid,NULL,repl_worker,NULL);
805 char *
806 repl_oget (void *ctx, const char *id)
808 query_ctx_t *qctx = ctx;
809 char *cur_value;
811 (void)meta_get_value(qctx->cur_bucket,qctx->cur_key,id,&cur_value);
813 return cur_value;
816 char *
817 repl_sget (void *ctx, const char *id)
819 query_ctx_t *qctx = ctx;
820 json_t *elem = json_object_get(qctx->cur_server,id);
822 return elem ? (char *)json_string_value(elem) : NULL;
825 void
826 replicate (const char *url, size_t size, const char *policy)
828 unsigned int i;
829 repl_item *item;
830 value_t *expr;
831 int res;
832 char *url2;
833 char *stctx;
834 query_ctx_t qctx;
835 getter_t oget;
836 getter_t sget;
838 url2 = strdup(url);
839 if (!url2) {
840 error(0,0,"could not parse url %s",url);
841 return;
843 qctx.cur_bucket = strtok_r(url2,"/",&stctx);
844 qctx.cur_key = strtok_r(NULL,"/",&stctx);
846 if (!size) {
847 size = meta_get_size(qctx.cur_bucket,qctx.cur_key);
848 DPRINTF("fetched size %zu for %s\n",size,url);
851 if (policy) {
852 DPRINTF("--- policy = %s\n",policy);
853 expr = parse(policy);
855 else {
856 expr = NULL;
859 oget.func = repl_oget;
860 oget.ctx = &qctx;
861 sget.func = repl_sget;
862 sget.ctx = &qctx;
864 for (i = 1; i < json_array_size(config); ++i) {
865 if (expr) {
866 qctx.cur_server = json_array_get(config,i);
867 res = eval(expr,&oget,&sget);
869 else {
870 res = 0;
872 if (res <= 0) {
873 DPRINTF("skipping %u for %s\n",i,url);
874 continue;
876 DPRINTF("REPLICATING %s to %u\n",url,i);
877 item = malloc(sizeof(*item));
878 if (!item) {
879 error(0,errno,"could not create repl_item for %s",
880 url);
881 break;
883 item->type = REPL_PUT;
884 item->path = strdup(url);
885 if (!item->path) {
886 error(0,errno,"could not create repl_item for %s",
887 url);
888 break;
890 item->server = i;
891 item->size = size;
892 pthread_mutex_lock(&queue_lock);
893 if (queue_tail) {
894 item->next = queue_tail->next;
895 queue_tail->next = item;
897 else {
898 item->next = NULL;
899 queue_head = item;
901 queue_tail = item;
902 pthread_mutex_unlock(&queue_lock);
903 sem_post(&queue_sema);
906 if (expr) {
907 free_value(expr);
909 free(url2);
912 void
913 replicate_namespace_action (const char *name, repl_t action)
915 unsigned int i;
916 repl_item *item;
918 for (i = 1; i < json_array_size(config); ++i) {
919 DPRINTF("replicating delete(%s) on %u\n",name,i);
920 item = malloc(sizeof(*item));
921 if (!item) {
922 error(0,errno,"could not create repl_item for %s",
923 name);
924 return;
926 item->type = action;
927 item->path = strdup(name);
928 if (!item->path) {
929 free(item);
930 return;
932 item->server = i;
933 pthread_mutex_lock(&queue_lock);
934 if (queue_tail) {
935 item->next = queue_tail->next;
936 queue_tail->next = item;
938 else {
939 item->next = NULL;
940 queue_head = item;
942 queue_tail = item;
943 pthread_mutex_unlock(&queue_lock);
944 sem_post(&queue_sema);
948 void
949 replicate_delete (const char *name)
951 replicate_namespace_action(name,REPL_ODELETE);
954 void
955 replicate_bcreate (const char *name)
957 replicate_namespace_action(name,REPL_BCREATE);
961 get_provider (int i, provider_t *out)
963 json_t *server;
965 server = json_array_get(config,i);
966 if (!server) {
967 DPRINTF("no such entry %d\n",i);
968 return 0;
971 out->index = i;
972 out->name = json_string_value(json_object_get(server,"name"));
973 out->type = json_string_value(json_object_get(server,"type"));
974 out->host = json_string_value(json_object_get(server,"host"));
975 out->port = json_integer_value(json_object_get(server,"port"));
976 /* TBD: change key/secret field names to username/password */
977 out->username = json_string_value(json_object_get(server,"key"));
978 out->password = json_string_value(json_object_get(server,"secret"));
979 out->path = json_string_value(json_object_get(server,"path"));
981 /* Use empty strings instead of NULL. */
982 if (!out->username) out->username = "";
983 if (!out->password) out->password = "";
985 /* TBD: do this a cleaner way. */
986 if (!strcasecmp(out->type,"s3")) {
987 out->func_tbl = &s3_func_tbl;
989 else if (!strcasecmp(out->type,"http")) {
990 out->func_tbl = &curl_func_tbl;
992 else if (!strcasecmp(out->type,"fs")) {
993 out->func_tbl = &fs_func_tbl;
995 else {
996 out->func_tbl = &bad_func_tbl;
999 return 1;
1002 char *
1003 get_provider_value (int i, const char *fname)
1005 json_t *server;
1006 json_t *field;
1008 server = json_array_get(config,i);
1009 if (!server) {
1010 DPRINTF("no such entry %d\n",i);
1011 return NULL;
1014 field = json_object_get(server,fname);
1015 if (!field) {
1016 DPRINTF("no such field %s on server %d\n",fname,i);
1017 return NULL;
1020 return (char *)json_string_value(field);
1023 void
1024 update_provider (const char *provider, const char *username, const char *password)
1026 int i;
1027 json_t *server;
1028 const char *s_name;
1030 DPRINTF("updating %s username=%s password=%s\n",
1031 provider, username, password);
1033 for (i = 0; ; ++i) {
1034 server = json_array_get(config,i);
1035 if (!server) {
1036 DPRINTF(" could not find provider %s\n",provider);
1037 break;
1039 s_name = json_string_value(json_object_get(server,"name"));
1040 DPRINTF(" checking %s\n",s_name);
1041 if (s_name && !strcmp(s_name,provider)) {
1042 json_object_set_new(server,"key",json_string(username));
1043 json_object_set_new(server,"secret",
1044 json_string(password));
1045 break;
1050 /* Part of our API to the query module. */
1051 char *
1052 follow_link (char *object, char *key)
1054 (void)object;
1055 (void)key;
1057 return "no_such_object";