qlexer.l: rewrite not to need static var, at_eof.
[iwhd.git] / backend.c
bloba2972d1898949b8f638f571ebfe506e82d3bb99e
1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include <fcntl.h>
19 #include <getopt.h>
20 #include <poll.h>
21 #include <pthread.h>
22 #include <regex.h>
23 #include <semaphore.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <strings.h>
29 #include <unistd.h>
30 #include <sys/stat.h>
31 #include <sys/wait.h>
32 #include <assert.h>
33 #include <errno.h>
34 #include <error.h>
36 #include <microhttpd.h>
37 #include <curl/curl.h>
38 #include <hstor.h>
39 #include <glib.h>
41 #define GLOBALS_IMPL
42 #include "iwh.h"
43 #include "meta.h"
44 #include "setup.h"
45 #include "template.h"
46 #include "mpipe.h"
47 #include "backend.h"
48 #include "state_defs.h"
50 struct hstor_client *hstor;
52 /***** Generic module stuff, not specific to one back end *****/
54 /* Sizes for internal string buffers for CF code. */
55 #define ADDR_SIZE 1024
56 #define SVC_ACC_SIZE 128
57 #define HEADER_SIZE 64
59 #define S3_IMAGE_PATTERN "^IMAGE[[:blank:]]+([^[:space:]]+)"
60 #define S3_ERROR_PATTERN "^ERROR[[:blank:]]+([^[:space:]]+)"
62 regex_t s3_success_pat;
63 regex_t s3_failure_pat;
64 int regex_ok = FALSE;
66 void
67 backend_init (void)
69 regex_ok = TRUE;
71 if (regcomp(&s3_success_pat,S3_IMAGE_PATTERN,REG_EXTENDED) != 0){
72 DPRINTF("could not compile S3 success pattern\n");
73 regex_ok = FALSE;
76 if (regcomp(&s3_failure_pat,S3_ERROR_PATTERN,REG_EXTENDED) != 0){
77 DPRINTF("could not compile S3 failure pattern\n");
78 regex_ok = FALSE;
83 /***** Stub functions for unimplemented stuff. *****/
85 static void
86 bad_init (provider_t *prov)
88 (void)prov;
90 DPRINTF("*** bad call to %s\n",__func__);
93 static void *
94 bad_get_child (void * ctx)
96 backend_thunk_t *tp = (backend_thunk_t *)ctx;
97 my_state *ms = tp->parent;
99 DPRINTF("*** bad call to %s\n",__func__);
100 pipe_prod_siginit(&ms->pipe,-1);
101 return NULL;
104 static void *
105 bad_put_child (void * ctx)
107 pipe_private *pp = ctx;
108 pipe_shared *ps = pp->shared;
110 DPRINTF("*** bad call to %s\n",__func__);
111 pipe_cons_siginit(ps, -1);
112 free(pp);
113 return THREAD_FAILED;
/* Placeholder cache thread: logs the call and does nothing else. */
static void *
bad_cache_child (void * ctx)
{
	DPRINTF("*** bad call to %s\n",__func__);

	(void)ctx;	/* intentionally unused */
	return NULL;
}
125 static int
126 bad_delete (const provider_t *prov, const char *bucket, const char *key,
127 const char *url)
129 (void)prov;
130 (void)bucket;
131 (void)key;
132 (void)url;
134 DPRINTF("*** bad call to %s\n",__func__);
135 return MHD_HTTP_BAD_REQUEST;
138 static int
139 bad_bcreate (const provider_t *prov, const char *bucket)
141 (void)prov;
142 (void)bucket;
144 DPRINTF("*** bad call to %s\n",__func__);
145 return MHD_HTTP_NOT_IMPLEMENTED;
148 static int
149 bad_register (my_state *ms, const provider_t *prov, const char *next,
150 GHashTable *args)
152 (void)ms;
153 (void)prov;
154 (void)next;
155 (void)args;
157 DPRINTF("*** bad call to %s\n",__func__);
158 return MHD_HTTP_NOT_IMPLEMENTED;
161 /***** Generic functions shared by the HTTP back ends. */
163 /* Invoked from S3/CURL/CF. */
164 static size_t
165 http_get_prod (void *ptr, size_t size, size_t nmemb, void *stream)
167 size_t total = size * nmemb;
168 pipe_shared *ps = stream;
170 DPRINTF("producer posting %zu bytes as %ld\n",total,ps->sequence+1);
171 pipe_prod_signal(ps,ptr,total);
173 DPRINTF("producer finished chunk\n");
174 return total;
177 /* Invoked from S3/CURL/CF. */
178 static size_t
179 http_put_cons (void *ptr, size_t size, size_t nmemb, void *stream)
181 size_t total = size * nmemb;
182 pipe_private *pp = stream;
183 pipe_shared *ps = pp->shared;
184 size_t done;
186 DPRINTF("consumer asked to read %zu\n",total);
188 if (!pipe_cons_wait(pp)) {
189 return 0;
192 DPRINTF("consumer offset %zu into %zu\n",
193 pp->offset, ps->data_len);
194 done = ps->data_len - pp->offset;
195 if (done > total) {
196 done = total;
198 memcpy(ptr,ps->data_ptr+pp->offset,done);
199 pp->offset += done;
200 DPRINTF("consumer copied %zu, new offset %zu\n",
201 done, pp->offset);
202 if (pp->offset == ps->data_len) {
203 DPRINTF("consumer finished chunk\n");
204 pipe_cons_signal(pp, 0);
207 return done;
210 /***** S3-specific functions *****/
212 static void
213 s3_init (provider_t *prov)
215 char svc_acc[128];
217 snprintf(svc_acc,sizeof(svc_acc),"%s:%u",prov->host,prov->port);
218 hstor = hstor_new(svc_acc,prov->host,prov->username,prov->password);
219 if (hstor) {
220 if (verbose) {
221 hstor->verbose = 1;
224 else {
225 DPRINTF("could not create S3 client\n");
229 /* Start an S3 _producer_. */
230 static void *
231 s3_get_child (void * ctx)
233 backend_thunk_t *tp = (backend_thunk_t *)ctx;
234 my_state *ms = tp->parent;
236 /* TBD: check existence before calling siginit */
237 pipe_prod_siginit(&ms->pipe,0);
239 hstor_get(hstor,ms->bucket,ms->key,http_get_prod,&ms->pipe,0);
240 /* TBD: check return value */
242 pipe_prod_finish(&ms->pipe);
244 DPRINTF("producer exiting\n");
245 return NULL;
248 /* Start an S3 _consumer_. */
249 static void *
250 s3_put_child (void * ctx)
252 pipe_private *pp = ctx;
253 pipe_shared *ps = pp->shared;
254 my_state *ms = ps->owner;
255 curl_off_t llen;
256 const char *clen;
257 bool rcb;
259 llen = (curl_off_t)MHD_SIZE_UNKNOWN;
260 if (ms->be_flags & BACKEND_GET_SIZE) {
261 clen = MHD_lookup_connection_value(
262 ms->conn, MHD_HEADER_KIND, "Content-Length");
263 if (clen) {
264 llen = strtoll(clen,NULL,10);
266 else {
267 error (0, 0, "missing Content-Length");
271 pipe_cons_siginit(ps, 0);
272 rcb = hstor_put(hstor,ms->bucket,ms->key,http_put_cons,llen,pp,NULL);
273 if (!rcb) {
274 DPRINTF("%s returning with error\n",__func__);
275 pipe_cons_siginit(ps, -1);
276 free(pp);
277 return THREAD_FAILED;
280 DPRINTF("%s returning\n",__func__);
281 free(pp);
282 return NULL;
285 static int
286 s3_delete (const provider_t *prov, const char *bucket, const char *key,
287 const char *url)
289 (void)prov;
290 (void)url;
292 hstor_del(hstor,bucket,key);
293 /* TBD: check return value */
295 return MHD_HTTP_OK;
298 static int
299 s3_bcreate (const provider_t *prov, const char *bucket)
301 (void)prov;
303 DPRINTF("creating bucket %s\n",bucket);
305 if (!hstor_add_bucket(hstor,bucket)) {
306 DPRINTF(" bucket create failed\n");
307 return MHD_HTTP_INTERNAL_SERVER_ERROR;
310 return MHD_HTTP_OK;
/*
 * Write value into a freshly created temporary file (/tmp/iwtmp.XXXXXX).
 *
 * Returns the malloc'ed path of the file on success; the caller is
 * responsible for unlinking the file and freeing the path.  Returns NULL
 * on any failure, in which case no file is left behind.
 *
 * The descriptor is closed on every path, including when value is empty.
 */
static const char *
s3_init_tmpfile (char *value)
{
	char	*path;
	int	 fd;
	size_t	 len;
	ssize_t	 written = 0;
	int	 write_errno = 0;

	path = strdup("/tmp/iwtmp.XXXXXX");
	if (!path) {
		return NULL;
	}

	fd = mkstemp(path);
	if (fd < 0) {
		error (0, errno, "%s: failed to create file from template", path);
		free(path);
		return NULL;
	}

	len = strlen(value);
	if (len > 0) {
		written = write(fd,value,len);
		/* Capture errno before close() has a chance to change it. */
		write_errno = errno;
	}
	close(fd);

	if (written != (ssize_t)len) {
		if (written < 0) {
			error (0, write_errno, "failed to write to %s", path);
		}
		else {
			/* Short write: errno is not meaningful here. */
			error (0, 0,
			       "invalid write length %zd in %s",
			       written, __func__);
		}
		unlink(path);
		free(path);
		return NULL;
	}

	return path;
}
355 static int
356 s3_register (my_state *ms, const provider_t *prov, const char *next,
357 GHashTable *args)
359 char *kernel = g_hash_table_lookup(args,"kernel");
360 char *ramdisk = g_hash_table_lookup(args,"ramdisk");
361 char *api_key;
362 char *api_secret;
363 const char *ami_cert;
364 const char *ami_key;
365 const char *ami_uid;
366 const char *argv[12];
367 int argc = 0;
368 pid_t pid;
369 int organ[2];
370 FILE *fp;
371 char buf[1024];
372 char *cval = NULL;
373 char *kval = NULL;
374 int rc = MHD_HTTP_BAD_REQUEST;
375 char *ami_bkt;
376 char ami_id_buf[64];
377 regmatch_t match[2];
379 if (!regex_ok) {
380 return MHD_HTTP_BAD_REQUEST;
383 if (next) {
384 DPRINTF("S3 register with next!=NULL\n");
385 goto cleanup;
388 DPRINTF("*** register %s/%s via %s (%s:%d)\n",
389 ms->bucket, ms->key, prov->name, prov->host, prov->port);
390 if (kernel) {
391 DPRINTF(" (using kernel %s)\n",kernel);
393 if (ramdisk) {
394 DPRINTF(" (using ramdisk %s)\n",ramdisk);
397 api_key = g_hash_table_lookup(args,"api-key");
398 if (!api_key) {
399 api_key = (char *)prov->username;
400 if (!api_key) {
401 error (0, 0, "missing EC2 API key");
402 goto cleanup;
406 api_secret = g_hash_table_lookup(args,"api-secret");
407 if (!api_secret) {
408 api_secret = (char *)prov->password;
409 if (!prov->password) {
410 error (0, 0, "missing EC2 API key");
411 goto cleanup;
415 cval = g_hash_table_lookup(args,"ami-cert");
416 if (cval) {
417 ami_cert = s3_init_tmpfile(cval);
418 if (!ami_cert) {
419 goto cleanup;
422 else {
423 ami_cert = get_provider_value(prov,"ami-cert");
424 if (!ami_cert) {
425 error (0, 0, "missing EC2 AMI cert");
426 goto cleanup;
430 kval = g_hash_table_lookup(args,"ami-key");
431 if (kval) {
432 ami_key = s3_init_tmpfile(kval);
433 if (!ami_cert) {
434 goto cleanup;
437 else {
438 ami_key = get_provider_value(prov,"ami-key");
439 if (!ami_key) {
440 error (0, 0, "missing EC2 AMI key");
441 goto cleanup;
445 ami_uid = g_hash_table_lookup(args,"ami-uid");
446 if (!ami_uid) {
447 ami_uid = get_provider_value(prov,"ami-uid");
448 if (!ami_uid) {
449 error (0, 0, "missing EC2 AMI uid");
450 goto cleanup;
454 ami_bkt = g_hash_table_lookup(args,"ami-bkt");
455 if (!ami_bkt) {
456 ami_bkt = ms->bucket;
460 * This is the point where we go from validation to execution. If we
461 * were double-forking so this could all be asynchronous, or for that
462 * matter to return an early 100-continue, this would probably be the
463 * place to do it. Even without that, we set the ami-id here so that
464 * the caller can know things are actually in progress.
466 sprintf(ami_id_buf,"pending %lld",(long long)time(NULL));
467 DPRINTF("temporary ami-id = \"%s\"\n",ami_id_buf);
468 (void)meta_set_value(ms->bucket,ms->key,"ami-id",ami_id_buf);
469 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
471 const char *cmd = "dc-register-image";
472 argv[argc++] = cmd;
473 argv[argc++] = ms->bucket;
474 argv[argc++] = ms->key;
475 argv[argc++] = api_key;
476 argv[argc++] = api_secret;
477 argv[argc++] = ami_cert;
478 argv[argc++] = ami_key;
479 argv[argc++] = ami_uid;
480 argv[argc++] = ami_bkt;
481 argv[argc++] = kernel ? kernel : "_default_";
482 argv[argc++] = ramdisk ? ramdisk : "_default_";
483 argv[argc] = NULL;
485 DPRINTF("api-key = %s\n",api_key);
486 DPRINTF("api-secret = %s\n",api_secret);
487 DPRINTF("ami-cert = %s\n",ami_cert);
488 DPRINTF("ami-key = %s\n",ami_key);
489 DPRINTF("ami-uid = %s\n",ami_uid);
490 DPRINTF("ami-bkt = %s\n",ami_bkt);
492 if (pipe(organ) < 0) {
493 error (0, errno, "pipe creation failed");
494 goto cleanup;
497 pid = fork();
498 if (pid < 0) {
499 error (0, errno, "fork failed");
500 close(organ[0]);
501 close(organ[1]);
502 goto cleanup;
505 if (pid == 0) {
506 (void)dup2(organ[1],STDOUT_FILENO);
507 (void)dup2(organ[1],STDERR_FILENO);
508 execvp(cmd, (char* const*)argv);
509 error (EXIT_FAILURE, errno, "failed run command %s", cmd);
512 DPRINTF("waiting for child...\n");
513 if (waitpid(pid,NULL,0) < 0) {
514 error (0, errno, "waitpid failed");
516 /* TBD: check identity/status from waitpid */
517 DPRINTF("...child exited\n");
519 close(organ[1]);
520 fp = fdopen(organ[0],"r");
521 if (!fp) {
522 DPRINTF("could not open parent pipe\n");
523 close(organ[0]);
524 goto cleanup;
526 while (fgets(buf,sizeof(buf)-1,fp)) {
527 buf[sizeof(buf)-1] = '\0';
528 if (regexec(&s3_success_pat,buf,2,match,0) == 0) {
529 buf[match[1].rm_eo] = '\0';
530 DPRINTF("found AMI ID: %s\n",buf+match[1].rm_so);
531 sprintf(ami_id_buf,"OK %.60s",buf+match[1].rm_so);
532 rc = MHD_HTTP_OK;
534 else if (regexec(&s3_failure_pat,buf,2,match,0) == 0) {
535 buf[match[1].rm_eo] = '\0';
536 DPRINTF("found error marker: %s\n",buf+match[1].rm_so);
537 sprintf(ami_id_buf,"failed %.56s",buf+match[1].rm_so);
538 rc = MHD_HTTP_INTERNAL_SERVER_ERROR;
540 else {
541 DPRINTF("ignoring line: <%s>\n",buf);
544 fclose(fp);
546 cleanup:
548 * This is a bit tricky. If we found the cert in the HTTP request and
549 * succeeded in creating a temp file, then this condition will succeed.
550 * If we failed to create the temp file, or never found a cert
551 * anywhere, there will be no ami_cert to clean up. If we got a cert
552 * from the config, then ami_cert will be set but we'll (correctly)
553 * skip cleanup because cval is null.
555 if (cval && ami_cert) {
556 unlink(ami_cert);
557 free((char *)ami_cert);
559 /* Same reasoning as above, with kval/ami_key. */
560 if (kval && ami_key) {
561 unlink(ami_key);
562 free((char *)ami_key);
564 (void)meta_set_value(ms->bucket,ms->key,"ami-id",ami_id_buf);
566 return rc;
569 /***** CURL-specific functions *****/
571 static void
572 curl_init (provider_t *prov)
574 (void)prov;
577 /* Start a CURL _producer_. */
578 static void *
579 curl_get_child (void * ctx)
581 char fixed[1024];
582 backend_thunk_t *tp = (backend_thunk_t *)ctx;
583 my_state *ms = tp->parent;
584 provider_t *prov = tp->prov;
585 CURL *curl;
587 curl = curl_easy_init();
588 if (!curl) {
589 pipe_prod_siginit(&ms->pipe,-1);
590 return NULL; /* TBD: flag error somehow */
592 if (ms->from_master) {
593 sprintf(fixed,"http://%s:%u%s",
594 master_host, master_port, ms->url);
596 else {
597 sprintf(fixed,"http://%s:%u%s",
598 prov->host, prov->port, ms->url);
600 curl_easy_setopt(curl,CURLOPT_URL,fixed);
601 curl_easy_setopt(curl,CURLOPT_WRITEFUNCTION, http_get_prod);
602 curl_easy_setopt(curl,CURLOPT_WRITEDATA,&ms->pipe);
603 pipe_prod_siginit(&ms->pipe,0);
605 curl_easy_perform(curl);
606 curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&ms->rc);
607 pipe_prod_finish(&ms->pipe);
609 DPRINTF("producer exiting\n");
610 curl_easy_cleanup(curl);
611 return NULL;
614 /* Start a CURL _consumer_. */
615 static void *
616 curl_put_child (void * ctx)
618 pipe_private *pp = ctx;
619 pipe_shared *ps = pp->shared;
620 my_state *ms = ps->owner;
621 provider_t *prov = pp->prov;
622 curl_off_t llen;
623 char fixed[1024];
624 CURL *curl;
625 const char *clen;
627 llen = (curl_off_t)MHD_SIZE_UNKNOWN;
628 if (ms->be_flags & BACKEND_GET_SIZE) {
629 clen = MHD_lookup_connection_value(
630 ms->conn, MHD_HEADER_KIND, "Content-Length");
631 if (clen) {
632 llen = strtoll(clen,NULL,10);
634 else {
635 error (0, 0, "missing Content-Length");
639 curl = curl_easy_init();
640 if (!curl) {
641 pipe_cons_siginit(ps, -1);
642 free(pp);
643 return THREAD_FAILED;
645 sprintf(fixed,"http://%s:%u%s",prov->host,prov->port,
646 ms->url);
647 curl_easy_setopt(curl,CURLOPT_URL,fixed);
648 curl_easy_setopt(curl,CURLOPT_UPLOAD,1);
649 curl_easy_setopt(curl,CURLOPT_INFILESIZE_LARGE,llen);
650 curl_easy_setopt(curl,CURLOPT_READFUNCTION,http_put_cons);
651 curl_easy_setopt(curl,CURLOPT_READDATA,pp);
652 pipe_cons_siginit(ps, 0);
653 curl_easy_perform(curl);
654 curl_easy_cleanup(curl);
656 DPRINTF("%s returning\n",__func__);
657 free(pp);
658 return NULL;
661 /* Start a CURL cache consumer. */
662 static void *
663 curl_cache_child (void * ctx)
665 pipe_private *pp = ctx;
666 pipe_shared *ps = pp->shared;
667 my_state *ms = ps->owner;
668 provider_t *prov = pp->prov;
669 char fixed[1024];
670 CURL *curl;
671 char *slash;
672 char *my_url = strdup(ms->url);
674 if (!my_url) {
675 return THREAD_FAILED;
678 curl = curl_easy_init();
679 if (!curl) {
680 free(my_url);
681 pipe_cons_siginit(ps,-1);
682 return THREAD_FAILED;
684 sprintf(fixed,"http://%s:%u%s",prov->host,prov->port,
685 ms->url);
686 curl_easy_setopt(curl,CURLOPT_URL,fixed);
687 curl_easy_setopt(curl,CURLOPT_UPLOAD,1);
688 curl_easy_setopt(curl,CURLOPT_INFILESIZE_LARGE,
689 (curl_off_t)MHD_SIZE_UNKNOWN);
690 curl_easy_setopt(curl,CURLOPT_READFUNCTION,http_put_cons);
691 curl_easy_setopt(curl,CURLOPT_READDATA,pp);
692 curl_easy_perform(curl);
693 curl_easy_cleanup(curl);
695 slash = index(my_url+1,'/');
696 if (slash) {
697 *slash = '\0';
698 meta_got_copy(my_url+1,slash+1,me);
701 free(my_url);
702 return NULL;
705 static int
706 curl_delete (const provider_t *prov, const char *bucket, const char *key,
707 const char *url)
709 CURL *curl;
710 char fixed[1024];
712 (void)bucket;
713 (void)key;
715 curl = curl_easy_init();
716 if (!curl) {
717 return MHD_HTTP_INTERNAL_SERVER_ERROR;
720 sprintf(fixed,"http://%s:%u%s",prov->host,prov->port,url);
721 curl_easy_setopt(curl,CURLOPT_URL,fixed);
722 curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE");
723 curl_easy_perform(curl);
724 curl_easy_cleanup(curl);
726 return MHD_HTTP_OK;
729 static int
730 curl_bcreate (const provider_t *prov, const char *bucket)
732 char addr[ADDR_SIZE];
733 int chars;
734 CURL *curl;
736 chars = snprintf(addr,ADDR_SIZE,"http://%s:%d/%s",
737 prov->host,prov->port,bucket);
738 if (chars >= ADDR_SIZE) {
739 error(0,0,"path too long in %s",__func__);
740 return MHD_HTTP_INTERNAL_SERVER_ERROR;
743 curl = curl_easy_init();
744 if (!curl) {
745 error(0,errno,"no memory in %s",__func__);
746 return MHD_HTTP_INTERNAL_SERVER_ERROR;
748 curl_easy_setopt(curl,CURLOPT_URL,addr);
749 curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"PUT");
750 curl_easy_perform(curl);
751 curl_easy_cleanup(curl);
752 return MHD_HTTP_OK;
756 * We can proxy through any number of CURL/HTTP warehouses, but the chain
757 * eventually has to terminate at an S3 back end.
760 static int
761 curl_register (my_state *ms, const provider_t *prov, const char *next,
762 GHashTable *args)
764 char fixed[1024];
765 CURL *curl;
766 struct curl_httppost *first = NULL;
767 struct curl_httppost *last = NULL;
768 char *kernel = g_hash_table_lookup(args,"kernel");
769 char *ramdisk = g_hash_table_lookup(args,"ramdisk");
771 if (!next) {
772 DPRINTF("CURL register with next==NULL\n");
773 return MHD_HTTP_BAD_REQUEST;
776 DPRINTF("*** PROXY registration request for %s/%s to %s (%s:%d)\n",
777 ms->bucket, ms->key, prov->name, prov->host, prov->port);
779 curl = curl_easy_init();
780 if (!curl) {
781 return MHD_HTTP_INTERNAL_SERVER_ERROR;
783 sprintf(fixed,"http://%s:%d/%s/%s",
784 prov->host,prov->port, ms->bucket, ms->key);
785 curl_easy_setopt(curl,CURLOPT_URL,fixed);
786 curl_formadd(&first,&last,
787 CURLFORM_COPYNAME, "op",
788 CURLFORM_COPYCONTENTS, "register",
789 CURLFORM_END);
790 curl_formadd(&first,&last,
791 CURLFORM_COPYNAME, "site",
792 CURLFORM_COPYCONTENTS, next,
793 CURLFORM_END);
794 if (kernel) {
795 curl_formadd(&first,&last,
796 CURLFORM_COPYNAME, "kernel",
797 CURLFORM_COPYCONTENTS, kernel,
798 CURLFORM_END);
800 if (ramdisk) {
801 curl_formadd(&first,&last,
802 CURLFORM_COPYNAME, "ramdisk",
803 CURLFORM_COPYCONTENTS, ramdisk,
804 CURLFORM_END);
806 curl_easy_setopt(curl,CURLOPT_HTTPPOST,first);
807 curl_easy_perform(curl);
808 curl_easy_cleanup(curl);
810 return MHD_HTTP_OK;
813 /***** CF-specific functions *****/
/*
 * TBD: refactor to maximize common code.  Despite the de-duplication between
 * this module and replica.c, there's still a lot more that could be done to
 * combine xxx_yyy_child for xxx={http,cf} and yyy={put,cache}.  A rough
 * outline might be:
 *
 *	if xxx=cf, call CF-specific routine to add CF auth header
 *	do common curl setup and execution
 *	if yyy=cache, call meta_got_copy
 *
 * There might even be an opportunity to combine code for put and bucket
 * create in some cases, since the only difference is the URL and the
 * lack of a data transfer in the bucket-create case.
 */
/*
 * Write callback that discards the data it is given and reports all
 * size*nmemb bytes as consumed.
 */
static size_t
cf_writer (void *ptr, size_t size, size_t nmemb, void *stream)
{
	size_t	consumed = size * nmemb;

	(void)ptr;
	(void)stream;

	return consumed;
}
837 static size_t
838 cf_header (void *ptr, size_t size, size_t nmemb, void *stream)
840 char *next;
841 char *sctx;
842 provider_t *prov = (provider_t *)stream;
844 next = strtok_r(ptr,":",&sctx);
845 if (next) {
846 if (!strcasecmp(next,"X-Storage-Url")) {
847 next = strtok_r(NULL," \n\r",&sctx);
848 if (next) {
849 DPRINTF("got CF URL %s\n",next);
850 /* NB: after this, original "host" is gone. */
851 free((char *)prov->host);
852 prov->host = strdup(next);
855 else if (!strcasecmp(next,"X-Storage-Token")) {
856 next = strtok_r(NULL," \n\r",&sctx);
857 if (next) {
858 DPRINTF("got CF token %s\n",next);
859 prov->token = strdup(next);
863 return size * nmemb;
866 static struct curl_slist *
867 cf_add_token (struct curl_slist *in_slist, const char *token)
869 int chars;
870 char auth_hdr[HEADER_SIZE];
871 struct curl_slist *out_slist;
873 if (!token) {
874 return in_slist;
877 chars = snprintf(auth_hdr,HEADER_SIZE,"X-Auth-Token: %s",token);
878 if (chars >= HEADER_SIZE) {
879 error(0,0,"auth_token too long");
880 return in_slist;
883 return curl_slist_append(NULL,auth_hdr);
886 static void
887 cf_init (provider_t *prov)
889 CURL *curl;
890 char addr[ADDR_SIZE];
891 char auth_user[HEADER_SIZE];
892 char auth_key[HEADER_SIZE];
893 char *token;
894 struct curl_slist *slist;
895 int chars;
897 if (prov->token) {
898 return;
901 chars = snprintf(addr,ADDR_SIZE,"https://%s:%u/v1.0",
902 prov->host, prov->port);
903 if (chars >= ADDR_SIZE) {
904 error(0,0,"API URL too long in %s",__func__);
905 return;
908 chars = snprintf(auth_user,HEADER_SIZE,"X-Auth-User: %s",
909 prov->username);
910 if (chars >= HEADER_SIZE) {
911 error(0,0,"auth_user too long in %s",__func__);
912 return;
915 chars = snprintf(auth_key,HEADER_SIZE,"X-Auth-Key: %s",
916 prov->password);
917 if (chars >= HEADER_SIZE) {
918 error(0,0,"auth_key too long in %s",__func__);
919 return;
922 curl = curl_easy_init();
923 curl_easy_setopt(curl,CURLOPT_URL,addr);
924 curl_easy_setopt(curl,CURLOPT_WRITEFUNCTION,cf_writer);
925 curl_easy_setopt(curl,CURLOPT_HEADERFUNCTION,cf_header);
926 curl_easy_setopt(curl,CURLOPT_WRITEHEADER,prov);
927 slist = curl_slist_append(NULL,auth_user);
928 slist = curl_slist_append(slist,auth_key);
929 curl_easy_setopt(curl,CURLOPT_HTTPHEADER,slist);
930 curl_easy_perform(curl);
931 curl_easy_cleanup(curl);
932 curl_slist_free_all(slist);
934 DPRINTF("CF token = %s\n",prov->token);
937 /* Start a CloudFiles _producer_. */
938 static void *
939 cf_get_child (void * ctx)
941 char fixed[1024];
942 backend_thunk_t *tp = (backend_thunk_t *)ctx;
943 my_state *ms = tp->parent;
944 provider_t *prov = tp->prov;
945 CURL *curl;
946 struct curl_slist *slist = NULL;
948 slist = cf_add_token(slist,prov->token);
949 if (!slist) {
950 return THREAD_FAILED;
953 * Rackspace doesn't clearly document that you'll get
954 * 412 (Precondition Failed) if you omit this.
956 slist = curl_slist_append(slist,
957 "Content-Type: binary/octet-stream");
959 curl = curl_easy_init();
960 if (!curl) {
961 pipe_prod_siginit(&ms->pipe,-1);
962 curl_slist_free_all(slist);
963 return NULL; /* TBD: flag error somehow */
965 sprintf(fixed,"%s%s", prov->host, ms->url);
966 curl_easy_setopt(curl,CURLOPT_URL,fixed);
967 curl_easy_setopt(curl,CURLOPT_WRITEFUNCTION, http_get_prod);
968 curl_easy_setopt(curl,CURLOPT_WRITEDATA,&ms->pipe);
969 curl_easy_setopt(curl,CURLOPT_HTTPHEADER,slist);
970 pipe_prod_siginit(&ms->pipe,0);
972 curl_easy_perform(curl);
973 curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&ms->rc);
974 pipe_prod_finish(&ms->pipe);
976 DPRINTF("producer exiting\n");
977 curl_easy_cleanup(curl);
978 curl_slist_free_all(slist);
979 return NULL;
982 /* Start a CloudFiles _consumer_. */
983 void *
984 cf_put_child (void * ctx)
986 pipe_private *pp = ctx;
987 pipe_shared *ps = pp->shared;
988 my_state *ms = ps->owner;
989 provider_t *prov = pp->prov;
990 curl_off_t llen;
991 char fixed[1024];
992 CURL *curl;
993 const char *clen;
994 struct curl_slist *slist = NULL;
996 slist = cf_add_token(slist,prov->token);
997 if (!slist) {
998 return THREAD_FAILED;
1001 llen = (curl_off_t)MHD_SIZE_UNKNOWN;
1002 if (ms->be_flags & BACKEND_GET_SIZE) {
1003 clen = MHD_lookup_connection_value(
1004 ms->conn, MHD_HEADER_KIND, "Content-Length");
1005 if (clen) {
1006 llen = strtoll(clen,NULL,10);
1008 else {
1009 error (0, 0, "missing Content-Length");
1013 curl = curl_easy_init();
1014 if (!curl) {
1015 pipe_cons_siginit(ps, -1);
1016 free(pp);
1017 curl_slist_free_all(slist);
1018 return THREAD_FAILED;
1020 sprintf(fixed,"%s%s",prov->host,ms->url);
1021 curl_easy_setopt(curl,CURLOPT_URL,fixed);
1022 curl_easy_setopt(curl,CURLOPT_UPLOAD,1);
1023 curl_easy_setopt(curl,CURLOPT_INFILESIZE_LARGE,llen);
1024 curl_easy_setopt(curl,CURLOPT_READFUNCTION,http_put_cons);
1025 curl_easy_setopt(curl,CURLOPT_READDATA,pp);
1026 curl_easy_setopt(curl,CURLOPT_HTTPHEADER,slist);
1027 pipe_cons_siginit(ps, 0);
1029 curl_easy_perform(curl);
1030 curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&ms->rc);
1032 DPRINTF("%s returning\n",__func__);
1033 curl_easy_cleanup(curl);
1034 curl_slist_free_all(slist);
1035 free(pp);
1036 return NULL;
1039 static int
1040 cf_delete (const provider_t *prov, const char *bucket, const char *key,
1041 const char *url)
1043 CURL *curl;
1044 char fixed[1024];
1045 int chars;
1046 char auth_hdr[HEADER_SIZE];
1047 long rc;
1048 struct curl_slist *slist = NULL;
1050 slist = cf_add_token(slist,prov->token);
1051 if (!slist) {
1052 return MHD_HTTP_INTERNAL_SERVER_ERROR;
1055 curl = curl_easy_init();
1056 if (!curl) {
1057 curl_slist_free_all(slist);
1058 return MHD_HTTP_INTERNAL_SERVER_ERROR;
1061 sprintf(fixed,"%s%s",prov->host,url);
1062 curl_easy_setopt(curl,CURLOPT_URL,fixed);
1063 curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE");
1064 curl_easy_setopt(curl,CURLOPT_HTTPHEADER,slist);
1066 curl_easy_perform(curl);
1067 curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&rc);
1068 DPRINTF("%s: rc = %ld\n",__func__,rc);
1070 curl_easy_cleanup(curl);
1071 curl_slist_free_all(slist);
1073 return MHD_HTTP_OK;
/*
 * Read callback that never supplies any data: it always returns 0,
 * whatever buffer and size it is offered.
 */
static size_t
cf_null_reader (void *ptr, size_t size, size_t nmemb, void *stream)
{
	(void)ptr;
	(void)size;
	(void)nmemb;
	(void)stream;

	return 0;
}
1082 static int
1083 cf_bcreate (const provider_t *prov, const char *bucket)
1085 char fixed[1024];
1086 CURL *curl;
1087 int chars;
1088 char auth_hdr[HEADER_SIZE];
1089 long rc;
1090 struct curl_slist *slist = NULL;
1092 slist = cf_add_token(slist,prov->token);
1093 if (!slist) {
1094 return MHD_HTTP_INTERNAL_SERVER_ERROR;
1097 curl = curl_easy_init();
1098 if (!curl) {
1099 curl_slist_free_all(slist);
1100 return MHD_HTTP_INTERNAL_SERVER_ERROR;
1102 sprintf(fixed,"%s/%s",prov->host,bucket);
1103 curl_easy_setopt(curl,CURLOPT_URL,fixed);
1104 curl_easy_setopt(curl,CURLOPT_UPLOAD,1);
1105 curl_easy_setopt(curl,CURLOPT_INFILESIZE_LARGE,
1106 (curl_off_t)MHD_SIZE_UNKNOWN);
1107 curl_easy_setopt(curl,CURLOPT_READFUNCTION,cf_null_reader);
1108 curl_easy_setopt(curl,CURLOPT_HTTPHEADER,slist);
1110 curl_easy_perform(curl);
1111 curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&rc);
1112 DPRINTF("%s: rc = %ld\n",__func__,rc);
1114 DPRINTF("%s returning\n",__func__);
1115 curl_easy_cleanup(curl);
1116 curl_slist_free_all(slist);
1117 return MHD_HTTP_OK;
1120 /***** FS-specific functions *****/
1122 static void
1123 fs_init (provider_t *prov)
1125 DPRINTF("changing directory to %s\n",prov->path);
1126 if (chdir(prov->path) < 0) {
1127 error(0,errno,"chdir failed, unsafe to continue");
1128 exit(!0); /* Value doesn't matter, as long as it's not zero. */
1132 /* Start an FS _producer_. */
1133 static void *
1134 fs_get_child (void * ctx)
1136 backend_thunk_t *tp = (backend_thunk_t *)ctx;
1137 my_state *ms = tp->parent;
1138 int fd;
1139 char buf[1<<16];
1140 ssize_t bytes;
1141 char *file = ms->url+1;
1143 fd = open(file, O_RDONLY);
1144 if (fd < 0) {
1145 pipe_prod_siginit(&ms->pipe,-1);
1146 pipe_prod_finish(&ms->pipe);
1147 return THREAD_FAILED;
1150 pipe_prod_siginit(&ms->pipe,0);
1152 for (;;) {
1153 bytes = read(fd,buf,sizeof(buf));
1154 if (bytes <= 0) {
1155 if (bytes < 0) {
1156 error (0, errno, "%s: read failed", file);
1158 break;
1160 pipe_prod_signal(&ms->pipe,buf,bytes);
1163 close(fd);
1164 pipe_prod_finish(&ms->pipe);
1166 DPRINTF("producer exiting\n");
1167 return NULL;
1170 /* Start an FS _consumer_. */
1171 static void *
1172 fs_put_child (void * ctx)
1174 pipe_private *pp = ctx;
1175 pipe_shared *ps = pp->shared;
1176 my_state *ms = ps->owner;
1177 int fd;
1178 ssize_t bytes;
1179 size_t offset;
1180 char *file = ms->url+1;
1182 fd = open(file,O_WRONLY|O_CREAT,0666);
1183 if (fd < 0) {
1184 pipe_cons_siginit(ps, errno);
1185 free(pp);
1186 return THREAD_FAILED;
1189 pipe_cons_siginit(ps, 0);
1191 while (pipe_cons_wait(pp)) {
1192 offset = 0;
1193 do {
1194 bytes = write(fd,
1195 ps->data_ptr+offset,ps->data_len-offset);
1196 if (bytes <= 0) {
1197 if (bytes < 0) {
1198 error (0, errno, "%s: write failed",
1199 file);
1201 pipe_cons_signal(pp, errno);
1202 goto done;
1204 offset += bytes;
1205 } while (offset < ps->data_len);
1206 pipe_cons_signal(pp, 0);
1209 done:
1210 close(fd);
1212 DPRINTF("%s returning\n",__func__);
1213 free(pp);
1214 return NULL;
1217 static int
1218 fs_delete (const provider_t *prov, const char *bucket, const char *key,
1219 const char *url)
1221 (void)prov;
1222 (void)bucket;
1223 (void)key;
1225 if (unlink(url+1) < 0) {
1226 error (0, errno, "%s: failed to unlink", url+1);
1227 return MHD_HTTP_NOT_FOUND;
1230 return MHD_HTTP_OK;
1233 static int
1234 fs_bcreate (const provider_t *prov, const char *bucket)
1236 (void)prov;
1238 DPRINTF("creating bucket %s\n",bucket);
1240 if (mkdir(bucket,0700) < 0) {
1241 error (0, errno, "%s: failed to create directory", bucket);
1242 return MHD_HTTP_INTERNAL_SERVER_ERROR;
1245 return MHD_HTTP_OK;
1248 /***** Function tables. ****/
1250 backend_func_tbl bad_func_tbl = {
1251 "uninitialized",
1252 bad_init,
1253 bad_get_child,
1254 bad_put_child,
1255 bad_cache_child,
1256 bad_delete,
1257 bad_bcreate,
1258 bad_register,
1261 backend_func_tbl s3_func_tbl = {
1262 "S3",
1263 s3_init,
1264 s3_get_child,
1265 s3_put_child,
1266 bad_cache_child,
1267 s3_delete,
1268 s3_bcreate,
1269 s3_register,
1272 backend_func_tbl curl_func_tbl = {
1273 "HTTP",
1274 curl_init,
1275 curl_get_child,
1276 curl_put_child,
1277 curl_cache_child,
1278 curl_delete,
1279 curl_bcreate,
1280 curl_register,
1283 backend_func_tbl cf_func_tbl = {
1284 "CF",
1285 cf_init,
1286 cf_get_child,
1287 cf_put_child,
1288 bad_cache_child,
1289 cf_delete,
1290 cf_bcreate,
1291 bad_register,
1294 backend_func_tbl fs_func_tbl = {
1295 "FS",
1296 fs_init,
1297 fs_get_child,
1298 fs_put_child,
1299 bad_cache_child,
1300 fs_delete,
1301 fs_bcreate,
1302 bad_register,