use symbolic names in place of more hard-coded constants
[iwhd.git] / replica.c
blob3efe65c69ec25d0ff10273314a32ce92d5f4cee9
1 /* Copyright (C) 2010-2011 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include <errno.h>
19 #include <error.h>
20 #include <fcntl.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <semaphore.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
30 #include <hstor.h>
31 #include <microhttpd.h> /* for HTTP status values */
33 #include "iwh.h"
34 #include "setup.h"
35 #include "query.h"
36 #include "meta.h"
37 #include "replica.h"
/* Kinds of work that can be queued for the replication worker. */
typedef enum {
	REPL_PUT = 0,		/* store an object */
	REPL_ODELETE = 1,	/* delete an object */
	REPL_BCREATE = 2,	/* create a bucket */
	/* TBD: bucket deletion, others? */
} repl_t;
46 typedef struct _repl_item {
47 struct _repl_item *next;
48 repl_t type;
49 char *path;
50 provider_t *server;
51 size_t size;
52 int pipes[2];
53 my_state *ms;
54 } repl_item;
56 typedef struct {
57 char *cur_bucket;
58 char *cur_key;
59 provider_t *cur_server;
60 } query_ctx_t;
62 static repl_item *queue_head = NULL;
63 static repl_item *queue_tail = NULL;
64 static pthread_mutex_t queue_lock;
65 static sem_t queue_sema;
66 static volatile gint rep_count = 0;
68 static void *
69 proxy_repl_prod (void *ctx)
71 repl_item *item = ctx;
72 backend_thunk_t thunk;
73 void *result;
75 thunk.parent = item->ms;
76 thunk.prov = get_main_provider();
78 result = thunk.prov->func_tbl->get_child_func(&thunk);
79 return result;
82 static void *
83 proxy_repl_cons (void *ctx)
85 repl_item *item = ctx;
86 my_state *ms = item->ms;
87 pipe_private *pp;
89 pp = pipe_init_private(&ms->pipe);
90 if (!pp) {
91 pipe_cons_siginit(&ms->pipe,-1);
92 return THREAD_FAILED;
95 pp->prov = item->server;
96 ms->be_flags = 0;
98 return item->server->func_tbl->put_child_func(pp);
101 static void
102 repl_worker_del (const repl_item *item)
104 char *bucket;
105 char *key;
106 int rc;
108 bucket = strdup(item->path);
109 if (!bucket) {
110 error(0,errno,"ran out of memory replicating delete for %s",
111 item->path);
112 return;
115 key = strchr(bucket,'/');
116 if (!key) {
117 error(0,0,"invalid path replicating delete for %s",item->path);
118 return;
120 ++key;
122 rc = item->server->func_tbl->delete_func(item->server,
123 bucket, key, item->path);
124 if (rc != MHD_HTTP_OK) {
125 error(0,0,"got status %d replicating delete for %s",
126 rc, item->path);
129 DPRINTF("finished replicating delete for %s, rc = %d\n",item->path,rc);
132 static void
133 repl_worker_bcreate (repl_item *item)
135 int rc;
137 rc = item->server->func_tbl->bcreate_func(item->server,item->path);
138 if (rc != MHD_HTTP_OK) {
139 error(0,0,"got status %d replicating bcreate for %s",
140 rc, item->path);
143 DPRINTF("finished replicating bcreate for %s, rc = %d\n",item->path,rc);
/*
 * Start a thread with default attributes and diagnose failure.
 * If pthread_create fails, MSG is reported together with the returned
 * error code and the *enclosing function* returns NULL, so this is only
 * usable inside functions that return a pointer.
 */
#define xpthread_create(thread, start_routine, item, msg)		\
  do {									\
    int xpc_status = pthread_create (thread, NULL, start_routine, item); \
    if (xpc_status != 0) {						\
      error (0, xpc_status, msg);					\
      return NULL;							\
    }									\
  } while (0)
156 static void *
157 repl_worker (void *notused ATTRIBUTE_UNUSED)
159 repl_item *item;
160 pthread_t cons;
161 pthread_t prod;
162 my_state *ms;
164 for (;;) {
165 sem_wait(&queue_sema);
166 pthread_mutex_lock(&queue_lock);
167 item = queue_head;
168 queue_head = item->next;
169 if (!queue_head) {
170 queue_tail = NULL;
172 pthread_mutex_unlock(&queue_lock);
175 * Do a full initialization here, not just in the rest. It's
176 * necessary in the oddball case where we're re-replicating as
177 * a result of an attribute/policy change, and it's not harmful
178 * in the normal case where we're actually storing a new file.
180 ms = item->ms;
181 pipe_init_shared(&ms->pipe,ms,1);
182 switch (item->type) {
183 case REPL_PUT:
184 if (pipe(item->pipes) >= 0) {
185 xpthread_create(&prod,proxy_repl_prod,item,
186 "failed to start producer thread");
187 xpthread_create(&cons,proxy_repl_cons,item,
188 "failed to start consumer thread");
189 pthread_join(prod,NULL);
190 pthread_join(cons,NULL);
192 else {
193 error(0,errno,"pipe");
195 break;
196 case REPL_ODELETE:
197 repl_worker_del(item);
198 break;
199 case REPL_BCREATE:
200 repl_worker_bcreate(item);
201 break;
202 default:
203 error(0,0,"bad repl type %d (url=%s) skipped",
204 item->type, item->path);
206 /* No atomic dec without test? Lame. */
207 (void)g_atomic_int_dec_and_test(&rep_count);
211 void
212 repl_init (void)
214 pthread_t tid;
216 sem_init(&queue_sema,0,0);
217 pthread_mutex_init(&queue_lock,NULL);
218 pthread_create(&tid,NULL,repl_worker,NULL);
221 static const char *
222 repl_oget (void *ctx, const char *id)
224 query_ctx_t *qctx = ctx;
225 char *cur_value = NULL;
227 (void)meta_get_value(qctx->cur_bucket,qctx->cur_key,id,&cur_value);
229 return cur_value;
232 static const char *
233 repl_sget (void *ctx, const char *id)
235 query_ctx_t *qctx = ctx;
236 provider_t *prov = qctx->cur_server;
238 if (!strcmp(id,"name")) {
239 return prov->name;
241 if (!strcmp(id,"type")) {
242 return prov->type;
244 if (!strcmp(id,"host")) {
245 return prov->host;
247 if (!strcmp(id,"key")) {
248 return prov->username;
250 if (!strcmp(id,"secret")) {
251 return prov->password;
253 if (!strcmp(id,"path")) {
254 return prov->path;
257 struct kv_pair kv;
258 kv.key = (char *) id;
259 struct kv_pair *p = hash_lookup (prov->attrs, &kv);
261 return p ? p->val : NULL;
264 void
265 replicate (const char *url, size_t size, const char *policy, my_state *ms)
267 repl_item *item;
268 value_t *expr;
269 int res;
270 char *url2;
271 char *stctx;
272 query_ctx_t qctx;
273 getter_t oget;
274 getter_t sget;
276 url2 = strdup(url);
277 if (!url2) {
278 error(0,0,"could not parse url %s",url);
279 return;
281 qctx.cur_bucket = strtok_r(url2,"/",&stctx);
282 qctx.cur_key = strtok_r(NULL,"/",&stctx);
284 if (!size) {
285 size = meta_get_size(qctx.cur_bucket,qctx.cur_key);
286 DPRINTF("fetched size %zu for %s\n",size,url);
289 if (policy) {
290 DPRINTF("--- policy = %s\n",policy);
291 expr = parse(policy);
293 else {
294 expr = NULL;
297 oget.func = repl_oget;
298 oget.ctx = &qctx;
299 sget.func = repl_sget;
300 sget.ctx = &qctx;
302 provider_t *prov;
303 for (prov = hash_get_first_prov (); prov;
304 prov = hash_get_next_prov (prov)) {
305 if (!strcmp(prov->name, me)) {
306 continue;
308 if (expr) {
309 qctx.cur_server = prov;
310 res = eval(expr,&oget,&sget);
312 else {
313 res = 0;
315 if (res <= 0) {
316 DPRINTF("skipping %s for %s\n",prov->name,url);
317 continue;
319 DPRINTF("REPLICATING %s to %s\n",url,prov->name);
320 item = malloc(sizeof(*item));
321 if (!item) {
322 error(0,errno,"could not create repl_item for %s",
323 url);
324 break;
326 item->type = REPL_PUT;
327 item->path = strdup(url);
328 if (!item->path) {
329 error(0,errno,"could not create repl_item for %s",
330 url);
331 break;
333 item->server = prov;
334 item->size = size;
335 item->ms = ms;
336 pthread_mutex_lock(&queue_lock);
337 if (queue_tail) {
338 item->next = queue_tail->next;
339 queue_tail->next = item;
341 else {
342 item->next = NULL;
343 queue_head = item;
345 queue_tail = item;
346 pthread_mutex_unlock(&queue_lock);
347 g_atomic_int_inc(&rep_count);
348 sem_post(&queue_sema);
352 static void
353 replicate_namespace_action (const char *name, repl_t action, my_state *ms)
355 provider_t *prov;
356 for (prov = hash_get_first_prov (); prov;
357 prov = hash_get_next_prov (prov)) {
358 if (!strcmp(prov->name, me)) {
359 continue;
361 DPRINTF("replicating %s(%s) on %s\n",
362 (action == REPL_ODELETE ? "delete" : "create"),
363 name, prov->name);
364 repl_item *item = malloc(sizeof(*item));
365 if (!item) {
366 error(0,errno,"could not create repl_item for %s",
367 name);
368 return;
370 item->type = action;
371 item->path = strdup(name);
372 if (!item->path) {
373 return;
375 item->server = prov;
376 item->ms = ms;
377 pthread_mutex_lock(&queue_lock);
378 if (queue_tail) {
379 item->next = queue_tail->next;
380 queue_tail->next = item;
382 else {
383 item->next = NULL;
384 queue_head = item;
386 queue_tail = item;
387 pthread_mutex_unlock(&queue_lock);
388 g_atomic_int_inc(&rep_count);
389 sem_post(&queue_sema);
393 void
394 replicate_delete (const char *name, my_state *ms)
396 replicate_namespace_action(name,REPL_ODELETE,ms);
399 void
400 replicate_bcreate (const char *name, my_state *ms)
402 replicate_namespace_action(name,REPL_BCREATE,ms);
/*
 * Part of our API to the query module.
 *
 * OBJECT is a "bucket/key" string.  It is split in place at the first
 * '/' just long enough to look up attribute KEY of that object with
 * meta_get_value, then restored.  Returns the value stored by
 * meta_get_value, or NULL when OBJECT contains no '/' or no value was
 * stored.
 */
char *
follow_link (char *object, const char *key)
{
	char	*slash;
	char	*value = NULL;

	slash = strchr(object,'/');
	if (!slash) {
		return NULL;
	}

	/* Temporarily terminate the bucket name at the separator. */
	*slash = '\0';
	(void)meta_get_value(object,slash+1,key,&value);
	*slash = '/';

	/* value may be NULL; never hand a null pointer to %s. */
	DPRINTF("%s: %s:%s => %s\n",__func__,object,key,
		value ? value : "(null)");
	return value;
}
426 get_rep_count (void)
428 return g_atomic_int_get(&rep_count);