use first gnulib module: progname
[iwhd.git] / replica.c
blob58544d413cba56bf2ced0197c904d1e73ee34ae7
1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include <errno.h>
19 #include <error.h>
20 #include <fcntl.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <semaphore.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
30 #include <hstor.h>
31 #include <microhttpd.h> /* for HTTP status values */
33 #include "iwh.h"
34 #include "setup.h"
35 #include "query.h"
36 #include "meta.h"
37 #include "replica.h"
/*
 * Sizes for internal string buffers.
 * None of these is referenced by the code visible in this file; they are
 * kept because other parts of the build may still rely on them.
 */
#define ADDR_SIZE	1024
#define SVC_ACC_SIZE	128
#define HEADER_SIZE	64
/* Kind of work carried by one queued replication item. */
typedef enum {
	REPL_PUT,		/* store an object */
	REPL_ODELETE,		/* delete an object */
	REPL_BCREATE,		/* create a bucket */
	/* TBD: bucket deletion, others? */
} repl_t;
51 typedef struct _repl_item {
52 struct _repl_item *next;
53 repl_t type;
54 char *path;
55 provider_t *server;
56 size_t size;
57 int pipes[2];
58 my_state *ms;
59 } repl_item;
61 typedef struct {
62 char *cur_bucket;
63 char *cur_key;
64 provider_t *cur_server;
65 } query_ctx_t;
67 static repl_item *queue_head = NULL;
68 static repl_item *queue_tail = NULL;
69 static pthread_mutex_t queue_lock;
70 static sem_t queue_sema;
71 static volatile gint rep_count = 0;
73 static void *
74 proxy_repl_prod (void *ctx)
76 repl_item *item = ctx;
77 backend_thunk_t thunk;
78 void *result;
80 thunk.parent = item->ms;
81 thunk.prov = main_prov;
83 result = thunk.prov->func_tbl->get_child_func(&thunk);
84 return result;
87 static void *
88 proxy_repl_cons (void *ctx)
90 repl_item *item = ctx;
91 my_state *ms = item->ms;
92 pipe_private *pp;
94 pp = pipe_init_private(&ms->pipe);
95 if (!pp) {
96 pipe_cons_siginit(&ms->pipe,-1);
97 return THREAD_FAILED;
100 pp->prov = item->server;
101 ms->be_flags = 0;
103 return item->server->func_tbl->put_child_func(pp);
106 static void
107 repl_worker_del (const repl_item *item)
109 char *bucket;
110 char *key;
111 int rc;
113 bucket = strdup(item->path);
114 if (!bucket) {
115 error(0,errno,"ran out of memory replicating delete for %s",
116 item->path);
117 return;
120 key = strchr(bucket,'/');
121 if (!key) {
122 error(0,0,"invalid path replicating delete for %s",item->path);
123 free(bucket);
124 return;
126 ++key;
128 rc = item->server->func_tbl->delete_func(item->server,
129 bucket, key, item->path);
130 if (rc != MHD_HTTP_OK) {
131 error(0,0,"got status %d replicating delete for %s",
132 rc, item->path);
134 free(bucket);
136 DPRINTF("finished replicating delete for %s, rc = %d\n",item->path,rc);
139 static void
140 repl_worker_bcreate (repl_item *item)
142 int rc;
144 rc = item->server->func_tbl->bcreate_func(item->server,item->path);
145 if (rc != MHD_HTTP_OK) {
146 error(0,0,"got status %d replicating bcreate for %s",
147 rc, item->path);
150 DPRINTF("finished replicating bcreate for %s, rc = %d\n",item->path,rc);
153 /* Use this to diagnose failed thread creation. */
154 #define xpthread_create(thread, start_routine, item, msg) \
155 do { \
156 int err = pthread_create (thread, NULL, start_routine, item); \
157 if (err) { \
158 error (0, err, msg); \
159 return NULL; \
161 } while (0)
163 static void *
164 repl_worker (void *notused ATTRIBUTE_UNUSED)
166 repl_item *item;
167 pthread_t cons;
168 pthread_t prod;
169 my_state *ms;
171 for (;;) {
172 sem_wait(&queue_sema);
173 pthread_mutex_lock(&queue_lock);
174 item = queue_head;
175 queue_head = item->next;
176 if (!queue_head) {
177 queue_tail = NULL;
179 pthread_mutex_unlock(&queue_lock);
182 * Do a full initialization here, not just in the rest. It's
183 * necessary in the oddball case where we're re-replicating as
184 * a result of an attribute/policy change, and it's not harmful
185 * in the normal case where we're actually storing a new file.
187 ms = item->ms;
188 pipe_init_shared(&ms->pipe,ms,1);
189 switch (item->type) {
190 case REPL_PUT:
191 if (pipe(item->pipes) >= 0) {
192 xpthread_create(&prod,proxy_repl_prod,item,
193 "failed to start producer thread");
194 xpthread_create(&cons,proxy_repl_cons,item,
195 "failed to start consumer thread");
196 pthread_join(prod,NULL);
197 pthread_join(cons,NULL);
199 else {
200 error(0,errno,"pipe");
202 break;
203 case REPL_ODELETE:
204 repl_worker_del(item);
205 break;
206 case REPL_BCREATE:
207 repl_worker_bcreate(item);
208 break;
209 default:
210 error(0,0,"bad repl type %d (url=%s) skipped",
211 item->type, item->path);
213 free_ms(item->ms);
214 free(item->path);
215 free(item);
216 /* No atomic dec without test? Lame. */
217 (void)g_atomic_int_dec_and_test(&rep_count);
221 void
222 repl_init (void)
224 pthread_t tid;
226 sem_init(&queue_sema,0,0);
227 pthread_mutex_init(&queue_lock,NULL);
228 pthread_create(&tid,NULL,repl_worker,NULL);
231 static const char *
232 repl_oget (void *ctx, const char *id)
234 query_ctx_t *qctx = ctx;
235 char *cur_value = NULL;
237 (void)meta_get_value(qctx->cur_bucket,qctx->cur_key,id,&cur_value);
239 return cur_value;
242 static const char *
243 repl_sget (void *ctx, const char *id)
245 query_ctx_t *qctx = ctx;
246 provider_t *prov = qctx->cur_server;
248 if (!strcmp(id,"name")) {
249 return prov->name;
251 if (!strcmp(id,"type")) {
252 return prov->type;
254 if (!strcmp(id,"host")) {
255 return prov->host;
257 if (!strcmp(id,"key")) {
258 return prov->username;
260 if (!strcmp(id,"secret")) {
261 return prov->password;
263 if (!strcmp(id,"path")) {
264 return prov->path;
267 return g_hash_table_lookup(prov->attrs,id);
270 void
271 replicate (const char *url, size_t size, const char *policy, my_state *ms)
273 repl_item *item;
274 value_t *expr;
275 int res;
276 char *url2;
277 char *stctx;
278 query_ctx_t qctx;
279 getter_t oget;
280 getter_t sget;
281 GHashTableIter iter;
282 gpointer key;
283 gpointer value;
284 provider_t *prov;
286 url2 = strdup(url);
287 if (!url2) {
288 error(0,0,"could not parse url %s",url);
289 return;
291 qctx.cur_bucket = strtok_r(url2,"/",&stctx);
292 qctx.cur_key = strtok_r(NULL,"/",&stctx);
294 if (!size) {
295 size = meta_get_size(qctx.cur_bucket,qctx.cur_key);
296 DPRINTF("fetched size %zu for %s\n",size,url);
299 if (policy) {
300 DPRINTF("--- policy = %s\n",policy);
301 expr = parse(policy);
303 else {
304 expr = NULL;
307 oget.func = repl_oget;
308 oget.ctx = &qctx;
309 sget.func = repl_sget;
310 sget.ctx = &qctx;
312 init_prov_iter(&iter);
313 while (g_hash_table_iter_next(&iter,&key,&value)) {
314 if (!strcmp(key,me)) {
315 continue;
317 prov = (provider_t *)value;
318 if (expr) {
319 qctx.cur_server = prov;
320 res = eval(expr,&oget,&sget);
322 else {
323 res = 0;
325 if (res <= 0) {
326 DPRINTF("skipping %s for %s\n",prov->name,url);
327 continue;
329 DPRINTF("REPLICATING %s to %s\n",url,prov->name);
330 item = malloc(sizeof(*item));
331 if (!item) {
332 error(0,errno,"could not create repl_item for %s",
333 url);
334 break;
336 item->type = REPL_PUT;
337 item->path = strdup(url);
338 if (!item->path) {
339 error(0,errno,"could not create repl_item for %s",
340 url);
341 break;
343 item->server = prov;
344 item->size = size;
345 item->ms = ms;
346 g_atomic_int_inc(&ms->refcnt);
347 pthread_mutex_lock(&queue_lock);
348 if (queue_tail) {
349 item->next = queue_tail->next;
350 queue_tail->next = item;
352 else {
353 item->next = NULL;
354 queue_head = item;
356 queue_tail = item;
357 pthread_mutex_unlock(&queue_lock);
358 g_atomic_int_inc(&rep_count);
359 sem_post(&queue_sema);
362 if (expr) {
363 free_value(expr);
365 free(url2);
368 static void
369 replicate_namespace_action (const char *name, repl_t action, my_state *ms)
371 repl_item *item;
372 GHashTableIter iter;
373 gpointer key;
374 gpointer value;
376 init_prov_iter(&iter);
377 while (g_hash_table_iter_next(&iter,&key,&value)) {
378 if (!strcmp(key,me)) {
379 continue;
381 DPRINTF("replicating %s(%s) on %s\n",
382 (action == REPL_ODELETE ? "delete" : "create"),
383 name,
384 ((provider_t *)value)->name);
385 item = malloc(sizeof(*item));
386 if (!item) {
387 error(0,errno,"could not create repl_item for %s",
388 name);
389 return;
391 item->type = action;
392 item->path = strdup(name);
393 if (!item->path) {
394 free(item);
395 return;
397 item->server = (provider_t *)value;
398 item->ms = ms;
399 g_atomic_int_inc(&ms->refcnt);
400 pthread_mutex_lock(&queue_lock);
401 if (queue_tail) {
402 item->next = queue_tail->next;
403 queue_tail->next = item;
405 else {
406 item->next = NULL;
407 queue_head = item;
409 queue_tail = item;
410 pthread_mutex_unlock(&queue_lock);
411 g_atomic_int_inc(&rep_count);
412 sem_post(&queue_sema);
416 void
417 replicate_delete (const char *name, my_state *ms)
419 replicate_namespace_action(name,REPL_ODELETE,ms);
422 void
423 replicate_bcreate (const char *name, my_state *ms)
425 replicate_namespace_action(name,REPL_BCREATE,ms);
/*
 * Part of our API to the query module.
 *
 * Look up attribute "key" of the object named by "object", which must have
 * the form "bucket/key-name".  The string is split in place at the first
 * '/' for the duration of the lookup and restored before returning, so the
 * caller sees it unchanged afterwards.
 *
 * Returns the value stored by meta_get_value, or NULL if object contains no
 * '/' or nothing was stored.  The lookup's own status is ignored.
 */
char *
follow_link (char *object, const char *key)
{
	char	*slash;
	char	*value = NULL;

	slash = strchr(object,'/');
	if (!slash) {
		return NULL;
	}

	*slash = '\0';
	(void)meta_get_value(object,slash+1,key,&value);
	*slash = '/';

	/* value may legitimately be NULL; never hand a null pointer to %s. */
	DPRINTF("%s: %s:%s => %s\n",__func__,object,key,
		value ? value : "(null)");
	return value;
}
449 get_rep_count (void)
451 return g_atomic_int_get(&rep_count);