build: make the "rpm" rule work once again
[iwhd.git] / replica.c
blobf0b4b51754705b86cc49097047601f109d26b341
1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include <errno.h>
19 #include <error.h>
20 #include <fcntl.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <semaphore.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
30 #include <hstor.h>
31 #include <microhttpd.h> /* for HTTP status values */
33 #include "iwh.h"
34 #include "setup.h"
35 #include "query.h"
36 #include "meta.h"
37 #include "replica.h"
39 typedef enum {
40 REPL_PUT, /* store an object */
41 REPL_ODELETE, /* delete an object */
42 REPL_BCREATE, /* create a bucket */
43 /* TBD: bucket deletion, others? */
44 } repl_t;
46 typedef struct _repl_item {
47 struct _repl_item *next;
48 repl_t type;
49 char *path;
50 provider_t *server;
51 size_t size;
52 int pipes[2];
53 my_state *ms;
54 } repl_item;
56 typedef struct {
57 char *cur_bucket;
58 char *cur_key;
59 provider_t *cur_server;
60 } query_ctx_t;
62 static repl_item *queue_head = NULL;
63 static repl_item *queue_tail = NULL;
64 static pthread_mutex_t queue_lock;
65 static sem_t queue_sema;
66 static volatile gint rep_count = 0;
68 static void *
69 proxy_repl_prod (void *ctx)
71 repl_item *item = ctx;
72 backend_thunk_t thunk;
73 void *result;
75 thunk.parent = item->ms;
76 thunk.prov = main_prov;
78 result = thunk.prov->func_tbl->get_child_func(&thunk);
79 return result;
82 static void *
83 proxy_repl_cons (void *ctx)
85 repl_item *item = ctx;
86 my_state *ms = item->ms;
87 pipe_private *pp;
89 pp = pipe_init_private(&ms->pipe);
90 if (!pp) {
91 pipe_cons_siginit(&ms->pipe,-1);
92 return THREAD_FAILED;
95 pp->prov = item->server;
96 ms->be_flags = 0;
98 return item->server->func_tbl->put_child_func(pp);
101 static void
102 repl_worker_del (const repl_item *item)
104 char *bucket;
105 char *key;
106 int rc;
108 bucket = strdup(item->path);
109 if (!bucket) {
110 error(0,errno,"ran out of memory replicating delete for %s",
111 item->path);
112 return;
115 key = strchr(bucket,'/');
116 if (!key) {
117 error(0,0,"invalid path replicating delete for %s",item->path);
118 free(bucket);
119 return;
121 ++key;
123 rc = item->server->func_tbl->delete_func(item->server,
124 bucket, key, item->path);
125 if (rc != MHD_HTTP_OK) {
126 error(0,0,"got status %d replicating delete for %s",
127 rc, item->path);
129 free(bucket);
131 DPRINTF("finished replicating delete for %s, rc = %d\n",item->path,rc);
134 static void
135 repl_worker_bcreate (repl_item *item)
137 int rc;
139 rc = item->server->func_tbl->bcreate_func(item->server,item->path);
140 if (rc != MHD_HTTP_OK) {
141 error(0,0,"got status %d replicating bcreate for %s",
142 rc, item->path);
145 DPRINTF("finished replicating bcreate for %s, rc = %d\n",item->path,rc);
148 /* Use this to diagnose failed thread creation. */
149 #define xpthread_create(thread, start_routine, item, msg) \
150 do { \
151 int err = pthread_create (thread, NULL, start_routine, item); \
152 if (err) { \
153 error (0, err, msg); \
154 return NULL; \
156 } while (0)
158 static void *
159 repl_worker (void *notused ATTRIBUTE_UNUSED)
161 repl_item *item;
162 pthread_t cons;
163 pthread_t prod;
164 my_state *ms;
166 for (;;) {
167 sem_wait(&queue_sema);
168 pthread_mutex_lock(&queue_lock);
169 item = queue_head;
170 queue_head = item->next;
171 if (!queue_head) {
172 queue_tail = NULL;
174 pthread_mutex_unlock(&queue_lock);
177 * Do a full initialization here, not just in the rest. It's
178 * necessary in the oddball case where we're re-replicating as
179 * a result of an attribute/policy change, and it's not harmful
180 * in the normal case where we're actually storing a new file.
182 ms = item->ms;
183 pipe_init_shared(&ms->pipe,ms,1);
184 switch (item->type) {
185 case REPL_PUT:
186 if (pipe(item->pipes) >= 0) {
187 xpthread_create(&prod,proxy_repl_prod,item,
188 "failed to start producer thread");
189 xpthread_create(&cons,proxy_repl_cons,item,
190 "failed to start consumer thread");
191 pthread_join(prod,NULL);
192 pthread_join(cons,NULL);
194 else {
195 error(0,errno,"pipe");
197 break;
198 case REPL_ODELETE:
199 repl_worker_del(item);
200 break;
201 case REPL_BCREATE:
202 repl_worker_bcreate(item);
203 break;
204 default:
205 error(0,0,"bad repl type %d (url=%s) skipped",
206 item->type, item->path);
208 free_ms(item->ms);
209 free(item->path);
210 free(item);
211 /* No atomic dec without test? Lame. */
212 (void)g_atomic_int_dec_and_test(&rep_count);
216 void
217 repl_init (void)
219 pthread_t tid;
221 sem_init(&queue_sema,0,0);
222 pthread_mutex_init(&queue_lock,NULL);
223 pthread_create(&tid,NULL,repl_worker,NULL);
226 static const char *
227 repl_oget (void *ctx, const char *id)
229 query_ctx_t *qctx = ctx;
230 char *cur_value = NULL;
232 (void)meta_get_value(qctx->cur_bucket,qctx->cur_key,id,&cur_value);
234 return cur_value;
237 static const char *
238 repl_sget (void *ctx, const char *id)
240 query_ctx_t *qctx = ctx;
241 provider_t *prov = qctx->cur_server;
243 if (!strcmp(id,"name")) {
244 return prov->name;
246 if (!strcmp(id,"type")) {
247 return prov->type;
249 if (!strcmp(id,"host")) {
250 return prov->host;
252 if (!strcmp(id,"key")) {
253 return prov->username;
255 if (!strcmp(id,"secret")) {
256 return prov->password;
258 if (!strcmp(id,"path")) {
259 return prov->path;
262 return g_hash_table_lookup(prov->attrs,id);
265 void
266 replicate (const char *url, size_t size, const char *policy, my_state *ms)
268 repl_item *item;
269 value_t *expr;
270 int res;
271 char *url2;
272 char *stctx;
273 query_ctx_t qctx;
274 getter_t oget;
275 getter_t sget;
276 GHashTableIter iter;
277 gpointer key;
278 gpointer value;
279 provider_t *prov;
281 url2 = strdup(url);
282 if (!url2) {
283 error(0,0,"could not parse url %s",url);
284 return;
286 qctx.cur_bucket = strtok_r(url2,"/",&stctx);
287 qctx.cur_key = strtok_r(NULL,"/",&stctx);
289 if (!size) {
290 size = meta_get_size(qctx.cur_bucket,qctx.cur_key);
291 DPRINTF("fetched size %zu for %s\n",size,url);
294 if (policy) {
295 DPRINTF("--- policy = %s\n",policy);
296 expr = parse(policy);
298 else {
299 expr = NULL;
302 oget.func = repl_oget;
303 oget.ctx = &qctx;
304 sget.func = repl_sget;
305 sget.ctx = &qctx;
307 init_prov_iter(&iter);
308 while (g_hash_table_iter_next(&iter,&key,&value)) {
309 if (!strcmp(key,me)) {
310 continue;
312 prov = (provider_t *)value;
313 if (expr) {
314 qctx.cur_server = prov;
315 res = eval(expr,&oget,&sget);
317 else {
318 res = 0;
320 if (res <= 0) {
321 DPRINTF("skipping %s for %s\n",prov->name,url);
322 continue;
324 DPRINTF("REPLICATING %s to %s\n",url,prov->name);
325 item = malloc(sizeof(*item));
326 if (!item) {
327 error(0,errno,"could not create repl_item for %s",
328 url);
329 break;
331 item->type = REPL_PUT;
332 item->path = strdup(url);
333 if (!item->path) {
334 error(0,errno,"could not create repl_item for %s",
335 url);
336 break;
338 item->server = prov;
339 item->size = size;
340 item->ms = ms;
341 g_atomic_int_inc(&ms->refcnt);
342 pthread_mutex_lock(&queue_lock);
343 if (queue_tail) {
344 item->next = queue_tail->next;
345 queue_tail->next = item;
347 else {
348 item->next = NULL;
349 queue_head = item;
351 queue_tail = item;
352 pthread_mutex_unlock(&queue_lock);
353 g_atomic_int_inc(&rep_count);
354 sem_post(&queue_sema);
357 if (expr) {
358 free_value(expr);
360 free(url2);
363 static void
364 replicate_namespace_action (const char *name, repl_t action, my_state *ms)
366 repl_item *item;
367 GHashTableIter iter;
368 gpointer key;
369 gpointer value;
371 init_prov_iter(&iter);
372 while (g_hash_table_iter_next(&iter,&key,&value)) {
373 if (!strcmp(key,me)) {
374 continue;
376 DPRINTF("replicating %s(%s) on %s\n",
377 (action == REPL_ODELETE ? "delete" : "create"),
378 name,
379 ((provider_t *)value)->name);
380 item = malloc(sizeof(*item));
381 if (!item) {
382 error(0,errno,"could not create repl_item for %s",
383 name);
384 return;
386 item->type = action;
387 item->path = strdup(name);
388 if (!item->path) {
389 free(item);
390 return;
392 item->server = (provider_t *)value;
393 item->ms = ms;
394 g_atomic_int_inc(&ms->refcnt);
395 pthread_mutex_lock(&queue_lock);
396 if (queue_tail) {
397 item->next = queue_tail->next;
398 queue_tail->next = item;
400 else {
401 item->next = NULL;
402 queue_head = item;
404 queue_tail = item;
405 pthread_mutex_unlock(&queue_lock);
406 g_atomic_int_inc(&rep_count);
407 sem_post(&queue_sema);
411 void
412 replicate_delete (const char *name, my_state *ms)
414 replicate_namespace_action(name,REPL_ODELETE,ms);
417 void
418 replicate_bcreate (const char *name, my_state *ms)
420 replicate_namespace_action(name,REPL_BCREATE,ms);
423 /* Part of our API to the query module. */
424 char *
425 follow_link (char *object, const char *key)
427 char *slash;
428 char *value = NULL;
430 slash = strchr(object,'/');
431 if (!slash) {
432 return NULL;
435 *(slash++) = '\0';
436 (void)meta_get_value(object,slash,key,&value);
437 *(--slash) = '/';
439 DPRINTF("%s: %s:%s => %s\n",__func__,object,key,value);
440 return value;
444 get_rep_count (void)
446 return g_atomic_int_get(&rep_count);