17 #include <microhttpd.h>
18 #include <curl/curl.h>
29 #include "state_defs.h"
31 struct hstor_client
*hstor
;
33 /***** Stub functions for unimplemented stuff. *****/
38 DPRINTF("*** bad call to %s\n",__func__
);
42 bad_get_child (void * ctx
)
46 DPRINTF("*** bad call to %s\n",__func__
);
51 bad_put_child (void * ctx
)
55 DPRINTF("*** bad call to %s\n",__func__
);
60 bad_cache_child (void * ctx
)
64 DPRINTF("*** bad call to %s\n",__func__
);
69 bad_delete (char *bucket
, char *key
, char *url
)
75 DPRINTF("*** bad call to %s\n",__func__
);
80 bad_bcreate (char *bucket
)
84 DPRINTF("*** bad call to %s\n",__func__
);
85 return MHD_HTTP_NOT_IMPLEMENTED
;
88 /***** Generic functions shared by the HTTP back ends. */
90 /* Invoked from S3/CURL/CF. */
92 http_get_prod (void *ptr
, size_t size
, size_t nmemb
, void *stream
)
94 size_t total
= size
* nmemb
;
95 pipe_shared
*ps
= stream
;
97 DPRINTF("producer posting %zu bytes as %ld\n",total
,ps
->sequence
+1);
98 pipe_prod_signal(ps
,ptr
,total
);
100 DPRINTF("producer chunk finished\n");
104 /* Invoked from S3/CURL/CF. */
106 http_put_cons (void *ptr
, size_t size
, size_t nmemb
, void *stream
)
108 size_t total
= size
* nmemb
;
109 pipe_private
*pp
= stream
;
110 pipe_shared
*ps
= pp
->shared
;
113 DPRINTF("consumer asked to read %zu\n",total
);
115 if (!pipe_cons_wait(pp
)) {
119 DPRINTF("consumer offset %zu into %zu\n",
120 pp
->offset
, ps
->data_len
);
121 done
= ps
->data_len
- pp
->offset
;
125 memcpy(ptr
,ps
->data_ptr
+pp
->offset
,done
);
127 DPRINTF("consumer copied %zu, new offset %zu\n",
129 if (pp
->offset
== ps
->data_len
) {
130 DPRINTF("consumer finished chunk\n");
131 pipe_cons_signal(pp
);
137 /***** S3-specific functions *****/
144 snprintf(svc_acc
,sizeof(svc_acc
),"%s:%u",
145 proxy_host
,proxy_port
);
146 hstor
= hstor_new(svc_acc
,proxy_host
,proxy_key
,proxy_secret
);
147 /* TBD: check return value */
153 /* Start an S3 _producer_. */
155 s3_get_child (void * ctx
)
159 hstor_get(hstor
,ms
->bucket
,ms
->key
,http_get_prod
,&ms
->pipe
,0);
160 /* TBD: check return value */
162 pipe_prod_finish(&ms
->pipe
);
164 DPRINTF("producer exiting\n");
168 /* Start an S3 _consumer_. */
170 s3_put_child (void * ctx
)
172 pipe_private
*pp
= ctx
;
173 pipe_shared
*ps
= pp
->shared
;
174 my_state
*ms
= ps
->owner
;
178 clen
= MHD_lookup_connection_value(
179 ms
->conn
, MHD_HEADER_KIND
, "Content-Length");
181 llen
= strtoll(clen
,NULL
,10);
184 fprintf(stderr
,"missing Content-Length\n");
185 llen
= (curl_off_t
)MHD_SIZE_UNKNOWN
;
188 hstor_put(hstor
,ms
->bucket
,ms
->key
,
189 http_put_cons
,llen
,pp
,NULL
);
190 /* TBD: check return value */
192 DPRINTF("%s returning\n",__func__
);
198 s3_delete (char *bucket
, char *key
, char *url
)
202 hstor_del(hstor
,bucket
,key
);
203 /* TBD: check return value */
209 s3_bcreate (char *bucket
)
211 DPRINTF("creating bucket %s\n",bucket
);
213 if (!hstor_add_bucket(hstor
,bucket
)) {
214 DPRINTF(" bucket create failed\n");
215 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
221 /***** CURL-specific functions *****/
228 /* Start a CURL _producer_. */
230 curl_get_child (void * ctx
)
235 ms
->curl
= curl_easy_init();
237 return NULL
; /* TBD: flag error somehow */
239 ms
->cleanup
|= CLEANUP_CURL
;
240 if (ms
->from_master
) {
241 sprintf(fixed
,"http://%s:%u%s",
242 master_host
, master_port
, ms
->url
);
245 sprintf(fixed
,"http://%s:%u%s",
246 proxy_host
, proxy_port
, ms
->url
);
248 curl_easy_setopt(ms
->curl
,CURLOPT_URL
,fixed
);
249 curl_easy_setopt(ms
->curl
,CURLOPT_WRITEFUNCTION
,
251 curl_easy_setopt(ms
->curl
,CURLOPT_WRITEDATA
,&ms
->pipe
);
252 curl_easy_perform(ms
->curl
);
253 curl_easy_getinfo(ms
->curl
,CURLINFO_RESPONSE_CODE
,&ms
->rc
);
255 pipe_prod_finish(&ms
->pipe
);
257 DPRINTF("producer exiting\n");
261 /* Start a CURL _consumer_. */
263 curl_put_child (void * ctx
)
265 pipe_private
*pp
= ctx
;
266 pipe_shared
*ps
= pp
->shared
;
267 my_state
*ms
= ps
->owner
;
273 clen
= MHD_lookup_connection_value(
274 ms
->conn
, MHD_HEADER_KIND
, "Content-Length");
276 llen
= strtoll(clen
,NULL
,10);
279 fprintf(stderr
,"missing Content-Length\n");
280 llen
= (curl_off_t
)MHD_SIZE_UNKNOWN
;
283 curl
= curl_easy_init();
285 return THREAD_FAILED
;
287 sprintf(fixed
,"http://%s:%u%s",proxy_host
,proxy_port
,
289 curl_easy_setopt(curl
,CURLOPT_URL
,fixed
);
290 curl_easy_setopt(curl
,CURLOPT_UPLOAD
,1);
291 curl_easy_setopt(curl
,CURLOPT_INFILESIZE_LARGE
,llen
);
292 curl_easy_setopt(curl
,CURLOPT_READFUNCTION
,http_put_cons
);
293 curl_easy_setopt(curl
,CURLOPT_READDATA
,pp
);
294 curl_easy_perform(curl
);
295 curl_easy_cleanup(curl
);
297 DPRINTF("%s returning\n",__func__
);
302 /* Start a CURL cache consumer. */
304 curl_cache_child (void * ctx
)
306 pipe_private
*pp
= ctx
;
307 pipe_shared
*ps
= pp
->shared
;
308 my_state
*ms
= ps
->owner
;
312 char *my_url
= strdup(ms
->url
);
315 return THREAD_FAILED
;
318 curl
= curl_easy_init();
321 return THREAD_FAILED
;
323 sprintf(fixed
,"http://%s:%u%s",proxy_host
,proxy_port
,
325 curl_easy_setopt(curl
,CURLOPT_URL
,fixed
);
326 curl_easy_setopt(curl
,CURLOPT_UPLOAD
,1);
327 curl_easy_setopt(curl
,CURLOPT_INFILESIZE_LARGE
,
328 (curl_off_t
)MHD_SIZE_UNKNOWN
);
329 curl_easy_setopt(curl
,CURLOPT_READFUNCTION
,http_put_cons
);
330 curl_easy_setopt(curl
,CURLOPT_READDATA
,pp
);
331 curl_easy_perform(curl
);
332 curl_easy_cleanup(curl
);
334 slash
= index(my_url
+1,'/');
337 meta_got_copy(my_url
+1,slash
+1,me
);
345 curl_delete (char *bucket
, char *key
, char *url
)
353 curl
= curl_easy_init();
358 sprintf(fixed
,"http://%s:%u%s",proxy_host
,proxy_port
,url
);
359 curl_easy_setopt(curl
,CURLOPT_URL
,fixed
);
360 curl_easy_setopt(curl
,CURLOPT_CUSTOMREQUEST
,"DELETE");
361 curl_easy_perform(curl
);
362 curl_easy_cleanup(curl
);
368 curl_bcreate (char *bucket
)
372 DPRINTF("cannot create bucket in non-S3 mode\n");
373 /* TBD: pretend this works for testing, fix for release
374 rc = MHD_HTTP_NOT_IMPLEMENTED;
380 /***** CF-specific functions (TBD) *****/
382 /***** FS-specific functions (TBD) *****/
389 /* Start an FS _producer_. */
391 fs_get_child (void * ctx
)
398 fd
= open(ms
->url
+1,O_RDONLY
);
400 return THREAD_FAILED
;
404 bytes
= read(fd
,buf
,sizeof(buf
));
411 pipe_prod_signal(&ms
->pipe
,buf
,bytes
);
415 pipe_prod_finish(&ms
->pipe
);
417 DPRINTF("producer exiting\n");
421 /* Start an FS _consumer_. */
423 fs_put_child (void * ctx
)
425 pipe_private
*pp
= ctx
;
426 pipe_shared
*ps
= pp
->shared
;
427 my_state
*ms
= ps
->owner
;
432 fd
= open(ms
->url
+1,O_WRONLY
|O_CREAT
,0666);
434 return THREAD_FAILED
;
437 while (pipe_cons_wait(pp
)) {
441 ps
->data_ptr
+offset
,ps
->data_len
-offset
);
449 } while (offset
< ps
->data_len
);
450 pipe_cons_signal(pp
);
456 DPRINTF("%s returning\n",__func__
);
462 fs_delete (char *bucket
, char *key
, char *url
)
467 if (unlink(url
+1) < 0) {
476 fs_bcreate (char *bucket
)
478 DPRINTF("creating bucket %s\n",bucket
);
480 if (mkdir(bucket
,0777) < 0) {
482 return MHD_HTTP_INTERNAL_SERVER_ERROR
;
489 /***** Function tables. ****/
491 backend_func_tbl bad_func_tbl
= {
500 backend_func_tbl s3_func_tbl
= {
509 backend_func_tbl curl_func_tbl
= {
518 backend_func_tbl fs_func_tbl
= {