don't ignore failed thread creation
[iwhd.git] / proxy.c
blobfa25e5404466091c4816604e7040bb15900a4298
1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include <errno.h>
19 #include <error.h>
20 #include <fcntl.h>
21 #include <getopt.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <semaphore.h>
25 #include <stdint.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <strings.h>
30 #include <unistd.h>
31 #include <assert.h>
33 #include <microhttpd.h>
34 #include <curl/curl.h>
35 #include <hstor.h>
36 #include <jansson.h>
38 #include "iwh.h"
39 #include "proxy.h"
40 #include "query.h"
41 #include "meta.h"
/*
 * A config consists of a JSON array of objects, where each object includes:
 *
 *	name	string
 *	type	string	"s3", "cf", "http" or "fs" (case insensitive)
 *	host	string	required for "s3", "cf" and "http"
 *	port	integer	required for "s3", "cf" and "http"
 *	key	string	S3 key or (optional TBD) HTTP user;
 *			required for "s3" and "cf"
 *	secret	string	S3 secret or (optional TBD) HTTP password;
 *			required for "s3" and "cf"
 *	path	string	required for "fs"
 *
 * The above fields are all used to implement replication once we've decided
 * to do it.  There may be other fields as well, to help us make that
 * decision.  For example, there might be one or more fields to describe
 * geographic location, an array of supported image-format names, etc.  This
 * information is deliberately left "schema-free" so that users may add
 * whatever fields they like, both to the config and to the replication
 * attributes on objects.
 */
/* Capacities, in bytes, of the fixed-size string buffers used in this file. */
#define ADDR_SIZE	1024	/* complete URLs */
#define SVC_ACC_SIZE	128	/* "host:port" strings */
#define HEADER_SIZE	64	/* single HTTP header lines */

/* Flag bits naming the config fields that validate_server must find. */
#define NEED_NONE	0
#define NEED_SERVER	(1 << 0)	/* "host" and "port" */
#define NEED_CREDS	(1 << 1)	/* "key" and "secret" */
#define NEED_PATH	(1 << 2)	/* "path" */
#define NEED_ALL	(~0)
73 extern backend_func_tbl bad_func_tbl;
74 extern backend_func_tbl s3_func_tbl;
75 extern backend_func_tbl curl_func_tbl;
76 extern backend_func_tbl fs_func_tbl;
/* The kinds of work a queued replication item can ask for. */
typedef enum {
	REPL_PUT = 0,	/* store an object */
	REPL_ODELETE,	/* delete an object */
	REPL_BCREATE	/* create a bucket */
	/* TBD: bucket deletion, others? */
} repl_t;
85 typedef struct _repl_item {
86 struct _repl_item *next;
87 repl_t type;
88 char *path;
89 unsigned int server;
90 size_t size;
91 int pipes[2];
92 } repl_item;
94 typedef struct {
95 char *cur_bucket;
96 char *cur_key;
97 json_t *cur_server;
98 } query_ctx_t;
100 static repl_item *queue_head = NULL;
101 static repl_item *queue_tail = NULL;
102 static pthread_mutex_t queue_lock;
103 static sem_t queue_sema;
104 static json_t *config = NULL;
106 static int
107 validate_server (unsigned int i)
109 json_t *server;
110 json_t *elem;
111 const char *name;
112 const char *type;
113 unsigned int needs = NEED_ALL;
115 server = json_array_get(config,i);
116 if (!json_is_object(server)) {
117 error(0,0,"config elem %u: missing object",i);
118 return 0;
121 elem = json_object_get(server,"name");
122 if (!json_is_string(elem)) {
123 error(0,0,"config elem %u: missing name",i);
124 return 0;
126 name = json_string_value(elem);
128 elem = json_object_get(server,"type");
129 if (!json_is_string(elem)) {
130 error(0,0,"config elem %u (%s): missing type",i,name);
131 return 0;
133 type = json_string_value(elem);
135 if (!strcasecmp(type,"s3") || !strcasecmp(type,"cf")) {
136 needs = NEED_SERVER | NEED_CREDS;
138 else if (!strcasecmp(type,"http")) {
139 needs = NEED_SERVER;
141 else if (!strcasecmp(type,"fs")) {
142 needs = NEED_PATH;
144 else {
145 error(0,0,"config elem %u (%s): bad type",i,name);
146 return 0;
149 if (needs & NEED_SERVER) {
150 elem = json_object_get(server,"host");
151 if (!json_is_string(elem)) {
152 error(0,0,"config elem %u (%s): missing host",
153 i,name);
154 return 0;
156 elem = json_object_get(server,"port");
157 if (!json_is_integer(elem)) {
158 error(0,0,"config elem %u (%s): missing port",
159 i,name);
160 return 0;
164 if (needs & NEED_CREDS) {
165 elem = json_object_get(server,"key");
166 if (!json_is_string(elem)) {
167 error(0,0,"config elem %u (%s): missing key",
168 i, name);
169 return 0;
171 elem = json_object_get(server,"secret");
172 if (!json_is_string(elem)) {
173 error(0,0, "config elem %u (%s): missing secret",
174 i, name);
175 return 0;
179 if (needs & NEED_PATH) {
180 elem = json_object_get(server,"path");
181 if (!json_is_string(elem)) {
182 error(0,0,"config elem %u (%s): missing path",
183 i, name);
184 return 0;
188 return 1;
191 /* We've already validated, so minimal checking here. */
192 char *
193 set_config (void)
195 json_t *server;
196 const char *type;
198 server = json_array_get(config,0);
199 type = json_string_value(json_object_get(server,"type"));
200 if (strcasecmp(type,"fs")) {
201 proxy_host = json_string_value(
202 json_object_get(server,"host"));
203 proxy_port = json_integer_value(
204 json_object_get(server,"port"));
205 if (!strcasecmp(type,"s3")) {
206 s3mode = 1;
207 proxy_key = json_string_value(
208 json_object_get(server,"key"));
209 proxy_secret = json_string_value(
210 json_object_get(server,"secret"));
212 else {
213 s3mode = 0;
216 else {
217 local_path = json_string_value(
218 json_object_get(server,"path"));
221 return (char *)json_string_value(json_object_get(server,"name"));
224 char *
225 parse_config (void)
227 json_error_t err;
228 unsigned int nservers;
229 unsigned int i;
231 if (access(cfg_file,R_OK) < 0) {
232 error(0,errno,"failed to open %s for reading", cfg_file);
233 return NULL;
236 config = json_load_file(cfg_file,&err);
237 if (!config) {
238 error(0,0,"JSON error on line %d: %s",err.line,err.text);
239 return NULL;
242 if (json_typeof(config) != JSON_ARRAY) {
243 error(0,0,"config should be a JSON array");
244 goto err;
247 nservers = json_array_size(config);
248 if (!nservers) {
249 goto err;
252 for (i = 0; i < nservers; ++i) {
253 if (!validate_server(i)) {
254 goto err;
258 /* Everything looks OK. */
259 printf("%u replication servers defined\n",nservers-1);
260 return set_config();
262 err:
263 json_decref(config);
264 config = NULL;
265 return 0;
/*
 * Write callback (same shape as fwrite): push nmemb items of the given
 * size to the FILE passed as stream and flush immediately.  Short writes
 * and flush failures are only warned about.
 *
 * Returns the item count reported by fwrite.
 */
static size_t
junk_writer (/* const */ void *ptr, size_t size, size_t nmemb, void *stream)
{
	FILE		*out = stream;
	const size_t	 written = fwrite(ptr,size,nmemb,out);

	if (written != nmemb)
		error(0, 0, "warning: write failed");
	if (fflush(out) != 0)
		error(0, 0, "warning: write failed");
	DPRINTF("in %s(%zu,%zu) => %zu\n",__func__,size,nmemb,written);

	return written;
}
283 static void *
284 proxy_repl_prod_fs (void *ctx)
286 repl_item *item = ctx;
287 int ifd;
288 int ofd;
289 char buf[1<<16];
290 ssize_t ibytes;
291 ssize_t obytes;
292 ssize_t offset;
294 DPRINTF("replicating from %s (FS)\n",item->path);
296 ifd = open(item->path,O_RDONLY);
297 if (ifd < 0) {
298 error(0,errno,"ifd open");
299 return THREAD_FAILED;
301 ofd = item->pipes[1];
303 for (;;) {
304 ibytes = read(ifd,buf,sizeof(buf));
305 if (ibytes <= 0) {
306 if (ibytes < 0) {
307 error(0,errno,"%s: read failed", item->path);
309 else {
310 DPRINTF("EOF on ifd\n");
312 break;
314 offset = 0;
315 do {
316 obytes = write(ofd,buf+offset,ibytes);
317 if (obytes <= 0) {
318 if (obytes < 0) {
319 error(0,errno,"ofd write");
321 else {
322 DPRINTF("zero-length write on ofd\n");
324 break;
326 ibytes -= obytes;
327 offset += obytes;
328 } while (ibytes > 0);
331 close(ifd);
332 close(ofd);
334 DPRINTF("%s returning\n",__func__);
335 close(item->pipes[1]);
336 return NULL;
339 static void *
340 proxy_repl_prod (void *ctx)
342 repl_item *item = ctx;
343 FILE *fp = fdopen(item->pipes[1],"w");
344 char addr[ADDR_SIZE];
345 CURL *curl;
346 char svc_acc[SVC_ACC_SIZE];
347 struct hstor_client *hstor;
348 char *bucket;
349 char *key;
350 char *stctx;
351 char *myurl;
352 int chars;
354 if (fp == NULL) {
355 error(0, errno, "%s: fdopen failed", __func__);
356 return NULL;
359 chars = snprintf(addr,ADDR_SIZE,
360 "http://%s:%u/%s",proxy_host,proxy_port,item->path);
361 if (chars >= ADDR_SIZE) {
362 error(0,0,"path too long in %s",__func__);
363 goto done;
365 DPRINTF("replicating from %s\n",addr);
367 if (s3mode) {
368 chars = snprintf(svc_acc,SVC_ACC_SIZE,"%s:%u",
369 proxy_host,proxy_port);
370 if (chars >= SVC_ACC_SIZE) {
371 error(0,0,"svc_acc too long in %s",__func__);
372 goto done;
374 hstor = hstor_new(svc_acc,proxy_host,
375 proxy_key,proxy_secret);
376 /* Blech. Can't conflict with consumer, though. */
377 myurl = strdup(item->path);
378 assert (myurl);
379 bucket = strtok_r(myurl,"/",&stctx);
380 key = strtok_r(NULL,"/",&stctx);
381 hstor_get(hstor,bucket,key,
382 junk_writer,fp,0);
383 hstor_free(hstor);
384 free(myurl);
386 else {
387 curl = curl_easy_init();
388 curl_easy_setopt(curl,CURLOPT_URL,addr);
389 curl_easy_setopt(curl,CURLOPT_WRITEFUNCTION,junk_writer);
390 curl_easy_setopt(curl,CURLOPT_WRITEDATA,fp);
391 DPRINTF("%s calling curl_easy_perform\n",__func__);
392 curl_easy_perform(curl);
393 curl_easy_cleanup(curl);
396 done:
397 DPRINTF("%s returning\n",__func__);
398 /* Closing should signal to the consumer that we're finished. */
399 fclose(fp);
400 return NULL;
/*
 * Read callback (same shape as fread): pull up to nmemb items of the
 * given size from the FILE passed as stream into ptr, and print a trace
 * line on stdout.
 *
 * Returns the item count reported by fread.
 */
static size_t
junk_reader (void *ptr, size_t size, size_t nmemb, void *stream)
{
	FILE		*in = stream;
	const size_t	 got = fread(ptr,size,nmemb,in);

	printf("in %s(%zu,%zu) => %zu\n",__func__,size,nmemb,got);
	return got;
}
/*
 * Body callback that throws the data away: it reports every byte as
 * consumed without looking at ptr or stream.
 */
static size_t
cf_writer (void *ptr, size_t size, size_t nmemb, void *stream)
{
	(void)ptr;
	(void)stream;

	return size * nmemb;
}
420 static size_t
421 cf_header (void *ptr, size_t size, size_t nmemb, void *stream)
423 char *next;
424 char *sctx;
425 json_t *server = (json_t *)stream;
427 next = strtok_r(ptr,":",&sctx);
428 if (next) {
429 if (!strcasecmp(next,"X-Storage-Url")) {
430 next = strtok_r(NULL," \n\r",&sctx);
431 if (next) {
432 DPRINTF("got CF URL %s\n",next);
433 /* NB: after this, original "host" is gone. */
434 json_object_set_new(server,"host",
435 json_string(next));
438 else if (!strcasecmp(next,"X-Storage-Token")) {
439 next = strtok_r(NULL," \n\r",&sctx);
440 if (next) {
441 DPRINTF("got CF token %s\n",next);
442 json_object_set_new(server,"token",
443 json_string(next));
447 return size * nmemb;
450 static const char *
451 get_cloudfiles_token (json_t *server, const char *host, unsigned int port,
452 const char * user, const char * key)
454 CURL *curl;
455 char addr[ADDR_SIZE];
456 char auth_user[HEADER_SIZE];
457 char auth_key[HEADER_SIZE];
458 json_t *token_obj;
459 struct curl_slist *slist;
460 int chars;
462 token_obj = json_object_get(server,"token");
463 if (token_obj) {
464 return json_string_value(token_obj);
467 chars = snprintf(addr,ADDR_SIZE,"https://%s:%u/v1.0",host,port);
468 if (chars >= ADDR_SIZE) {
469 error(0,0,"API URL too long in %s",__func__);
470 return NULL;
473 chars = snprintf(auth_user,HEADER_SIZE,"X-Auth-User: %s",user);
474 if (chars >= HEADER_SIZE) {
475 error(0,0,"auth_user too long in %s",__func__);
476 return NULL;
479 chars = snprintf(auth_key,HEADER_SIZE,"X-Auth-Key: %s",key);
480 if (chars >= HEADER_SIZE) {
481 error(0,0,"auth_key too long in %s",__func__);
482 return NULL;
485 curl = curl_easy_init();
486 curl_easy_setopt(curl,CURLOPT_URL,addr);
487 curl_easy_setopt(curl,CURLOPT_WRITEFUNCTION,cf_writer);
488 curl_easy_setopt(curl,CURLOPT_HEADERFUNCTION,cf_header);
489 curl_easy_setopt(curl,CURLOPT_WRITEHEADER,server);
490 slist = curl_slist_append(NULL,auth_user);
491 slist = curl_slist_append(slist,auth_key);
492 curl_easy_setopt(curl,CURLOPT_HTTPHEADER,slist);
493 curl_easy_perform(curl);
494 curl_easy_cleanup(curl);
495 curl_slist_free_all(slist);
497 token_obj = json_object_get(server,"token");
498 return token_obj ? json_string_value(token_obj) : NULL;
501 static void *
502 proxy_repl_cons (void *ctx)
504 repl_item *item = ctx;
505 FILE *fp = fdopen(item->pipes[0],"r");
506 char addr[ADDR_SIZE];
507 CURL *curl;
508 json_t *server;
509 char svc_acc[SVC_ACC_SIZE];
510 char auth_hdr[HEADER_SIZE];
511 struct hstor_client *hstor;
512 char *bucket;
513 char *key;
514 char *stctx;
515 const char *s_host;
516 unsigned int s_port;
517 const char *s_key;
518 const char *s_secret;
519 const char *s_type;
520 const char *s_name;
521 struct curl_slist *slist;
522 char *myurl;
523 int chars;
525 if (fp == NULL) {
526 error(0, errno, "%s: fdopen failed", __func__);
527 return THREAD_FAILED;
530 server = json_array_get(config,item->server);
531 s_host = json_string_value(json_object_get(server,"host"));
532 s_port = json_integer_value(json_object_get(server,"port"));
533 s_key = json_string_value(json_object_get(server,"key"));
534 s_secret = json_string_value(json_object_get(server,"secret"));
535 s_type = json_string_value(json_object_get(server,"type"));
536 s_name = json_string_value(json_object_get(server,"name"));
538 myurl = strdup(item->path);
539 assert (myurl);
540 bucket = strtok_r(myurl,"/",&stctx);
541 key = strtok_r(NULL,"/",&stctx);
543 if (!strcasecmp(s_type,"s3")) {
544 DPRINTF("replicating %zu to %s/%s (S3)\n",item->size,s_host,
545 item->path);
546 chars = snprintf(svc_acc,SVC_ACC_SIZE,"%s:%u",s_host,s_port);
547 if (chars >= SVC_ACC_SIZE) {
548 error(0,0,"svc_acc too long in %s",__func__);
549 return THREAD_FAILED;
551 hstor = hstor_new(svc_acc,s_host,s_key,s_secret);
552 /* Blech. Can't conflict with producer, though. */
553 hstor_put(hstor,bucket,key,
554 junk_reader,item->size,fp,NULL);
555 hstor_free(hstor);
557 else {
558 const char *token_str = NULL;
559 if (!strcasecmp(s_type,"cf")) {
560 token_str = get_cloudfiles_token(server,s_host,s_port,
561 s_key, s_secret);
562 if (!token_str) {
563 DPRINTF("could not get CF token\n");
564 return THREAD_FAILED;
566 /* Re-fetch as this might have changed. */
567 s_host = json_string_value(json_object_get(server,
568 "host"));
569 chars = snprintf(addr,ADDR_SIZE,"%s/%s",
570 s_host,item->path);
571 if (chars >= ADDR_SIZE) {
572 error(0,0,"CF path too long in %s",__func__);
573 return THREAD_FAILED;
575 DPRINTF("replicating %zu to %s (CF)\n",item->size,
576 addr);
578 else {
579 chars = snprintf(addr,ADDR_SIZE,"http://%s:%u/%s",
580 s_host,s_port,item->path);
581 if (chars >= ADDR_SIZE) {
582 error(0,0,"HTTP path too long in %s",
583 __func__);
584 return THREAD_FAILED;
586 DPRINTF("replicating %zu to %s (repod)\n",item->size,
587 addr);
589 curl = curl_easy_init();
590 curl_easy_setopt(curl,CURLOPT_URL,addr);
591 curl_easy_setopt(curl,CURLOPT_UPLOAD,1);
592 curl_easy_setopt(curl,CURLOPT_INFILESIZE_LARGE,
593 (curl_off_t)item->size);
594 curl_easy_setopt(curl,CURLOPT_READFUNCTION,junk_reader);
595 if (!strcasecmp(s_type,"cf")) {
596 chars = snprintf(auth_hdr,HEADER_SIZE,
597 "X-Auth-Token: %s",token_str);
598 if (chars >= HEADER_SIZE) {
599 error(0,0,"auth_token too long in %s",
600 __func__);
601 return THREAD_FAILED;
603 slist = curl_slist_append(NULL,auth_hdr);
605 * Rackspace doesn't clearly document that you'll get
606 * 412 (Precondition Failed) if you omit this.
608 slist = curl_slist_append(slist,
609 "Content-Type: binary/octet-stream");
611 else {
612 slist = curl_slist_append(NULL,"X-redhat-role: master");
614 curl_easy_setopt(curl,CURLOPT_HTTPHEADER,slist);
615 curl_easy_setopt(curl,CURLOPT_READDATA,fp);
616 DPRINTF("%s calling curl_easy_perform\n",__func__);
617 curl_easy_perform(curl);
618 curl_easy_cleanup(curl);
619 curl_slist_free_all(slist);
622 DPRINTF("%s returning\n",__func__);
623 fclose(fp);
624 meta_got_copy(bucket,key,s_name);
625 free(myurl);
626 return NULL;
629 static void
630 repl_worker_del (const repl_item *item)
632 json_t *server;
633 const char *s_host;
634 unsigned int s_port;
635 const char *s_key;
636 const char *s_secret;
637 const char *s_type;
638 char svc_acc[SVC_ACC_SIZE];
639 struct hstor_client *hstor;
640 char addr[ADDR_SIZE];
641 CURL *curl;
642 char *bucket;
643 char *key;
644 char *stctx;
645 int chars;
647 server = json_array_get(config,item->server);
648 s_host = json_string_value(json_object_get(server,"host"));
649 s_port = json_integer_value(json_object_get(server,"port"));
650 s_key = json_string_value(json_object_get(server,"key"));
651 s_secret = json_string_value(json_object_get(server,"secret"));
652 s_type = json_string_value(json_object_get(server,"type"));
654 if (!strcasecmp(s_type,"s3")) {
655 DPRINTF("%s replicating delete of %s on %s:%u (S3)\n",__func__,
656 item->path, s_host, s_port);
657 chars = snprintf(svc_acc,SVC_ACC_SIZE,"%s:%u",s_host,s_port);
658 if (chars >= SVC_ACC_SIZE) {
659 error(0,0,"svc_acc too long in %s",__func__);
660 return;
662 /* TBD: check return */
663 hstor = hstor_new(svc_acc,s_host,s_key,s_secret);
664 assert (item->path);
665 bucket = strtok_r(item->path,"/",&stctx);
666 key = strtok_r(NULL,"/",&stctx);
667 (void)hstor_del(hstor,bucket,key);
668 hstor_free(hstor);
670 else {
671 DPRINTF("%s replicating delete of %s on %s:%u (HTTP)\n",
672 __func__, item->path, s_host, s_port);
673 chars = snprintf(addr,ADDR_SIZE,"http://%s:%d%s",
674 s_host,s_port,item->path);
675 if (chars >= ADDR_SIZE) {
676 error(0,0,"path too long in %s",__func__);
677 return;
679 curl = curl_easy_init();
680 curl_easy_setopt(curl,CURLOPT_URL,addr);
681 curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE");
682 curl_easy_perform(curl);
683 curl_easy_cleanup(curl);
686 DPRINTF("%s returning\n",__func__);
689 static void
690 repl_worker_bcreate (repl_item *item)
692 json_t *server;
693 const char *s_host;
694 unsigned int s_port;
695 const char *s_key;
696 const char *s_secret;
697 const char *s_type;
698 char svc_acc[SVC_ACC_SIZE];
699 struct hstor_client *hstor;
700 char addr[ADDR_SIZE];
701 CURL *curl;
702 int chars;
704 server = json_array_get(config,item->server);
705 s_host = json_string_value(json_object_get(server,"host"));
706 s_port = json_integer_value(json_object_get(server,"port"));
707 s_key = json_string_value(json_object_get(server,"key"));
708 s_secret = json_string_value(json_object_get(server,"secret"));
709 s_type = json_string_value(json_object_get(server,"type"));
711 if (!strcasecmp(s_type,"s3")) {
712 DPRINTF("%s replicating create of bucket %s on %s:%u (S3)\n",
713 __func__, item->path, s_host, s_port);
714 chars = snprintf(svc_acc,SVC_ACC_SIZE,"%s:%u",s_host,s_port);
715 if (chars >= SVC_ACC_SIZE) {
716 error(0,0,"svc_acc too long in %s",__func__);
717 return;
719 /* TBD: check return */
720 hstor = hstor_new(svc_acc,s_host,s_key,s_secret);
721 assert (item->path);
722 if (!hstor_add_bucket(hstor,item->path)) {
723 error(0,0,"bucket create failed for %s",
724 item->path);
726 hstor_free(hstor);
728 else {
729 DPRINTF("%s replicating create of bucket %s on %s:%u (HTTP)\n",
730 __func__, item->path, s_host, s_port);
731 chars = snprintf(addr,ADDR_SIZE,"http://%s:%d/%s",
732 s_host,s_port,item->path);
733 if (chars >= ADDR_SIZE) {
734 error(0,0,"path too long in %s",__func__);
735 return;
737 curl = curl_easy_init();
738 curl_easy_setopt(curl,CURLOPT_URL,addr);
739 curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"PUT");
740 curl_easy_perform(curl);
741 curl_easy_cleanup(curl);
744 DPRINTF("%s returning\n",__func__);
747 /* Use this to diagnose failed thread creation. */
748 #define xpthread_create(thread, start_routine, msg) \
749 do { \
750 int err = pthread_create (thread, NULL, start_routine, NULL); \
751 if (err) { \
752 error (0, err, msg); \
753 return NULL; \
755 } while (0)
757 static void *
758 repl_worker (void *notused ATTRIBUTE_UNUSED)
760 repl_item *item;
761 pthread_t cons;
762 pthread_t prod;
764 for (;;) {
765 sem_wait(&queue_sema);
766 pthread_mutex_lock(&queue_lock);
767 item = queue_head;
768 queue_head = item->next;
769 if (!queue_head) {
770 queue_tail = NULL;
772 pthread_mutex_unlock(&queue_lock);
774 switch (item->type) {
775 case REPL_PUT:
776 if (pipe(item->pipes) >= 0) {
777 xpthread_create(&prod, (proxy_host
778 ? proxy_repl_prod
779 : proxy_repl_prod_fs),
780 "failed to start producer thread");
781 xpthread_create(&cons,proxy_repl_cons,
782 "failed to start consumer thread");
783 pthread_join(prod,NULL);
784 pthread_join(cons,NULL);
786 else {
787 error(0,errno,"pipe");
789 break;
790 case REPL_ODELETE:
791 repl_worker_del(item);
792 break;
793 case REPL_BCREATE:
794 repl_worker_bcreate(item);
795 break;
796 default:
797 error(0,0,"bad repl type %d (url=%s) skipped",
798 item->type, item->path);
800 free(item->path);
801 free(item);
805 void
806 repl_init (void)
808 pthread_t tid;
810 sem_init(&queue_sema,0,0);
811 pthread_mutex_init(&queue_lock,NULL);
812 pthread_create(&tid,NULL,repl_worker,NULL);
815 static char *
816 repl_oget (void *ctx, const char *id)
818 query_ctx_t *qctx = ctx;
819 char *cur_value = NULL;
821 (void)meta_get_value(qctx->cur_bucket,qctx->cur_key,id,&cur_value);
823 return cur_value;
826 static char *
827 repl_sget (void *ctx, const char *id)
829 query_ctx_t *qctx = ctx;
830 json_t *elem = json_object_get(qctx->cur_server,id);
832 return elem ? (char *)json_string_value(elem) : NULL;
835 void
836 replicate (const char *url, size_t size, const char *policy)
838 unsigned int i;
839 repl_item *item;
840 value_t *expr;
841 int res;
842 char *url2;
843 char *stctx;
844 query_ctx_t qctx;
845 getter_t oget;
846 getter_t sget;
848 url2 = strdup(url);
849 if (!url2) {
850 error(0,0,"could not parse url %s",url);
851 return;
853 qctx.cur_bucket = strtok_r(url2,"/",&stctx);
854 qctx.cur_key = strtok_r(NULL,"/",&stctx);
856 if (!size) {
857 size = meta_get_size(qctx.cur_bucket,qctx.cur_key);
858 DPRINTF("fetched size %zu for %s\n",size,url);
861 if (policy) {
862 DPRINTF("--- policy = %s\n",policy);
863 expr = parse(policy);
865 else {
866 expr = NULL;
869 oget.func = repl_oget;
870 oget.ctx = &qctx;
871 sget.func = repl_sget;
872 sget.ctx = &qctx;
874 for (i = 1; i < json_array_size(config); ++i) {
875 if (expr) {
876 qctx.cur_server = json_array_get(config,i);
877 res = eval(expr,&oget,&sget);
879 else {
880 res = 0;
882 if (res <= 0) {
883 DPRINTF("skipping %u for %s\n",i,url);
884 continue;
886 DPRINTF("REPLICATING %s to %u\n",url,i);
887 item = malloc(sizeof(*item));
888 if (!item) {
889 error(0,errno,"could not create repl_item for %s",
890 url);
891 break;
893 item->type = REPL_PUT;
894 item->path = strdup(url);
895 if (!item->path) {
896 error(0,errno,"could not create repl_item for %s",
897 url);
898 break;
900 item->server = i;
901 item->size = size;
902 pthread_mutex_lock(&queue_lock);
903 if (queue_tail) {
904 item->next = queue_tail->next;
905 queue_tail->next = item;
907 else {
908 item->next = NULL;
909 queue_head = item;
911 queue_tail = item;
912 pthread_mutex_unlock(&queue_lock);
913 sem_post(&queue_sema);
916 if (expr) {
917 free_value(expr);
919 free(url2);
922 static void
923 replicate_namespace_action (const char *name, repl_t action)
925 unsigned int i;
926 repl_item *item;
928 for (i = 1; i < json_array_size(config); ++i) {
929 DPRINTF("replicating delete(%s) on %u\n",name,i);
930 item = malloc(sizeof(*item));
931 if (!item) {
932 error(0,errno,"could not create repl_item for %s",
933 name);
934 return;
936 item->type = action;
937 item->path = strdup(name);
938 if (!item->path) {
939 free(item);
940 return;
942 item->server = i;
943 pthread_mutex_lock(&queue_lock);
944 if (queue_tail) {
945 item->next = queue_tail->next;
946 queue_tail->next = item;
948 else {
949 item->next = NULL;
950 queue_head = item;
952 queue_tail = item;
953 pthread_mutex_unlock(&queue_lock);
954 sem_post(&queue_sema);
958 void
959 replicate_delete (const char *name)
961 replicate_namespace_action(name,REPL_ODELETE);
964 void
965 replicate_bcreate (const char *name)
967 replicate_namespace_action(name,REPL_BCREATE);
971 get_provider (int i, provider_t *out)
973 json_t *server;
975 server = json_array_get(config,i);
976 if (!server) {
977 DPRINTF("no such entry %d\n",i);
978 return 0;
981 out->index = i;
982 out->name = json_string_value(json_object_get(server,"name"));
983 out->type = json_string_value(json_object_get(server,"type"));
984 out->host = json_string_value(json_object_get(server,"host"));
985 out->port = json_integer_value(json_object_get(server,"port"));
986 /* TBD: change key/secret field names to username/password */
987 out->username = json_string_value(json_object_get(server,"key"));
988 out->password = json_string_value(json_object_get(server,"secret"));
989 out->path = json_string_value(json_object_get(server,"path"));
991 /* Use empty strings instead of NULL. */
992 if (!out->username) out->username = "";
993 if (!out->password) out->password = "";
995 /* TBD: do this a cleaner way. */
996 if (!strcasecmp(out->type,"s3")) {
997 out->func_tbl = &s3_func_tbl;
999 else if (!strcasecmp(out->type,"http")) {
1000 out->func_tbl = &curl_func_tbl;
1002 else if (!strcasecmp(out->type,"fs")) {
1003 out->func_tbl = &fs_func_tbl;
1005 else {
1006 out->func_tbl = &bad_func_tbl;
1009 return 1;
1012 char *
1013 get_provider_value (int i, const char *fname)
1015 json_t *server;
1016 json_t *field;
1018 server = json_array_get(config,i);
1019 if (!server) {
1020 DPRINTF("no such entry %d\n",i);
1021 return NULL;
1024 field = json_object_get(server,fname);
1025 if (!field) {
1026 DPRINTF("no such field %s on server %d\n",fname,i);
1027 return NULL;
1030 return (char *)json_string_value(field);
1033 void
1034 update_provider (const char *provider, const char *username, const char *password)
1036 int i;
1037 json_t *server;
1038 const char *s_name;
1040 DPRINTF("updating %s username=%s password=%s\n",
1041 provider, username, password);
1043 for (i = 0; ; ++i) {
1044 server = json_array_get(config,i);
1045 if (!server) {
1046 DPRINTF(" could not find provider %s\n",provider);
1047 break;
1049 s_name = json_string_value(json_object_get(server,"name"));
1050 DPRINTF(" checking %s\n",s_name);
1051 if (s_name && !strcmp(s_name,provider)) {
1052 json_object_set_new(server,"key",json_string(username));
1053 json_object_set_new(server,"secret",
1054 json_string(password));
1055 break;
/*
 * Part of our API to the query module.  Links are not followed here:
 * both arguments are ignored and a fixed placeholder name is returned.
 */
char *
follow_link (char *object, char *key)
{
	static char	*const placeholder = "no_such_object";

	(void)object;
	(void)key;

	return placeholder;
}