1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
22 #include <semaphore.h>
33 #include <microhttpd.h>
34 #include <curl/curl.h>
45 #include "state_defs.h"
47 struct hstor_client
*hstor
;
49 /***** Generic module stuff, not specific to one back end *****/
52 * Sizes for internal string buffers. In general, ADDR_SIZE needs to be
53 * big enough to hold a hostname, a port number, a bucket and key (each
54 * MAX_FIELD_LEN=64) and some punctuation. Header size needs to be big
55 * enough to hold the header name plus a CF token (32 bytes).
58 #define HEADER_SIZE 64
60 #define S3_IMAGE_PATTERN "^IMAGE[[:blank:]]+([^[:space:]]+)"
61 #define S3_ERROR_PATTERN "^ERROR[[:blank:]]+([^[:space:]]+)"
63 regex_t s3_success_pat
;
64 regex_t s3_failure_pat
;
72 if (regcomp(&s3_success_pat
,S3_IMAGE_PATTERN
,REG_EXTENDED
) != 0){
73 DPRINTF("could not compile S3 success pattern\n");
77 if (regcomp(&s3_failure_pat
,S3_ERROR_PATTERN
,REG_EXTENDED
) != 0){
78 DPRINTF("could not compile S3 failure pattern\n");
83 /***** Stub functions for unimplemented stuff. *****/
86 bad_init (provider_t
*prov
)
90 DPRINTF("*** bad call to %s\n",__func__
);
94 bad_get_child (void * ctx
)
96 backend_thunk_t
*tp
= (backend_thunk_t
*)ctx
;
97 my_state
*ms
= tp
->parent
;
99 DPRINTF("*** bad call to %s\n",__func__
);
100 pipe_prod_siginit(&ms
->pipe
,-1);
105 bad_put_child (void * ctx
)
107 pipe_private
*pp
= ctx
;
108 pipe_shared
*ps
= pp
->shared
;
110 DPRINTF("*** bad call to %s\n",__func__
);
111 pipe_cons_siginit(ps
, -1);
113 return THREAD_FAILED
;
117 bad_cache_child (void * ctx
)
121 DPRINTF("*** bad call to %s\n",__func__
);
126 bad_delete (const provider_t
*prov
, const char *bucket
, const char *key
,
134 DPRINTF("*** bad call to %s\n",__func__
);
135 return MHD_HTTP_BAD_REQUEST
;
139 bad_bcreate (const provider_t
*prov
, const char *bucket
)
144 DPRINTF("*** bad call to %s\n",__func__
);
145 return MHD_HTTP_NOT_IMPLEMENTED
;
149 bad_register (my_state
*ms
, const provider_t
*prov
, const char *next
,
157 DPRINTF("*** bad call to %s\n",__func__
);
158 return MHD_HTTP_NOT_IMPLEMENTED
;
161 /***** Generic functions shared by the HTTP back ends. */
163 /* Invoked from S3/CURL/CF. */
165 http_get_prod (void *ptr
, size_t size
, size_t nmemb
, void *stream
)
167 size_t total
= size
* nmemb
;
168 pipe_shared
*ps
= stream
;
170 DPRINTF("producer posting %zu bytes as %ld\n",total
,ps
->sequence
+1);
171 pipe_prod_signal(ps
,ptr
,total
);
173 DPRINTF("producer finished chunk\n");
177 /* Invoked from S3/CURL/CF. */
179 http_put_cons (void *ptr
, size_t size
, size_t nmemb
, void *stream
)
181 size_t total
= size
* nmemb
;
182 pipe_private
*pp
= stream
;
183 pipe_shared
*ps
= pp
->shared
;
186 DPRINTF("consumer asked to read %zu\n",total
);
188 if (!pipe_cons_wait(pp
)) {
192 DPRINTF("consumer offset %zu into %zu\n",
193 pp
->offset
, ps
->data_len
);
194 done
= ps
->data_len
- pp
->offset
;
198 memcpy(ptr
,ps
->data_ptr
+pp
->offset
,done
);
200 DPRINTF("consumer copied %zu, new offset %zu\n",
202 if (pp
->offset
== ps
->data_len
) {
203 DPRINTF("consumer finished chunk\n");
204 pipe_cons_signal(pp
, 0);
210 /***** S3-specific functions *****/
213 s3_init (provider_t
*prov
)
218 chars
= snprintf(svc_acc
,sizeof(svc_acc
),"%s:%u",prov
->host
,prov
->port
);
219 if (chars
>= (int)sizeof(svc_acc
)) {
220 error(0,0,"hostname %s too long in %s",prov
->host
,__func__
);
223 hstor
= hstor_new(svc_acc
,prov
->host
,prov
->username
,prov
->password
);
230 DPRINTF("could not create S3 client\n");
234 /* Start an S3 _producer_. */
236 s3_get_child (void * ctx
)
238 backend_thunk_t
*tp
= (backend_thunk_t
*)ctx
;
239 my_state
*ms
= tp
->parent
;
241 /* TBD: check existence before calling siginit */
242 pipe_prod_siginit(&ms
->pipe
,0);
244 hstor_get(hstor
,ms
->bucket
,ms
->key
,http_get_prod
,&ms
->pipe
,0);
245 /* TBD: check return value */
247 pipe_prod_finish(&ms
->pipe
);
249 DPRINTF("producer exiting\n");
253 /* Start an S3 _consumer_. */
255 s3_put_child (void * ctx
)
257 pipe_private
*pp
= ctx
;
258 pipe_shared
*ps
= pp
->shared
;
259 my_state
*ms
= ps
->owner
;
264 llen
= (curl_off_t
)MHD_SIZE_UNKNOWN
;
265 if (ms
->be_flags
& BACKEND_GET_SIZE
) {
266 clen
= MHD_lookup_connection_value(
267 ms
->conn
, MHD_HEADER_KIND
, "Content-Length");
269 llen
= strtoll(clen
,NULL
,10);
272 error (0, 0, "missing Content-Length");
276 pipe_cons_siginit(ps
, 0);
277 rcb
= hstor_put(hstor
,ms
->bucket
,ms
->key
,http_put_cons
,llen
,pp
,NULL
);
279 DPRINTF("%s returning with error\n",__func__
);
280 pipe_cons_siginit(ps
, -1);
282 return THREAD_FAILED
;
285 DPRINTF("%s returning\n",__func__
);
291 s3_delete (const provider_t
*prov
, const char *bucket
, const char *key
,
297 hstor_del(hstor
,bucket
,key
);
298 /* TBD: check return value */
304 s3_bcreate (const provider_t
*prov
, const char *bucket
)
308 DPRINTF("creating bucket %s\n",bucket
);
310 if (!hstor_add_bucket(hstor
,bucket
)) {
311 DPRINTF(" bucket create failed\n");
312 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
319 s3_init_tmpfile (const char *value
)
326 /* FIXME: do not hard-code /tmp. */
327 path
= strdup("/tmp/iwtmp.XXXXXX");
334 error (0, errno
, "%s: failed to create file from template", path
);
341 written
= write(fd
,value
,len
);
343 if (written
!= (ssize_t
)len
) {
345 error (0, errno
, "failed to write to %s", path
);
349 "invalid write length %zd in %s",
362 s3_register (my_state
*ms
, const provider_t
*prov
, const char *next
,
365 char *kernel
= g_hash_table_lookup(args
,"kernel");
366 char *ramdisk
= g_hash_table_lookup(args
,"ramdisk");
369 const char *ami_cert
;
372 const char *argv
[12];
380 int rc
= MHD_HTTP_BAD_REQUEST
;
386 return MHD_HTTP_BAD_REQUEST
;
390 DPRINTF("S3 register with next!=NULL\n");
394 DPRINTF("*** register %s/%s via %s (%s:%d)\n",
395 ms
->bucket
, ms
->key
, prov
->name
, prov
->host
, prov
->port
);
397 DPRINTF(" (using kernel %s)\n",kernel
);
400 DPRINTF(" (using ramdisk %s)\n",ramdisk
);
403 api_key
= g_hash_table_lookup(args
,"api-key");
405 api_key
= (char *)prov
->username
;
407 error (0, 0, "missing EC2 API key");
412 api_secret
= g_hash_table_lookup(args
,"api-secret");
414 api_secret
= (char *)prov
->password
;
415 if (!prov
->password
) {
416 error (0, 0, "missing EC2 API secret");
421 cval
= g_hash_table_lookup(args
,"ami-cert");
423 ami_cert
= s3_init_tmpfile(cval
);
429 ami_cert
= get_provider_value(prov
,"ami-cert");
431 error (0, 0, "missing EC2 AMI cert");
436 kval
= g_hash_table_lookup(args
,"ami-key");
438 ami_key
= s3_init_tmpfile(kval
);
444 ami_key
= get_provider_value(prov
,"ami-key");
446 error (0, 0, "missing EC2 AMI key");
451 ami_uid
= g_hash_table_lookup(args
,"ami-uid");
453 ami_uid
= get_provider_value(prov
,"ami-uid");
455 error (0, 0, "missing EC2 AMI uid");
460 ami_bkt
= g_hash_table_lookup(args
,"ami-bkt");
462 ami_bkt
= ms
->bucket
;
466 * This is the point where we go from validation to execution. If we
467 * were double-forking so this could all be asynchronous, or for that
468 * matter to return an early 100-continue, this would probably be the
469 * place to do it. Even without that, we set the ami-id here so that
470 * the caller can know things are actually in progress.
472 sprintf(ami_id_buf
,"pending %lld",(long long)time(NULL
));
473 DPRINTF("temporary ami-id = \"%s\"\n",ami_id_buf
);
474 (void)meta_set_value(ms
->bucket
,ms
->key
,"ami-id",ami_id_buf
);
475 rc
= MHD_HTTP_INTERNAL_SERVER_ERROR
;
477 const char *cmd
= "dc-register-image";
479 argv
[argc
++] = ms
->bucket
;
480 argv
[argc
++] = ms
->key
;
481 argv
[argc
++] = api_key
;
482 argv
[argc
++] = api_secret
;
483 argv
[argc
++] = ami_cert
;
484 argv
[argc
++] = ami_key
;
485 argv
[argc
++] = ami_uid
;
486 argv
[argc
++] = ami_bkt
;
487 argv
[argc
++] = kernel
? kernel
: "_default_";
488 argv
[argc
++] = ramdisk
? ramdisk
: "_default_";
491 DPRINTF("api-key = %s\n",api_key
);
492 DPRINTF("api-secret = %s\n",api_secret
);
493 DPRINTF("ami-cert = %s\n",ami_cert
);
494 DPRINTF("ami-key = %s\n",ami_key
);
495 DPRINTF("ami-uid = %s\n",ami_uid
);
496 DPRINTF("ami-bkt = %s\n",ami_bkt
);
498 if (pipe(organ
) < 0) {
499 error (0, errno
, "pipe creation failed");
505 error (0, errno
, "fork failed");
512 (void)dup2(organ
[1],STDOUT_FILENO
);
513 (void)dup2(organ
[1],STDERR_FILENO
);
514 execvp(cmd
, (char* const*)argv
);
515 error (EXIT_FAILURE
, errno
, "failed run command %s", cmd
);
518 DPRINTF("waiting for child...\n");
519 if (waitpid(pid
,NULL
,0) < 0) {
520 error (0, errno
, "waitpid failed");
522 /* TBD: check identity/status from waitpid */
523 DPRINTF("...child exited\n");
526 fp
= fdopen(organ
[0],"r");
528 DPRINTF("could not open parent pipe\n");
532 while (fgets(buf
,sizeof(buf
)-1,fp
)) {
533 buf
[sizeof(buf
)-1] = '\0';
534 if (regexec(&s3_success_pat
,buf
,2,match
,0) == 0) {
535 buf
[match
[1].rm_eo
] = '\0';
536 DPRINTF("found AMI ID: %s\n",buf
+match
[1].rm_so
);
537 sprintf(ami_id_buf
,"OK %.60s",buf
+match
[1].rm_so
);
540 else if (regexec(&s3_failure_pat
,buf
,2,match
,0) == 0) {
541 buf
[match
[1].rm_eo
] = '\0';
542 DPRINTF("found error marker: %s\n",buf
+match
[1].rm_so
);
543 sprintf(ami_id_buf
,"failed %.56s",buf
+match
[1].rm_so
);
544 rc
= MHD_HTTP_INTERNAL_SERVER_ERROR
;
547 DPRINTF("ignoring line: <%s>\n",buf
);
554 * This is a bit tricky. If we found the cert in the HTTP request and
555 * succeeded in creating a temp file, then this condition will succeed.
556 * If we failed to create the temp file, or never found a cert
557 * anywhere, there will be no ami_cert to clean up. If we got a cert
558 * from the config, then ami_cert will be set but we'll (correctly)
559 * skip cleanup because cval is null.
561 if (cval
&& ami_cert
) {
563 free((char *)ami_cert
);
565 /* Same reasoning as above, with kval/ami_key. */
566 if (kval
&& ami_key
) {
568 free((char *)ami_key
);
570 (void)meta_set_value(ms
->bucket
,ms
->key
,"ami-id",ami_id_buf
);
575 /***** CURL-specific functions *****/
578 curl_init (provider_t
*prov
)
583 /* Start a CURL _producer_. */
585 curl_get_child (void * ctx
)
587 char fixed
[ADDR_SIZE
];
588 backend_thunk_t
*tp
= (backend_thunk_t
*)ctx
;
589 my_state
*ms
= tp
->parent
;
590 provider_t
*prov
= tp
->prov
;
594 curl
= curl_easy_init();
596 pipe_prod_siginit(&ms
->pipe
,-1);
597 return NULL
; /* TBD: flag error somehow */
599 if (ms
->from_master
) {
600 chars
= snprintf(fixed
,sizeof(fixed
),"http://%s:%u%s",
601 master_host
, master_port
, ms
->url
);
604 chars
= snprintf(fixed
,sizeof(fixed
),"http://%s:%u%s",
605 prov
->host
, prov
->port
, ms
->url
);
607 if (chars
>= (int)sizeof(fixed
)) {
608 error(0,0,"path too long in %s",__func__
);
611 curl_easy_setopt(curl
,CURLOPT_URL
,fixed
);
612 curl_easy_setopt(curl
,CURLOPT_WRITEFUNCTION
, http_get_prod
);
613 curl_easy_setopt(curl
,CURLOPT_WRITEDATA
,&ms
->pipe
);
614 pipe_prod_siginit(&ms
->pipe
,0);
616 curl_easy_perform(curl
);
617 curl_easy_getinfo(curl
,CURLINFO_RESPONSE_CODE
,&ms
->rc
);
618 pipe_prod_finish(&ms
->pipe
);
620 DPRINTF("producer exiting\n");
621 curl_easy_cleanup(curl
);
625 /* Start a CURL _consumer_. */
627 curl_put_child (void * ctx
)
629 pipe_private
*pp
= ctx
;
630 pipe_shared
*ps
= pp
->shared
;
631 my_state
*ms
= ps
->owner
;
632 provider_t
*prov
= pp
->prov
;
634 char fixed
[ADDR_SIZE
];
637 struct curl_slist
*slist
= NULL
;
640 llen
= (curl_off_t
)MHD_SIZE_UNKNOWN
;
641 if (ms
->be_flags
& BACKEND_GET_SIZE
) {
642 clen
= MHD_lookup_connection_value(
643 ms
->conn
, MHD_HEADER_KIND
, "Content-Length");
645 llen
= strtoll(clen
,NULL
,10);
648 error (0, 0, "missing Content-Length");
653 * This is how the iwhd at the other end knows this is a replication
654 * request and not just a PUT from some random user.
655 * TBD: add some auth* for this.
657 slist
= curl_slist_append(slist
,"X-redhat-role: master");
659 curl
= curl_easy_init();
661 pipe_cons_siginit(ps
, -1);
663 return THREAD_FAILED
;
665 chars
= snprintf(fixed
,sizeof(fixed
),
666 "http://%s:%u/%s/%s",prov
->host
,prov
->port
,ms
->bucket
,ms
->key
);
667 if (chars
>= (int)sizeof(fixed
)) {
668 error(0,0,"path too long in %s",__func__
);
671 curl_easy_setopt(curl
,CURLOPT_URL
,fixed
);
672 curl_easy_setopt(curl
,CURLOPT_UPLOAD
,1);
673 curl_easy_setopt(curl
,CURLOPT_INFILESIZE_LARGE
,llen
);
674 curl_easy_setopt(curl
,CURLOPT_READFUNCTION
,http_put_cons
);
675 curl_easy_setopt(curl
,CURLOPT_READDATA
,pp
);
676 curl_easy_setopt(curl
,CURLOPT_HTTPHEADER
,slist
);
677 pipe_cons_siginit(ps
, 0);
678 curl_easy_perform(curl
);
679 curl_easy_cleanup(curl
);
680 curl_slist_free_all(slist
);
682 DPRINTF("%s returning\n",__func__
);
687 /* Start a CURL cache consumer. */
689 curl_cache_child (void * ctx
)
691 pipe_private
*pp
= ctx
;
692 pipe_shared
*ps
= pp
->shared
;
693 my_state
*ms
= ps
->owner
;
694 provider_t
*prov
= pp
->prov
;
695 char fixed
[ADDR_SIZE
];
698 char *my_url
= strdup(ms
->url
);
702 return THREAD_FAILED
;
705 curl
= curl_easy_init();
708 pipe_cons_siginit(ps
,-1);
709 return THREAD_FAILED
;
711 chars
= snprintf(fixed
,sizeof(fixed
),
712 "http://%s:%u%s",prov
->host
,prov
->port
,ms
->url
);
713 if (chars
>= (int)sizeof(fixed
)) {
714 error(0,0,"path too long in %s",__func__
);
717 curl_easy_setopt(curl
,CURLOPT_URL
,fixed
);
718 curl_easy_setopt(curl
,CURLOPT_UPLOAD
,1);
719 curl_easy_setopt(curl
,CURLOPT_INFILESIZE_LARGE
,
720 (curl_off_t
)MHD_SIZE_UNKNOWN
);
721 curl_easy_setopt(curl
,CURLOPT_READFUNCTION
,http_put_cons
);
722 curl_easy_setopt(curl
,CURLOPT_READDATA
,pp
);
723 curl_easy_perform(curl
);
724 curl_easy_cleanup(curl
);
726 slash
= strchr(my_url
+1,'/');
729 meta_got_copy(my_url
+1,slash
+1,me
);
737 curl_delete (const provider_t
*prov
, const char *bucket
, const char *key
,
741 char fixed
[ADDR_SIZE
];
747 curl
= curl_easy_init();
749 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
752 chars
= snprintf(fixed
,sizeof(fixed
),
753 "http://%s:%u%s",prov
->host
,prov
->port
,url
);
754 if (chars
>= (int)sizeof(fixed
)) {
755 error(0,0,"path too long in %s",__func__
);
756 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
758 curl_easy_setopt(curl
,CURLOPT_URL
,fixed
);
759 curl_easy_setopt(curl
,CURLOPT_CUSTOMREQUEST
,"DELETE");
760 curl_easy_perform(curl
);
761 curl_easy_cleanup(curl
);
767 curl_bcreate (const provider_t
*prov
, const char *bucket
)
769 char addr
[ADDR_SIZE
];
773 chars
= snprintf(addr
,sizeof(addr
),"http://%s:%d/%s",
774 prov
->host
,prov
->port
,bucket
);
775 if (chars
>= (int)sizeof(addr
)) {
776 error(0,0,"path too long in %s",__func__
);
777 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
780 curl
= curl_easy_init();
782 error(0,errno
,"no memory in %s",__func__
);
783 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
785 curl_easy_setopt(curl
,CURLOPT_URL
,addr
);
786 curl_easy_setopt(curl
,CURLOPT_CUSTOMREQUEST
,"PUT");
787 curl_easy_perform(curl
);
788 curl_easy_cleanup(curl
);
793 * We can proxy through any number of CURL/HTTP warehouses, but the chain
794 * eventually has to terminate at an S3 back end.
798 curl_register (my_state
*ms
, const provider_t
*prov
, const char *next
,
801 char fixed
[ADDR_SIZE
];
803 struct curl_httppost
*first
= NULL
;
804 struct curl_httppost
*last
= NULL
;
805 char *kernel
= g_hash_table_lookup(args
,"kernel");
806 char *ramdisk
= g_hash_table_lookup(args
,"ramdisk");
810 DPRINTF("CURL register with next==NULL\n");
811 return MHD_HTTP_BAD_REQUEST
;
814 DPRINTF("*** PROXY registration request for %s/%s to %s (%s:%d)\n",
815 ms
->bucket
, ms
->key
, prov
->name
, prov
->host
, prov
->port
);
817 curl
= curl_easy_init();
819 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
821 chars
= snprintf(fixed
,sizeof(fixed
),"http://%s:%d/%s/%s",
822 prov
->host
,prov
->port
, ms
->bucket
, ms
->key
);
823 if (chars
>= (int)sizeof(fixed
)) {
824 error(0,0,"path too long in %s",__func__
);
825 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
827 curl_easy_setopt(curl
,CURLOPT_URL
,fixed
);
828 curl_formadd(&first
,&last
,
829 CURLFORM_COPYNAME
, "op",
830 CURLFORM_COPYCONTENTS
, "register",
832 curl_formadd(&first
,&last
,
833 CURLFORM_COPYNAME
, "site",
834 CURLFORM_COPYCONTENTS
, next
,
837 curl_formadd(&first
,&last
,
838 CURLFORM_COPYNAME
, "kernel",
839 CURLFORM_COPYCONTENTS
, kernel
,
843 curl_formadd(&first
,&last
,
844 CURLFORM_COPYNAME
, "ramdisk",
845 CURLFORM_COPYCONTENTS
, ramdisk
,
848 curl_easy_setopt(curl
,CURLOPT_HTTPPOST
,first
);
849 curl_easy_perform(curl
);
850 curl_easy_cleanup(curl
);
855 /***** CF-specific functions *****/
858 * TBD: refactor to maximize common code. Despite the de-duplication between
859 * this module and replica.c, there's still a lot more that could be done to
860 * combine xxx_yyy_child for xxx={http,cf} and yyy={put,cache}. A rough
863 * if xxx=cf, call CF-specific routine to add CF auth header
864 * do common curl setup and execution
865 * if yyy=cache, call meta_got_copy
867 * There might even be an opportunity to combine code for put and bucket
868 * create in some cases, since the only difference is the URL and the
869 * lack of a data transfer in the bucket-create case.
873 cf_writer (void *ptr ATTRIBUTE_UNUSED
, size_t size
, size_t nmemb
,
874 void *stream ATTRIBUTE_UNUSED
)
880 cf_header (void *ptr
, size_t size
, size_t nmemb
, void *stream
)
884 provider_t
*prov
= (provider_t
*)stream
;
886 next
= strtok_r(ptr
,":",&sctx
);
888 if (!strcasecmp(next
,"X-Storage-Url")) {
889 next
= strtok_r(NULL
," \n\r",&sctx
);
891 DPRINTF("got CF URL %s\n",next
);
892 /* NB: after this, original "host" is gone. */
893 free((char *)prov
->host
);
894 prov
->host
= strdup(next
);
897 else if (!strcasecmp(next
,"X-Storage-Token")) {
898 next
= strtok_r(NULL
," \n\r",&sctx
);
900 DPRINTF("got CF token %s\n",next
);
901 prov
->token
= strdup(next
);
908 static struct curl_slist
*
909 cf_add_token (struct curl_slist
*in_slist
, const char *token
)
912 char auth_hdr
[HEADER_SIZE
];
918 chars
= snprintf(auth_hdr
,sizeof(auth_hdr
),"X-Auth-Token: %s",token
);
919 if (chars
>= (int)sizeof(auth_hdr
)) {
920 error(0,0,"auth_hdr too long");
924 return curl_slist_append(NULL
,auth_hdr
);
928 cf_init (provider_t
*prov
)
931 char addr
[ADDR_SIZE
];
932 char auth_user
[HEADER_SIZE
];
933 char auth_key
[HEADER_SIZE
];
934 struct curl_slist
*slist
;
941 chars
= snprintf(addr
,sizeof(addr
),"https://%s:%u/v1.0",
942 prov
->host
, prov
->port
);
943 if (chars
>= (int)sizeof(addr
)) {
944 error(0,0,"API URL too long in %s",__func__
);
948 chars
= snprintf(auth_user
,sizeof(auth_user
),"X-Auth-User: %s",
950 if (chars
>= (int)sizeof(auth_user
)) {
951 error(0,0,"auth_user too long in %s",__func__
);
955 chars
= snprintf(auth_key
,sizeof(auth_key
),"X-Auth-Key: %s",
957 if (chars
>= (int)sizeof(auth_key
)) {
958 error(0,0,"auth_key too long in %s",__func__
);
962 curl
= curl_easy_init();
963 curl_easy_setopt(curl
,CURLOPT_URL
,addr
);
964 curl_easy_setopt(curl
,CURLOPT_WRITEFUNCTION
,cf_writer
);
965 curl_easy_setopt(curl
,CURLOPT_HEADERFUNCTION
,cf_header
);
966 curl_easy_setopt(curl
,CURLOPT_WRITEHEADER
,prov
);
967 slist
= curl_slist_append(NULL
,auth_user
);
968 slist
= curl_slist_append(slist
,auth_key
);
969 curl_easy_setopt(curl
,CURLOPT_HTTPHEADER
,slist
);
970 curl_easy_perform(curl
);
971 curl_easy_cleanup(curl
);
972 curl_slist_free_all(slist
);
974 DPRINTF("CF token = %s\n",prov
->token
);
977 /* Start a CloudFiles _producer_. */
979 cf_get_child (void * ctx
)
981 char fixed
[ADDR_SIZE
];
982 backend_thunk_t
*tp
= (backend_thunk_t
*)ctx
;
983 my_state
*ms
= tp
->parent
;
984 provider_t
*prov
= tp
->prov
;
986 struct curl_slist
*slist
= NULL
;
989 slist
= cf_add_token(slist
,prov
->token
);
991 return THREAD_FAILED
;
994 * Rackspace doesn't clearly document that you'll get
995 * 412 (Precondition Failed) if you omit this.
997 slist
= curl_slist_append(slist
,
998 "Content-Type: binary/octet-stream");
1000 curl
= curl_easy_init();
1002 pipe_prod_siginit(&ms
->pipe
,-1);
1003 curl_slist_free_all(slist
);
1004 return NULL
; /* TBD: flag error somehow */
1006 chars
= snprintf(fixed
,sizeof(fixed
),"%s%s", prov
->host
, ms
->url
);
1007 if (chars
>= (int)sizeof(fixed
)) {
1008 error(0,0,"path too long in %s",__func__
);
1011 curl_easy_setopt(curl
,CURLOPT_URL
,fixed
);
1012 curl_easy_setopt(curl
,CURLOPT_WRITEFUNCTION
, http_get_prod
);
1013 curl_easy_setopt(curl
,CURLOPT_WRITEDATA
,&ms
->pipe
);
1014 curl_easy_setopt(curl
,CURLOPT_HTTPHEADER
,slist
);
1015 pipe_prod_siginit(&ms
->pipe
,0);
1017 curl_easy_perform(curl
);
1018 curl_easy_getinfo(curl
,CURLINFO_RESPONSE_CODE
,&ms
->rc
);
1019 pipe_prod_finish(&ms
->pipe
);
1021 DPRINTF("producer exiting\n");
1022 curl_easy_cleanup(curl
);
1023 curl_slist_free_all(slist
);
1027 /* Start a CloudFiles _consumer_. */
1029 cf_put_child (void * ctx
)
1031 pipe_private
*pp
= ctx
;
1032 pipe_shared
*ps
= pp
->shared
;
1033 my_state
*ms
= ps
->owner
;
1034 provider_t
*prov
= pp
->prov
;
1036 char fixed
[ADDR_SIZE
];
1039 struct curl_slist
*slist
= NULL
;
1042 slist
= cf_add_token(slist
,prov
->token
);
1044 return THREAD_FAILED
;
1047 llen
= (curl_off_t
)MHD_SIZE_UNKNOWN
;
1048 if (ms
->be_flags
& BACKEND_GET_SIZE
) {
1049 clen
= MHD_lookup_connection_value(
1050 ms
->conn
, MHD_HEADER_KIND
, "Content-Length");
1052 llen
= strtoll(clen
,NULL
,10);
1055 error (0, 0, "missing Content-Length");
1059 curl
= curl_easy_init();
1061 pipe_cons_siginit(ps
, -1);
1063 curl_slist_free_all(slist
);
1064 return THREAD_FAILED
;
1066 chars
= snprintf(fixed
,sizeof(fixed
),
1067 "%s/%s/%s",prov
->host
,ms
->bucket
,ms
->key
);
1068 if (chars
>= (int)sizeof(fixed
)) {
1069 error(0,0,"path too long in %s",__func__
);
1072 curl_easy_setopt(curl
,CURLOPT_URL
,fixed
);
1073 curl_easy_setopt(curl
,CURLOPT_UPLOAD
,1);
1074 curl_easy_setopt(curl
,CURLOPT_INFILESIZE_LARGE
,llen
);
1075 curl_easy_setopt(curl
,CURLOPT_READFUNCTION
,http_put_cons
);
1076 curl_easy_setopt(curl
,CURLOPT_READDATA
,pp
);
1077 curl_easy_setopt(curl
,CURLOPT_HTTPHEADER
,slist
);
1078 pipe_cons_siginit(ps
, 0);
1080 curl_easy_perform(curl
);
1081 curl_easy_getinfo(curl
,CURLINFO_RESPONSE_CODE
,&ms
->rc
);
1083 DPRINTF("%s returning\n",__func__
);
1084 curl_easy_cleanup(curl
);
1085 curl_slist_free_all(slist
);
1091 cf_delete (const provider_t
*prov
,
1092 const char *bucket ATTRIBUTE_UNUSED
,
1093 const char *key ATTRIBUTE_UNUSED
,
1097 char fixed
[ADDR_SIZE
];
1099 struct curl_slist
*slist
= NULL
;
1102 slist
= cf_add_token(slist
,prov
->token
);
1104 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
1107 curl
= curl_easy_init();
1109 curl_slist_free_all(slist
);
1110 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
1113 chars
= snprintf(fixed
,sizeof(fixed
),"%s%s",prov
->host
,url
);
1114 if (chars
>= (int)sizeof(fixed
)) {
1115 error(0,0,"path too long in %s",__func__
);
1116 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
1118 curl_easy_setopt(curl
,CURLOPT_URL
,fixed
);
1119 curl_easy_setopt(curl
,CURLOPT_CUSTOMREQUEST
,"DELETE");
1120 curl_easy_setopt(curl
,CURLOPT_HTTPHEADER
,slist
);
1122 curl_easy_perform(curl
);
1123 curl_easy_getinfo(curl
,CURLINFO_RESPONSE_CODE
,&rc
);
1124 DPRINTF("%s: rc = %ld\n",__func__
,rc
);
1126 curl_easy_cleanup(curl
);
1127 curl_slist_free_all(slist
);
1133 cf_null_reader (void *ptr ATTRIBUTE_UNUSED
,
1134 size_t size ATTRIBUTE_UNUSED
,
1135 size_t nmemb ATTRIBUTE_UNUSED
,
1136 void *stream ATTRIBUTE_UNUSED
)
1142 cf_bcreate (const provider_t
*prov
, const char *bucket
)
1144 char fixed
[ADDR_SIZE
];
1147 struct curl_slist
*slist
= NULL
;
1150 slist
= cf_add_token(slist
,prov
->token
);
1152 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
1155 curl
= curl_easy_init();
1157 curl_slist_free_all(slist
);
1158 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
1160 chars
= snprintf(fixed
,sizeof(fixed
),"%s/%s",prov
->host
,bucket
);
1161 if (chars
>= (int)sizeof(fixed
)) {
1162 error(0,0,"path too long in %s",__func__
);
1163 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
1165 curl_easy_setopt(curl
,CURLOPT_URL
,fixed
);
1166 curl_easy_setopt(curl
,CURLOPT_UPLOAD
,1);
1167 curl_easy_setopt(curl
,CURLOPT_INFILESIZE_LARGE
,
1168 (curl_off_t
)MHD_SIZE_UNKNOWN
);
1169 curl_easy_setopt(curl
,CURLOPT_READFUNCTION
,cf_null_reader
);
1170 curl_easy_setopt(curl
,CURLOPT_HTTPHEADER
,slist
);
1172 curl_easy_perform(curl
);
1173 curl_easy_getinfo(curl
,CURLINFO_RESPONSE_CODE
,&rc
);
1174 DPRINTF("%s: rc = %ld\n",__func__
,rc
);
1176 DPRINTF("%s returning\n",__func__
);
1177 curl_easy_cleanup(curl
);
1178 curl_slist_free_all(slist
);
1182 /***** FS-specific functions *****/
1185 fs_init (provider_t
*prov
)
1187 DPRINTF("changing directory to %s\n",prov
->path
);
1188 if (chdir(prov
->path
) < 0) {
1189 error(0,errno
,"chdir failed, unsafe to continue");
1190 exit(!0); /* Value doesn't matter, as long as it's not zero. */
1194 /* Start an FS _producer_. */
1196 fs_get_child (void * ctx
)
1198 backend_thunk_t
*tp
= (backend_thunk_t
*)ctx
;
1199 my_state
*ms
= tp
->parent
;
1203 char *file
= ms
->url
+1;
1205 fd
= open(file
, O_RDONLY
);
1207 pipe_prod_siginit(&ms
->pipe
,-1);
1208 pipe_prod_finish(&ms
->pipe
);
1209 return THREAD_FAILED
;
1212 pipe_prod_siginit(&ms
->pipe
,0);
1215 bytes
= read(fd
,buf
,sizeof(buf
));
1218 error (0, errno
, "%s: read failed", file
);
1222 pipe_prod_signal(&ms
->pipe
,buf
,bytes
);
1226 pipe_prod_finish(&ms
->pipe
);
1228 DPRINTF("producer exiting\n");
1232 /* Start an FS _consumer_. */
1234 fs_put_child (void * ctx
)
1236 pipe_private
*pp
= ctx
;
1237 pipe_shared
*ps
= pp
->shared
;
1238 my_state
*ms
= ps
->owner
;
1242 char fixed
[ADDR_SIZE
];
1245 chars
= snprintf(fixed
,sizeof(fixed
),"%s/%s",ms
->bucket
,ms
->key
);
1246 if (chars
>= (int)sizeof(fixed
)) {
1247 error(0,0,"path too long in %s",__func__
);
1250 if (unlink(fixed
) < 0) {
1251 error(0,errno
,"unlink failed for %s (non-fatal)",fixed
);
1253 fd
= open(fixed
,O_WRONLY
|O_CREAT
|O_EXCL
,0666);
1255 pipe_cons_siginit(ps
, errno
);
1257 return THREAD_FAILED
;
1260 pipe_cons_siginit(ps
, 0);
1262 while (pipe_cons_wait(pp
)) {
1263 for (offset
= 0; offset
< ps
->data_len
; offset
+= bytes
) {
1265 ps
->data_ptr
+offset
,ps
->data_len
-offset
);
1268 error (0, errno
, "%s: write failed",
1270 pipe_cons_signal(pp
, errno
);
1273 pipe_cons_signal(pp
, ENOSPC
);
1278 pipe_cons_signal(pp
, 0);
1283 DPRINTF("%s returning\n",__func__
);
1289 fs_delete (const provider_t
*prov
, const char *bucket
, const char *key
,
1296 if (unlink(url
+1) < 0) {
1297 error (0, errno
, "%s: failed to unlink", url
+1);
1298 return MHD_HTTP_NOT_FOUND
;
1305 fs_bcreate (const provider_t
*prov
, const char *bucket
)
1309 DPRINTF("creating bucket %s\n",bucket
);
1311 if (mkdir(bucket
,0700) < 0) {
1312 error (0, errno
, "%s: failed to create directory", bucket
);
1313 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
1319 /***** Function tables. ****/
1321 backend_func_tbl bad_func_tbl
= {
1332 backend_func_tbl s3_func_tbl
= {
1343 backend_func_tbl curl_func_tbl
= {
1354 backend_func_tbl cf_func_tbl
= {
1365 backend_func_tbl fs_func_tbl
= {