From 5d6ba6e4dc02acadd6489678e5d273e078edb79b Mon Sep 17 00:00:00 2001 From: Jeff Darcy Date: Tue, 14 Sep 2010 10:10:47 -0400 Subject: [PATCH] Teach replication module ("proxy.c") how to use a filesystem-backed primary store. Needed for EC2 bundling. --- proxy.c | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 2 deletions(-) diff --git a/proxy.c b/proxy.c index 025faec..8523a38 100644 --- a/proxy.c +++ b/proxy.c @@ -232,6 +232,62 @@ junk_writer (void *ptr, size_t size, size_t nmemb, void *stream) } void * +proxy_repl_prod_fs (void *ctx) +{ + repl_item *item = ctx; + int ifd; + int ofd; + char buf[1<<16]; + ssize_t ibytes; + ssize_t obytes; + ssize_t offset; + + DPRINTF("replicating from %s (FS)\n",item->url); + + ifd = open(item->url,O_RDONLY); + if (ifd < 0) { + perror("ifd open"); + return THREAD_FAILED; + } + ofd = item->pipes[1]; + + for (;;) { + ibytes = read(ifd,buf,sizeof(buf)); + if (ibytes <= 0) { + if (ibytes < 0) { + perror("read"); + } + else { + DPRINTF("EOF on ifd\n"); + } + break; + } + offset = 0; + do { + obytes = write(ofd,buf+offset,ibytes); + if (obytes <= 0) { + if (obytes < 0) { + perror("ofd write"); + } + else { + DPRINTF("zero-length write on ofd\n"); + } + break; + } + ibytes -= obytes; + offset += obytes; + } while (ibytes > 0); + } + + close(ifd); + close(ofd); + + DPRINTF("%s returning\n",__func__); + close(item->pipes[1]); + return NULL; +} + +void * proxy_repl_prod (void *ctx) { repl_item *item = ctx; @@ -395,7 +451,7 @@ proxy_repl_cons (void *ctx) key = strtok_r(NULL,"/",&stctx); if (!strcasecmp(s_type,"s3")) { - DPRINTF("replicating %zu to %s%s (S3)\n",item->size,s_host, + DPRINTF("replicating %zu to %s/%s (S3)\n",item->size,s_host, item->url); snprintf(svc_acc,sizeof(svc_acc),"%s:%u",s_host,s_port); hstor = hstor_new(svc_acc,s_host,s_key,s_secret); @@ -529,7 +585,14 @@ repl_worker (void *notused ATTRIBUTE_UNUSED) switch (item->type) { case REPL_PUT: if (pipe(item->pipes) >= 0) { - pthread_create(&prod,NULL,proxy_repl_prod,item); + if (proxy_host) { + pthread_create(&prod,NULL, + proxy_repl_prod,item); + } + else { + pthread_create(&prod,NULL, + proxy_repl_prod_fs,item); + } pthread_create(&cons,NULL,proxy_repl_cons,item); pthread_join(prod,NULL); pthread_join(cons,NULL); -- 2.11.4.GIT