1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 #include <semaphore.h>
31 #include <microhttpd.h> /* for HTTP status values */
40 REPL_PUT
, /* store an object */
41 REPL_ODELETE
, /* delete an object */
42 REPL_BCREATE
, /* create a bucket */
43 /* TBD: bucket deletion, others? */
46 typedef struct _repl_item
{
47 struct _repl_item
*next
;
59 provider_t
*cur_server
;
62 static repl_item
*queue_head
= NULL
;
63 static repl_item
*queue_tail
= NULL
;
64 static pthread_mutex_t queue_lock
;
65 static sem_t queue_sema
;
66 static volatile gint rep_count
= 0;
69 proxy_repl_prod (void *ctx
)
71 repl_item
*item
= ctx
;
72 backend_thunk_t thunk
;
75 thunk
.parent
= item
->ms
;
76 thunk
.prov
= main_prov
;
78 result
= thunk
.prov
->func_tbl
->get_child_func(&thunk
);
83 proxy_repl_cons (void *ctx
)
85 repl_item
*item
= ctx
;
86 my_state
*ms
= item
->ms
;
89 pp
= pipe_init_private(&ms
->pipe
);
91 pipe_cons_siginit(&ms
->pipe
,-1);
95 pp
->prov
= item
->server
;
98 return item
->server
->func_tbl
->put_child_func(pp
);
102 repl_worker_del (const repl_item
*item
)
108 bucket
= strdup(item
->path
);
110 error(0,errno
,"ran out of memory replicating delete for %s",
115 key
= strchr(bucket
,'/');
117 error(0,0,"invalid path replicating delete for %s",item
->path
);
123 rc
= item
->server
->func_tbl
->delete_func(item
->server
,
124 bucket
, key
, item
->path
);
125 if (rc
!= MHD_HTTP_OK
) {
126 error(0,0,"got status %d replicating delete for %s",
131 DPRINTF("finished replicating delete for %s, rc = %d\n",item
->path
,rc
);
135 repl_worker_bcreate (repl_item
*item
)
139 rc
= item
->server
->func_tbl
->bcreate_func(item
->server
,item
->path
);
140 if (rc
!= MHD_HTTP_OK
) {
141 error(0,0,"got status %d replicating bcreate for %s",
145 DPRINTF("finished replicating bcreate for %s, rc = %d\n",item
->path
,rc
);
148 /* Use this to diagnose failed thread creation. */
149 #define xpthread_create(thread, start_routine, item, msg) \
151 int err = pthread_create (thread, NULL, start_routine, item); \
153 error (0, err, msg); \
159 repl_worker (void *notused ATTRIBUTE_UNUSED
)
167 sem_wait(&queue_sema
);
168 pthread_mutex_lock(&queue_lock
);
170 queue_head
= item
->next
;
174 pthread_mutex_unlock(&queue_lock
);
177 * Do a full initialization here, not just in the rest. It's
178 * necessary in the oddball case where we're re-replicating as
179 * a result of an attribute/policy change, and it's not harmful
180 * in the normal case where we're actually storing a new file.
183 pipe_init_shared(&ms
->pipe
,ms
,1);
184 switch (item
->type
) {
186 if (pipe(item
->pipes
) >= 0) {
187 xpthread_create(&prod
,proxy_repl_prod
,item
,
188 "failed to start producer thread");
189 xpthread_create(&cons
,proxy_repl_cons
,item
,
190 "failed to start consumer thread");
191 pthread_join(prod
,NULL
);
192 pthread_join(cons
,NULL
);
195 error(0,errno
,"pipe");
199 repl_worker_del(item
);
202 repl_worker_bcreate(item
);
205 error(0,0,"bad repl type %d (url=%s) skipped",
206 item
->type
, item
->path
);
211 /* No atomic dec without test? Lame. */
212 (void)g_atomic_int_dec_and_test(&rep_count
);
221 sem_init(&queue_sema
,0,0);
222 pthread_mutex_init(&queue_lock
,NULL
);
223 pthread_create(&tid
,NULL
,repl_worker
,NULL
);
227 repl_oget (void *ctx
, const char *id
)
229 query_ctx_t
*qctx
= ctx
;
230 char *cur_value
= NULL
;
232 (void)meta_get_value(qctx
->cur_bucket
,qctx
->cur_key
,id
,&cur_value
);
238 repl_sget (void *ctx
, const char *id
)
240 query_ctx_t
*qctx
= ctx
;
241 provider_t
*prov
= qctx
->cur_server
;
243 if (!strcmp(id
,"name")) {
246 if (!strcmp(id
,"type")) {
249 if (!strcmp(id
,"host")) {
252 if (!strcmp(id
,"key")) {
253 return prov
->username
;
255 if (!strcmp(id
,"secret")) {
256 return prov
->password
;
258 if (!strcmp(id
,"path")) {
262 return g_hash_table_lookup(prov
->attrs
,id
);
266 replicate (const char *url
, size_t size
, const char *policy
, my_state
*ms
)
283 error(0,0,"could not parse url %s",url
);
286 qctx
.cur_bucket
= strtok_r(url2
,"/",&stctx
);
287 qctx
.cur_key
= strtok_r(NULL
,"/",&stctx
);
290 size
= meta_get_size(qctx
.cur_bucket
,qctx
.cur_key
);
291 DPRINTF("fetched size %zu for %s\n",size
,url
);
295 DPRINTF("--- policy = %s\n",policy
);
296 expr
= parse(policy
);
302 oget
.func
= repl_oget
;
304 sget
.func
= repl_sget
;
307 init_prov_iter(&iter
);
308 while (g_hash_table_iter_next(&iter
,&key
,&value
)) {
309 if (!strcmp(key
,me
)) {
312 prov
= (provider_t
*)value
;
314 qctx
.cur_server
= prov
;
315 res
= eval(expr
,&oget
,&sget
);
321 DPRINTF("skipping %s for %s\n",prov
->name
,url
);
324 DPRINTF("REPLICATING %s to %s\n",url
,prov
->name
);
325 item
= malloc(sizeof(*item
));
327 error(0,errno
,"could not create repl_item for %s",
331 item
->type
= REPL_PUT
;
332 item
->path
= strdup(url
);
334 error(0,errno
,"could not create repl_item for %s",
341 g_atomic_int_inc(&ms
->refcnt
);
342 pthread_mutex_lock(&queue_lock
);
344 item
->next
= queue_tail
->next
;
345 queue_tail
->next
= item
;
352 pthread_mutex_unlock(&queue_lock
);
353 g_atomic_int_inc(&rep_count
);
354 sem_post(&queue_sema
);
364 replicate_namespace_action (const char *name
, repl_t action
, my_state
*ms
)
371 init_prov_iter(&iter
);
372 while (g_hash_table_iter_next(&iter
,&key
,&value
)) {
373 if (!strcmp(key
,me
)) {
376 DPRINTF("replicating %s(%s) on %s\n",
377 (action
== REPL_ODELETE
? "delete" : "create"),
379 ((provider_t
*)value
)->name
);
380 item
= malloc(sizeof(*item
));
382 error(0,errno
,"could not create repl_item for %s",
387 item
->path
= strdup(name
);
392 item
->server
= (provider_t
*)value
;
394 g_atomic_int_inc(&ms
->refcnt
);
395 pthread_mutex_lock(&queue_lock
);
397 item
->next
= queue_tail
->next
;
398 queue_tail
->next
= item
;
405 pthread_mutex_unlock(&queue_lock
);
406 g_atomic_int_inc(&rep_count
);
407 sem_post(&queue_sema
);
412 replicate_delete (const char *name
, my_state
*ms
)
414 replicate_namespace_action(name
,REPL_ODELETE
,ms
);
418 replicate_bcreate (const char *name
, my_state
*ms
)
420 replicate_namespace_action(name
,REPL_BCREATE
,ms
);
423 /* Part of our API to the query module. */
425 follow_link (char *object
, const char *key
)
430 slash
= strchr(object
,'/');
436 (void)meta_get_value(object
,slash
,key
,&value
);
439 DPRINTF("%s: %s:%s => %s\n",__func__
,object
,key
,value
);
446 return g_atomic_int_get(&rep_count
);