configure.ac: refer to a better URL for peg
[iwhd.git] / backend.c
blob0457b908c69fbd81bced007e8b919819dfa70376
1 #include <config.h>
3 #include <fcntl.h>
4 #include <getopt.h>
5 #include <poll.h>
6 #include <pthread.h>
7 #include <semaphore.h>
8 #include <stdint.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <strings.h>
13 #include <unistd.h>
14 #include <sys/stat.h>
15 #include <assert.h>
17 #include <microhttpd.h>
18 #include <curl/curl.h>
19 #include <hstor.h>
20 #include <glib.h>
22 #define GLOBALS_IMPL
23 #include "repo.h"
24 #include "meta.h"
25 #include "proxy.h"
26 #include "template.h"
27 #include "mpipe.h"
28 #include "backend.h"
29 #include "state_defs.h"
31 struct hstor_client *hstor;
33 /***** Stub functions for unimplemented stuff. *****/
35 void
36 bad_init (void)
38 DPRINTF("*** bad call to %s\n",__func__);
41 void *
42 bad_get_child (void * ctx)
44 (void)ctx;
46 DPRINTF("*** bad call to %s\n",__func__);
47 return NULL;
50 void *
51 bad_put_child (void * ctx)
53 (void)ctx;
55 DPRINTF("*** bad call to %s\n",__func__);
56 return NULL;
59 void *
60 bad_cache_child (void * ctx)
62 (void)ctx;
64 DPRINTF("*** bad call to %s\n",__func__);
65 return NULL;
68 int
69 bad_delete (char *bucket, char *key, char *url)
71 (void)bucket;
72 (void)key;
73 (void)url;
75 DPRINTF("*** bad call to %s\n",__func__);
76 return MHD_NO;
79 int
80 bad_bcreate (char *bucket)
82 (void)bucket;
84 DPRINTF("*** bad call to %s\n",__func__);
85 return MHD_HTTP_NOT_IMPLEMENTED;
88 /***** Generic functions shared by the HTTP back ends. */
90 /* Invoked from S3/CURL/CF. */
91 size_t
92 http_get_prod (void *ptr, size_t size, size_t nmemb, void *stream)
94 size_t total = size * nmemb;
95 pipe_shared *ps = stream;
97 DPRINTF("producer posting %zu bytes as %ld\n",total,ps->sequence+1);
98 pipe_prod_signal(ps,ptr,total);
100 DPRINTF("producer chunk finished\n");
101 return total;
104 /* Invoked from S3/CURL/CF. */
105 size_t
106 http_put_cons (void *ptr, size_t size, size_t nmemb, void *stream)
108 size_t total = size * nmemb;
109 pipe_private *pp = stream;
110 pipe_shared *ps = pp->shared;
111 size_t done;
113 DPRINTF("consumer asked to read %zu\n",total);
115 if (!pipe_cons_wait(pp)) {
116 return 0;
119 DPRINTF("consumer offset %zu into %zu\n",
120 pp->offset, ps->data_len);
121 done = ps->data_len - pp->offset;
122 if (done > total) {
123 done = total;
125 memcpy(ptr,ps->data_ptr+pp->offset,done);
126 pp->offset += done;
127 DPRINTF("consumer copied %zu, new offset %zu\n",
128 done, pp->offset);
129 if (pp->offset == ps->data_len) {
130 DPRINTF("consumer finished chunk\n");
131 pipe_cons_signal(pp);
134 return done;
137 /***** S3-specific functions *****/
139 void
140 s3_init (void)
142 char svc_acc[128];
144 snprintf(svc_acc,sizeof(svc_acc),"%s:%u",
145 proxy_host,proxy_port);
146 hstor = hstor_new(svc_acc,proxy_host,proxy_key,proxy_secret);
147 /* TBD: check return value */
148 if (verbose) {
149 hstor->verbose = 1;
153 /* Start an S3 _producer_. */
154 void *
155 s3_get_child (void * ctx)
157 my_state *ms = ctx;
159 hstor_get(hstor,ms->bucket,ms->key,http_get_prod,&ms->pipe,0);
160 /* TBD: check return value */
162 pipe_prod_finish(&ms->pipe);
164 DPRINTF("producer exiting\n");
165 return NULL;
168 /* Start an S3 _consumer_. */
169 void *
170 s3_put_child (void * ctx)
172 pipe_private *pp = ctx;
173 pipe_shared *ps = pp->shared;
174 my_state *ms = ps->owner;
175 curl_off_t llen;
176 const char *clen;
178 clen = MHD_lookup_connection_value(
179 ms->conn, MHD_HEADER_KIND, "Content-Length");
180 if (clen) {
181 llen = strtoll(clen,NULL,10);
183 else {
184 fprintf(stderr,"missing Content-Length\n");
185 llen = (curl_off_t)MHD_SIZE_UNKNOWN;
188 hstor_put(hstor,ms->bucket,ms->key,
189 http_put_cons,llen,pp,NULL);
190 /* TBD: check return value */
192 DPRINTF("%s returning\n",__func__);
193 free(pp);
194 return NULL;
198 s3_delete (char *bucket, char *key, char *url)
200 (void)url;
202 hstor_del(hstor,bucket,key);
203 /* TBD: check return value */
205 return MHD_YES;
209 s3_bcreate (char *bucket)
211 DPRINTF("creating bucket %s\n",bucket);
213 if (!hstor_add_bucket(hstor,bucket)) {
214 DPRINTF(" bucket create failed\n");
215 return MHD_HTTP_INTERNAL_SERVER_ERROR;
218 return MHD_HTTP_OK;
221 /***** CURL-specific functions *****/
223 void
224 curl_init (void)
228 /* Start a CURL _producer_. */
229 void *
230 curl_get_child (void * ctx)
232 char fixed[1024];
233 my_state *ms = ctx;
235 ms->curl = curl_easy_init();
236 if (!ms->curl) {
237 return NULL; /* TBD: flag error somehow */
239 ms->cleanup |= CLEANUP_CURL;
240 if (ms->from_master) {
241 sprintf(fixed,"http://%s:%u%s",
242 master_host, master_port, ms->url);
244 else {
245 sprintf(fixed,"http://%s:%u%s",
246 proxy_host, proxy_port, ms->url);
248 curl_easy_setopt(ms->curl,CURLOPT_URL,fixed);
249 curl_easy_setopt(ms->curl,CURLOPT_WRITEFUNCTION,
250 http_get_prod);
251 curl_easy_setopt(ms->curl,CURLOPT_WRITEDATA,&ms->pipe);
252 curl_easy_perform(ms->curl);
253 curl_easy_getinfo(ms->curl,CURLINFO_RESPONSE_CODE,&ms->rc);
255 pipe_prod_finish(&ms->pipe);
257 DPRINTF("producer exiting\n");
258 return NULL;
261 /* Start a CURL _consumer_. */
262 void *
263 curl_put_child (void * ctx)
265 pipe_private *pp = ctx;
266 pipe_shared *ps = pp->shared;
267 my_state *ms = ps->owner;
268 curl_off_t llen;
269 char fixed[1024];
270 CURL *curl;
271 const char *clen;
273 clen = MHD_lookup_connection_value(
274 ms->conn, MHD_HEADER_KIND, "Content-Length");
275 if (clen) {
276 llen = strtoll(clen,NULL,10);
278 else {
279 fprintf(stderr,"missing Content-Length\n");
280 llen = (curl_off_t)MHD_SIZE_UNKNOWN;
283 curl = curl_easy_init();
284 if (!curl) {
285 return THREAD_FAILED;
287 sprintf(fixed,"http://%s:%u%s",proxy_host,proxy_port,
288 ms->url);
289 curl_easy_setopt(curl,CURLOPT_URL,fixed);
290 curl_easy_setopt(curl,CURLOPT_UPLOAD,1);
291 curl_easy_setopt(curl,CURLOPT_INFILESIZE_LARGE,llen);
292 curl_easy_setopt(curl,CURLOPT_READFUNCTION,http_put_cons);
293 curl_easy_setopt(curl,CURLOPT_READDATA,pp);
294 curl_easy_perform(curl);
295 curl_easy_cleanup(curl);
297 DPRINTF("%s returning\n",__func__);
298 free(pp);
299 return NULL;
302 /* Start a CURL cache consumer. */
303 void *
304 curl_cache_child (void * ctx)
306 pipe_private *pp = ctx;
307 pipe_shared *ps = pp->shared;
308 my_state *ms = ps->owner;
309 char fixed[1024];
310 CURL *curl;
311 char *slash;
312 char *my_url = strdup(ms->url);
314 if (!my_url) {
315 return THREAD_FAILED;
318 curl = curl_easy_init();
319 if (!curl) {
320 free(my_url);
321 return THREAD_FAILED;
323 sprintf(fixed,"http://%s:%u%s",proxy_host,proxy_port,
324 ms->url);
325 curl_easy_setopt(curl,CURLOPT_URL,fixed);
326 curl_easy_setopt(curl,CURLOPT_UPLOAD,1);
327 curl_easy_setopt(curl,CURLOPT_INFILESIZE_LARGE,
328 (curl_off_t)MHD_SIZE_UNKNOWN);
329 curl_easy_setopt(curl,CURLOPT_READFUNCTION,http_put_cons);
330 curl_easy_setopt(curl,CURLOPT_READDATA,pp);
331 curl_easy_perform(curl);
332 curl_easy_cleanup(curl);
334 slash = index(my_url+1,'/');
335 if (slash) {
336 *slash = '\0';
337 meta_got_copy(my_url+1,slash+1,me);
340 free(my_url);
341 return NULL;
345 curl_delete (char *bucket, char *key, char *url)
347 CURL *curl;
348 char fixed[1024];
350 (void)bucket;
351 (void)key;
353 curl = curl_easy_init();
354 if (!curl) {
355 return MHD_NO;
358 sprintf(fixed,"http://%s:%u%s",proxy_host,proxy_port,url);
359 curl_easy_setopt(curl,CURLOPT_URL,fixed);
360 curl_easy_setopt(curl,CURLOPT_CUSTOMREQUEST,"DELETE");
361 curl_easy_perform(curl);
362 curl_easy_cleanup(curl);
364 return MHD_YES;
368 curl_bcreate (char *bucket)
370 (void)bucket;
372 DPRINTF("cannot create bucket in non-S3 mode\n");
373 /* TBD: pretend this works for testing, fix for release
374 rc = MHD_HTTP_NOT_IMPLEMENTED;
376 return MHD_HTTP_OK;
380 /***** CF-specific functions (TBD) *****/
382 /***** FS-specific functions (TBD) *****/
384 void
385 fs_init (void)
389 /* Start an FS _producer_. */
390 void *
391 fs_get_child (void * ctx)
393 my_state *ms = ctx;
394 int fd;
395 char buf[1<<16];
396 ssize_t bytes;
398 fd = open(ms->url+1,O_RDONLY);
399 if (fd < 0) {
400 return THREAD_FAILED;
403 for (;;) {
404 bytes = read(fd,buf,sizeof(buf));
405 if (bytes <= 0) {
406 if (bytes < 0) {
407 perror("read");
409 break;
411 pipe_prod_signal(&ms->pipe,buf,bytes);
414 close(fd);
415 pipe_prod_finish(&ms->pipe);
417 DPRINTF("producer exiting\n");
418 return NULL;
421 /* Start an FS _consumer_. */
422 void *
423 fs_put_child (void * ctx)
425 pipe_private *pp = ctx;
426 pipe_shared *ps = pp->shared;
427 my_state *ms = ps->owner;
428 int fd;
429 ssize_t bytes;
430 size_t offset;
432 fd = open(ms->url+1,O_WRONLY|O_CREAT,0666);
433 if (fd < 0) {
434 return THREAD_FAILED;
437 while (pipe_cons_wait(pp)) {
438 offset = 0;
439 do {
440 bytes = write(fd,
441 ps->data_ptr+offset,ps->data_len-offset);
442 if (bytes <= 0) {
443 if (bytes < 0) {
444 perror("write");
446 goto done;
448 offset += bytes;
449 } while (offset < ps->data_len);
450 pipe_cons_signal(pp);
453 done:
454 close(fd);
456 DPRINTF("%s returning\n",__func__);
457 free(pp);
458 return NULL;
462 fs_delete (char *bucket, char *key, char *url)
464 (void)bucket;
465 (void)key;
467 if (unlink(url+1) < 0) {
468 perror("unlink");
469 return MHD_NO;
472 return MHD_YES;
476 fs_bcreate (char *bucket)
478 DPRINTF("creating bucket %s\n",bucket);
480 if (mkdir(bucket,0777) < 0) {
481 perror("mkdir");
482 return MHD_HTTP_INTERNAL_SERVER_ERROR;
485 return MHD_HTTP_OK;
489 /***** Function tables. ****/
491 backend_func_tbl bad_func_tbl = {
492 bad_init,
493 bad_get_child,
494 bad_put_child,
495 bad_cache_child,
496 bad_delete,
497 bad_bcreate,
500 backend_func_tbl s3_func_tbl = {
501 s3_init,
502 s3_get_child,
503 s3_put_child,
504 bad_cache_child,
505 s3_delete,
506 s3_bcreate,
509 backend_func_tbl curl_func_tbl = {
510 curl_init,
511 curl_get_child,
512 curl_put_child,
513 curl_cache_child,
514 curl_delete,
515 curl_bcreate,
518 backend_func_tbl fs_func_tbl = {
519 fs_init,
520 fs_get_child,
521 fs_put_child,
522 bad_cache_child,
523 fs_delete,
524 fs_bcreate,