remove functions and struct members that are no longer needed
[iwhd.git] / replica.c
blob 266da20bd15ab1dac5b2910888534bf9bea41a1f
1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include <errno.h>
19 #include <error.h>
20 #include <fcntl.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <semaphore.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
30 #include <hstor.h>
31 #include <microhttpd.h> /* for HTTP status values */
33 #include "iwh.h"
34 #include "setup.h"
35 #include "query.h"
36 #include "meta.h"
37 #include "replica.h"
/* Kinds of work that can be queued for the replication worker. */
typedef enum {
	REPL_PUT = 0,	/* store an object */
	REPL_ODELETE,	/* delete an object */
	REPL_BCREATE	/* create a bucket */
	/* TBD: bucket deletion, others? */
} repl_t;
46 typedef struct _repl_item {
47 struct _repl_item *next;
48 repl_t type;
49 char *path;
50 provider_t *server;
51 size_t size;
52 int pipes[2];
53 my_state *ms;
54 } repl_item;
56 typedef struct {
57 char *cur_bucket;
58 char *cur_key;
59 provider_t *cur_server;
60 } query_ctx_t;
62 static repl_item *queue_head = NULL;
63 static repl_item *queue_tail = NULL;
64 static pthread_mutex_t queue_lock;
65 static sem_t queue_sema;
66 static volatile gint rep_count = 0;
68 static void *
69 proxy_repl_prod (void *ctx)
71 repl_item *item = ctx;
72 backend_thunk_t thunk;
73 void *result;
75 thunk.parent = item->ms;
76 thunk.prov = get_main_provider();
78 result = thunk.prov->func_tbl->get_child_func(&thunk);
79 return result;
82 static void *
83 proxy_repl_cons (void *ctx)
85 repl_item *item = ctx;
86 my_state *ms = item->ms;
87 pipe_private *pp;
89 pp = pipe_init_private(&ms->pipe);
90 if (!pp) {
91 pipe_cons_siginit(&ms->pipe,-1);
92 return THREAD_FAILED;
95 pp->prov = item->server;
96 ms->be_flags = 0;
98 return item->server->func_tbl->put_child_func(pp);
101 static void
102 repl_worker_del (const repl_item *item)
104 char *bucket;
105 char *key;
106 int rc;
108 bucket = strdup(item->path);
109 if (!bucket) {
110 error(0,errno,"ran out of memory replicating delete for %s",
111 item->path);
112 return;
115 key = strchr(bucket,'/');
116 if (!key) {
117 error(0,0,"invalid path replicating delete for %s",item->path);
118 free(bucket);
119 return;
121 ++key;
123 rc = item->server->func_tbl->delete_func(item->server,
124 bucket, key, item->path);
125 if (rc != MHD_HTTP_OK) {
126 error(0,0,"got status %d replicating delete for %s",
127 rc, item->path);
129 free(bucket);
131 DPRINTF("finished replicating delete for %s, rc = %d\n",item->path,rc);
134 static void
135 repl_worker_bcreate (repl_item *item)
137 int rc;
139 rc = item->server->func_tbl->bcreate_func(item->server,item->path);
140 if (rc != MHD_HTTP_OK) {
141 error(0,0,"got status %d replicating bcreate for %s",
142 rc, item->path);
145 DPRINTF("finished replicating bcreate for %s, rc = %d\n",item->path,rc);
/*
 * Use this to diagnose failed thread creation.
 *
 * Starts a thread with default attributes.  On failure it reports MSG
 * together with the pthread_create error code and then executes
 * "return NULL" in the *calling* function, so it can only be used inside
 * functions that return a pointer.  MSG is printed through "%s" so it is
 * never interpreted as a format string.
 */
#define xpthread_create(thread, start_routine, item, msg)		\
	do {								\
		int xpc_err_ = pthread_create ((thread), NULL,		\
					       (start_routine), (item)); \
		if (xpc_err_) {						\
			error (0, xpc_err_, "%s", (msg));		\
			return NULL;					\
		}							\
	} while (0)
158 static void *
159 repl_worker (void *notused ATTRIBUTE_UNUSED)
161 repl_item *item;
162 pthread_t cons;
163 pthread_t prod;
164 my_state *ms;
166 for (;;) {
167 sem_wait(&queue_sema);
168 pthread_mutex_lock(&queue_lock);
169 item = queue_head;
170 queue_head = item->next;
171 if (!queue_head) {
172 queue_tail = NULL;
174 pthread_mutex_unlock(&queue_lock);
177 * Do a full initialization here, not just in the rest. It's
178 * necessary in the oddball case where we're re-replicating as
179 * a result of an attribute/policy change, and it's not harmful
180 * in the normal case where we're actually storing a new file.
182 ms = item->ms;
183 pipe_init_shared(&ms->pipe,ms,1);
184 switch (item->type) {
185 case REPL_PUT:
186 if (pipe(item->pipes) >= 0) {
187 xpthread_create(&prod,proxy_repl_prod,item,
188 "failed to start producer thread");
189 xpthread_create(&cons,proxy_repl_cons,item,
190 "failed to start consumer thread");
191 pthread_join(prod,NULL);
192 pthread_join(cons,NULL);
194 else {
195 error(0,errno,"pipe");
197 break;
198 case REPL_ODELETE:
199 repl_worker_del(item);
200 break;
201 case REPL_BCREATE:
202 repl_worker_bcreate(item);
203 break;
204 default:
205 error(0,0,"bad repl type %d (url=%s) skipped",
206 item->type, item->path);
208 free(item->path);
209 free(item);
210 /* No atomic dec without test? Lame. */
211 (void)g_atomic_int_dec_and_test(&rep_count);
215 void
216 repl_init (void)
218 pthread_t tid;
220 sem_init(&queue_sema,0,0);
221 pthread_mutex_init(&queue_lock,NULL);
222 pthread_create(&tid,NULL,repl_worker,NULL);
225 static const char *
226 repl_oget (void *ctx, const char *id)
228 query_ctx_t *qctx = ctx;
229 char *cur_value = NULL;
231 (void)meta_get_value(qctx->cur_bucket,qctx->cur_key,id,&cur_value);
233 return cur_value;
236 static const char *
237 repl_sget (void *ctx, const char *id)
239 query_ctx_t *qctx = ctx;
240 provider_t *prov = qctx->cur_server;
242 if (!strcmp(id,"name")) {
243 return prov->name;
245 if (!strcmp(id,"type")) {
246 return prov->type;
248 if (!strcmp(id,"host")) {
249 return prov->host;
251 if (!strcmp(id,"key")) {
252 return prov->username;
254 if (!strcmp(id,"secret")) {
255 return prov->password;
257 if (!strcmp(id,"path")) {
258 return prov->path;
261 return g_hash_table_lookup(prov->attrs,id);
264 void
265 replicate (const char *url, size_t size, const char *policy, my_state *ms)
267 repl_item *item;
268 value_t *expr;
269 int res;
270 char *url2;
271 char *stctx;
272 query_ctx_t qctx;
273 getter_t oget;
274 getter_t sget;
275 GHashTableIter iter;
276 gpointer key;
277 gpointer value;
278 provider_t *prov;
280 url2 = strdup(url);
281 if (!url2) {
282 error(0,0,"could not parse url %s",url);
283 return;
285 qctx.cur_bucket = strtok_r(url2,"/",&stctx);
286 qctx.cur_key = strtok_r(NULL,"/",&stctx);
288 if (!size) {
289 size = meta_get_size(qctx.cur_bucket,qctx.cur_key);
290 DPRINTF("fetched size %zu for %s\n",size,url);
293 if (policy) {
294 DPRINTF("--- policy = %s\n",policy);
295 expr = parse(policy);
297 else {
298 expr = NULL;
301 oget.func = repl_oget;
302 oget.ctx = &qctx;
303 sget.func = repl_sget;
304 sget.ctx = &qctx;
306 init_prov_iter(&iter);
307 while (g_hash_table_iter_next(&iter,&key,&value)) {
308 if (!strcmp(key,me)) {
309 continue;
311 prov = (provider_t *)value;
312 if (expr) {
313 qctx.cur_server = prov;
314 res = eval(expr,&oget,&sget);
316 else {
317 res = 0;
319 if (res <= 0) {
320 DPRINTF("skipping %s for %s\n",prov->name,url);
321 continue;
323 DPRINTF("REPLICATING %s to %s\n",url,prov->name);
324 item = malloc(sizeof(*item));
325 if (!item) {
326 error(0,errno,"could not create repl_item for %s",
327 url);
328 break;
330 item->type = REPL_PUT;
331 item->path = strdup(url);
332 if (!item->path) {
333 error(0,errno,"could not create repl_item for %s",
334 url);
335 break;
337 item->server = prov;
338 item->size = size;
339 item->ms = ms;
340 pthread_mutex_lock(&queue_lock);
341 if (queue_tail) {
342 item->next = queue_tail->next;
343 queue_tail->next = item;
345 else {
346 item->next = NULL;
347 queue_head = item;
349 queue_tail = item;
350 pthread_mutex_unlock(&queue_lock);
351 g_atomic_int_inc(&rep_count);
352 sem_post(&queue_sema);
355 free(url2);
358 static void
359 replicate_namespace_action (const char *name, repl_t action, my_state *ms)
361 repl_item *item;
362 GHashTableIter iter;
363 gpointer key;
364 gpointer value;
366 init_prov_iter(&iter);
367 while (g_hash_table_iter_next(&iter,&key,&value)) {
368 if (!strcmp(key,me)) {
369 continue;
371 DPRINTF("replicating %s(%s) on %s\n",
372 (action == REPL_ODELETE ? "delete" : "create"),
373 name,
374 ((provider_t *)value)->name);
375 item = malloc(sizeof(*item));
376 if (!item) {
377 error(0,errno,"could not create repl_item for %s",
378 name);
379 return;
381 item->type = action;
382 item->path = strdup(name);
383 if (!item->path) {
384 free(item);
385 return;
387 item->server = (provider_t *)value;
388 item->ms = ms;
389 pthread_mutex_lock(&queue_lock);
390 if (queue_tail) {
391 item->next = queue_tail->next;
392 queue_tail->next = item;
394 else {
395 item->next = NULL;
396 queue_head = item;
398 queue_tail = item;
399 pthread_mutex_unlock(&queue_lock);
400 g_atomic_int_inc(&rep_count);
401 sem_post(&queue_sema);
405 void
406 replicate_delete (const char *name, my_state *ms)
408 replicate_namespace_action(name,REPL_ODELETE,ms);
411 void
412 replicate_bcreate (const char *name, my_state *ms)
414 replicate_namespace_action(name,REPL_BCREATE,ms);
/*
 * Part of our API to the query module.
 *
 * OBJECT is a "bucket/key" path.  It is split in place at the first '/'
 * for the duration of the metadata lookup and restored before returning,
 * so the caller sees it unchanged.  Returns whatever meta_get_value
 * stored for KEY, or NULL if OBJECT contains no '/' or nothing was
 * stored.
 */
char *
follow_link (char *object, const char *key)
{
	char	*slash;
	char	*value = NULL;

	slash = strchr(object,'/');
	if (!slash) {
		return NULL;
	}

	*(slash++) = '\0';
	(void)meta_get_value(object,slash,key,&value);
	*(--slash) = '/';

	/* Passing a NULL pointer to %s is undefined, so substitute text. */
	DPRINTF("%s: %s:%s => %s\n",__func__,object,key,
		value ? value : "(null)");
	return value;
}
438 get_rep_count (void)
440 return g_atomic_int_get(&rep_count);