avoid file descriptor leak in replication
[iwhd.git] / replica.c
blobee1034a58481d61a3c9c441219b7fff9f890fadd
1 /* Copyright (C) 2010-2011 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16 #include <config.h>
18 #include <errno.h>
19 #include <error.h>
20 #include <fcntl.h>
21 #include <poll.h>
22 #include <pthread.h>
23 #include <semaphore.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
30 #include <hstor.h>
31 #include <microhttpd.h> /* for HTTP status values */
33 #include "iwh.h"
34 #include "setup.h"
35 #include "query.h"
36 #include "meta.h"
37 #include "replica.h"
/* Kinds of request that can be placed on the replication queue. */
typedef enum {
	REPL_PUT = 0,		/* store an object */
	REPL_ODELETE = 1,	/* delete an object */
	REPL_BCREATE = 2,	/* create a bucket */
	/* TBD: bucket deletion, others? */
} repl_t;
46 typedef struct _repl_item {
47 struct _repl_item *next;
48 repl_t type;
49 char *path;
50 provider_t *server;
51 size_t size;
52 my_state *ms;
53 } repl_item;
55 typedef struct {
56 char *cur_bucket;
57 char *cur_key;
58 provider_t *cur_server;
59 } query_ctx_t;
61 static repl_item *queue_head = NULL;
62 static repl_item *queue_tail = NULL;
63 static pthread_mutex_t queue_lock;
64 static sem_t queue_sema;
65 static volatile gint rep_count = 0;
67 static void *
68 proxy_repl_prod (void *ctx)
70 repl_item *item = ctx;
71 backend_thunk_t thunk;
72 void *result;
74 thunk.parent = item->ms;
75 thunk.prov = get_main_provider();
77 result = thunk.prov->func_tbl->get_child_func(&thunk);
78 return result;
81 static void *
82 proxy_repl_cons (void *ctx)
84 repl_item *item = ctx;
85 my_state *ms = item->ms;
86 pipe_private *pp;
88 pp = pipe_init_private(&ms->pipe);
89 if (!pp) {
90 pipe_cons_siginit(&ms->pipe,-1);
91 return THREAD_FAILED;
94 pp->prov = item->server;
95 ms->be_flags = 0;
97 return item->server->func_tbl->put_child_func(pp);
100 static void
101 repl_worker_del (const repl_item *item)
103 char *bucket;
104 char *key;
105 int rc;
107 bucket = strdup(item->path);
108 if (!bucket) {
109 error(0,errno,"ran out of memory replicating delete for %s",
110 item->path);
111 return;
114 key = strchr(bucket,'/');
115 if (!key) {
116 error(0,0,"invalid path replicating delete for %s",item->path);
117 return;
119 ++key;
121 rc = item->server->func_tbl->delete_func(item->server,
122 bucket, key, item->path);
123 if (rc != MHD_HTTP_OK) {
124 error(0,0,"got status %d replicating delete for %s",
125 rc, item->path);
128 DPRINTF("finished replicating delete for %s, rc = %d\n",item->path,rc);
131 static void
132 repl_worker_bcreate (repl_item *item)
134 int rc;
136 rc = item->server->func_tbl->bcreate_func(item->server,item->path);
137 if (rc != MHD_HTTP_OK) {
138 error(0,0,"got status %d replicating bcreate for %s",
139 rc, item->path);
142 DPRINTF("finished replicating bcreate for %s, rc = %d\n",item->path,rc);
/*
 * Use this to diagnose failed thread creation.
 *
 * Starts a thread with default attributes.  If pthread_create fails,
 * the failure is reported through error() and the macro executes
 * "return NULL" in the ENCLOSING function, so it may only be used in
 * functions that return a pointer.
 *
 * Arguments are parenthesized, the local has a name unlikely to clash
 * with a caller's identifier, and msg is printed through "%s" so that
 * it is never interpreted as a format string.
 */
#define xpthread_create(thread, start_routine, item, msg)		\
	do {								\
		int xpc_err_ = pthread_create ((thread), NULL,		\
					       (start_routine), (item)); \
		if (xpc_err_) {						\
			error (0, xpc_err_, "%s", (msg));		\
			return NULL;					\
		}							\
	} while (0)
155 static void *
156 repl_worker (void *notused ATTRIBUTE_UNUSED)
158 repl_item *item;
159 pthread_t cons;
160 pthread_t prod;
161 my_state *ms;
163 for (;;) {
164 sem_wait(&queue_sema);
165 pthread_mutex_lock(&queue_lock);
166 item = queue_head;
167 queue_head = item->next;
168 if (!queue_head) {
169 queue_tail = NULL;
171 pthread_mutex_unlock(&queue_lock);
174 * Do a full initialization here, not just in the rest. It's
175 * necessary in the oddball case where we're re-replicating as
176 * a result of an attribute/policy change, and it's not harmful
177 * in the normal case where we're actually storing a new file.
179 ms = item->ms;
180 pipe_init_shared(&ms->pipe,ms,1);
181 switch (item->type) {
182 case REPL_PUT:
183 xpthread_create(&prod,proxy_repl_prod,item,
184 "failed to start producer thread");
185 xpthread_create(&cons,proxy_repl_cons,item,
186 "failed to start consumer thread");
187 pthread_join(prod,NULL);
188 pthread_join(cons,NULL);
189 break;
190 case REPL_ODELETE:
191 repl_worker_del(item);
192 break;
193 case REPL_BCREATE:
194 repl_worker_bcreate(item);
195 break;
196 default:
197 error(0,0,"bad repl type %d (url=%s) skipped",
198 item->type, item->path);
200 /* No atomic dec without test? Lame. */
201 (void)g_atomic_int_dec_and_test(&rep_count);
205 void
206 repl_init (void)
208 pthread_t tid;
210 sem_init(&queue_sema,0,0);
211 pthread_mutex_init(&queue_lock,NULL);
212 pthread_create(&tid,NULL,repl_worker,NULL);
215 static const char *
216 repl_oget (void *ctx, const char *id)
218 query_ctx_t *qctx = ctx;
219 char *cur_value = NULL;
221 (void)meta_get_value(qctx->cur_bucket,qctx->cur_key,id,&cur_value);
223 return cur_value;
226 static const char *
227 repl_sget (void *ctx, const char *id)
229 query_ctx_t *qctx = ctx;
230 provider_t *prov = qctx->cur_server;
232 if (!strcmp(id,"name")) {
233 return prov->name;
235 if (!strcmp(id,"type")) {
236 return prov->type;
238 if (!strcmp(id,"host")) {
239 return prov->host;
241 if (!strcmp(id,"key")) {
242 return prov->username;
244 if (!strcmp(id,"secret")) {
245 return prov->password;
247 if (!strcmp(id,"path")) {
248 return prov->path;
251 struct kv_pair kv;
252 kv.key = (char *) id;
253 struct kv_pair *p = hash_lookup (prov->attrs, &kv);
255 return p ? p->val : NULL;
258 void
259 replicate (const char *url, size_t size, const char *policy, my_state *ms)
261 repl_item *item;
262 value_t *expr;
263 int res;
264 char *url2;
265 char *stctx;
266 query_ctx_t qctx;
267 getter_t oget;
268 getter_t sget;
270 url2 = strdup(url);
271 if (!url2) {
272 error(0,0,"could not parse url %s",url);
273 return;
275 qctx.cur_bucket = strtok_r(url2,"/",&stctx);
276 qctx.cur_key = strtok_r(NULL,"/",&stctx);
278 if (!size) {
279 size = meta_get_size(qctx.cur_bucket,qctx.cur_key);
280 DPRINTF("fetched size %zu for %s\n",size,url);
283 if (policy) {
284 DPRINTF("--- policy = %s\n",policy);
285 expr = parse(policy);
287 else {
288 expr = NULL;
291 oget.func = repl_oget;
292 oget.ctx = &qctx;
293 sget.func = repl_sget;
294 sget.ctx = &qctx;
296 size_t n_prov;
297 provider_t **prov_list = hash_get_prov_list (&n_prov);
298 if (prov_list == NULL) {
299 DPRINTF("failed to allocate space for provider list\n");
300 return;
303 size_t i;
304 for (i = 0; i < n_prov; i++) {
305 provider_t *prov = prov_list[i];
306 if (!strcmp(prov->name, me)) {
307 continue;
309 if (expr) {
310 qctx.cur_server = prov;
311 res = eval(expr,&oget,&sget);
313 else {
314 res = 0;
316 if (res <= 0) {
317 DPRINTF("skipping %s for %s\n",prov->name,url);
318 continue;
320 DPRINTF("REPLICATING %s to %s\n",url,prov->name);
321 item = malloc(sizeof(*item));
322 if (!item) {
323 error(0,errno,"could not create repl_item for %s",
324 url);
325 break;
327 item->type = REPL_PUT;
328 item->path = strdup(url);
329 if (!item->path) {
330 error(0,errno,"could not create repl_item for %s",
331 url);
332 break;
334 item->server = prov;
335 item->size = size;
336 item->ms = ms;
337 pthread_mutex_lock(&queue_lock);
338 if (queue_tail) {
339 item->next = queue_tail->next;
340 queue_tail->next = item;
342 else {
343 item->next = NULL;
344 queue_head = item;
346 queue_tail = item;
347 pthread_mutex_unlock(&queue_lock);
348 g_atomic_int_inc(&rep_count);
349 sem_post(&queue_sema);
353 static void
354 replicate_namespace_action (const char *name, repl_t action, my_state *ms)
356 size_t n_prov;
357 provider_t **prov_list = hash_get_prov_list (&n_prov);
358 if (prov_list == NULL) {
359 DPRINTF("failed to allocate space for provider list\n");
360 return;
363 size_t i;
364 for (i = 0; i < n_prov; i++) {
365 provider_t *prov = prov_list[i];
366 if (!strcmp(prov->name, me)) {
367 continue;
369 DPRINTF("replicating %s(%s) on %s\n",
370 (action == REPL_ODELETE ? "delete" : "create"),
371 name, prov->name);
372 repl_item *item = malloc(sizeof(*item));
373 if (!item) {
374 error(0,errno,"could not create repl_item for %s",
375 name);
376 return;
378 item->type = action;
379 item->path = strdup(name);
380 if (!item->path) {
381 return;
383 item->server = prov;
384 item->ms = ms;
385 pthread_mutex_lock(&queue_lock);
386 if (queue_tail) {
387 item->next = queue_tail->next;
388 queue_tail->next = item;
390 else {
391 item->next = NULL;
392 queue_head = item;
394 queue_tail = item;
395 pthread_mutex_unlock(&queue_lock);
396 g_atomic_int_inc(&rep_count);
397 sem_post(&queue_sema);
401 void
402 replicate_delete (const char *name, my_state *ms)
404 replicate_namespace_action(name,REPL_ODELETE,ms);
407 void
408 replicate_bcreate (const char *name, my_state *ms)
410 replicate_namespace_action(name,REPL_BCREATE,ms);
/*
 * Part of our API to the query module.
 *
 * Look up attribute "key" of the object named by "object", which must
 * have the form "bucket/key".  The string is split in place at the
 * first slash for the duration of the look-up and restored afterwards,
 * so it must be writable.
 *
 * Returns the value stored by meta_get_value, or NULL if "object"
 * contains no slash or no value was stored.
 */
char *
follow_link (char *object, const char *key)
{
	char	*slash;
	char	*value = NULL;

	slash = strchr(object,'/');
	if (!slash) {
		return NULL;
	}

	/* Temporarily terminate the bucket name, then put the slash back. */
	*(slash++) = '\0';
	(void)meta_get_value(object,slash,key,&value);
	*(--slash) = '/';

	/* Passing NULL for %s is undefined, so print a placeholder. */
	DPRINTF("%s: %s:%s => %s\n",__func__,object,key,
		value ? value : "(null)");
	return value;
}
434 get_rep_count (void)
436 return g_atomic_int_get(&rep_count);