1 /* Copyright (C) 2010-2011 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 #include <semaphore.h>
31 #include <microhttpd.h> /* for HTTP status values */
40 REPL_PUT
, /* store an object */
41 REPL_ODELETE
, /* delete an object */
42 REPL_BCREATE
, /* create a bucket */
43 /* TBD: bucket deletion, others? */
46 typedef struct _repl_item
{
47 struct _repl_item
*next
;
59 provider_t
*cur_server
;
/*
 * Replication work queue: a singly linked list of repl_item.  The worker
 * advances queue_head past the item it services; the enqueue paths link
 * new items in after queue_tail.
 */
62 static repl_item
*queue_head
= NULL
;
63 static repl_item
*queue_tail
= NULL
;
/* Held around every queue_head/queue_tail update visible in this file. */
64 static pthread_mutex_t queue_lock
;
/* Posted once per enqueued item; repl_worker blocks on it with sem_wait. */
65 static sem_t queue_sema
;
/*
 * Count of outstanding replication items: atomically incremented on each
 * enqueue and atomically decremented by the worker after handling an item.
 */
66 static volatile gint rep_count
= 0;
/*
 * Producer-side thread body used when replicating a PUT; repl_worker starts
 * it through xpthread_create with the repl_item as ctx.
 *
 * Fills a backend_thunk_t with the item my_state as parent and the main
 * provider (get_main_provider) as prov, then calls that provider
 * get_child_func on the thunk and stores the outcome in result.
 * NOTE(review): the declaration of result and the return statement are not
 * visible in this excerpt; presumably result is what the thread returns.
 */
69 proxy_repl_prod (void *ctx
)
71 repl_item
*item
= ctx
;
72 backend_thunk_t thunk
;
75 thunk
.parent
= item
->ms
;
76 thunk
.prov
= get_main_provider();
78 result
= thunk
.prov
->func_tbl
->get_child_func(&thunk
);
/*
 * Consumer-side thread body used when replicating a PUT; repl_worker starts
 * it through xpthread_create with the repl_item as ctx.
 *
 * Obtains a private pipe handle (pp) on the shared pipe embedded in the item
 * my_state, calls pipe_cons_siginit on that pipe with -1, points the handle
 * at the destination server (item->server), and returns whatever that server
 * put_child_func returns for the handle.
 * NOTE(review): the meaning of the -1 argument and any NULL check on pp are
 * not visible in this excerpt.
 */
83 proxy_repl_cons (void *ctx
)
85 repl_item
*item
= ctx
;
86 my_state
*ms
= item
->ms
;
89 pp
= pipe_init_private(&ms
->pipe
);
91 pipe_cons_siginit(&ms
->pipe
,-1);
95 pp
->prov
= item
->server
;
98 return item
->server
->func_tbl
->put_child_func(pp
);
/*
 * Replicate an object deletion to item->server.
 *
 * Works on a strdup copy of item->path (item is const), locates the first
 * slash in the copy to separate bucket from key, and calls the server
 * delete_func with (server, bucket, key, original path).
 *
 * Problems are reported through error() and do not propagate to the caller
 * as far as this excerpt shows: failure to duplicate the path, an invalid
 * path (presumably one with no slash; the condition line is not visible),
 * and any delete_func status other than MHD_HTTP_OK.  A debug trace with the
 * final rc is emitted at the end.
 */
102 repl_worker_del (const repl_item
*item
)
108 bucket
= strdup(item
->path
);
110 error(0,errno
,"ran out of memory replicating delete for %s",
115 key
= strchr(bucket
,'/');
117 error(0,0,"invalid path replicating delete for %s",item
->path
);
122 rc
= item
->server
->func_tbl
->delete_func(item
->server
,
123 bucket
, key
, item
->path
);
124 if (rc
!= MHD_HTTP_OK
) {
125 error(0,0,"got status %d replicating delete for %s",
129 DPRINTF("finished replicating delete for %s, rc = %d\n",item
->path
,rc
);
/*
 * Replicate a bucket creation to item->server.
 *
 * Calls the server bcreate_func with (server, item->path), so item->path is
 * used as the bucket name here.  Any status other than MHD_HTTP_OK is
 * reported through error(); a debug trace with the final rc is emitted
 * afterwards.
 */
133 repl_worker_bcreate (repl_item
*item
)
137 rc
= item
->server
->func_tbl
->bcreate_func(item
->server
,item
->path
);
138 if (rc
!= MHD_HTTP_OK
) {
139 error(0,0,"got status %d replicating bcreate for %s",
143 DPRINTF("finished replicating bcreate for %s, rc = %d\n",item
->path
,rc
);
146 /* Use this to diagnose failed thread creation.  It calls pthread_create
   with default (NULL) attributes and hands the returned error number and
   msg to error() with exit status 0, i.e. it reports without exiting.
   NOTE(review): the line guarding the error() call is not visible in this
   excerpt; presumably it only fires when err is nonzero.  The macro
   declares a local named err, so it needs its own scope at each use. */
147 #define xpthread_create(thread, start_routine, item, msg) \
149 int err = pthread_create (thread, NULL, start_routine, item); \
151 error (0, err, msg); \
/*
 * Replication worker thread body; the thread argument is unused.
 *
 * Blocks on queue_sema, then, holding queue_lock, advances queue_head past
 * the item being serviced.  After pipe_init_shared(&ms->pipe, ms, 1) it
 * switches on item->type.  The case labels are not visible in this excerpt,
 * but the visible arms are:
 *   - create a pipe(2) pair in item->pipes, start proxy_repl_prod and
 *     proxy_repl_cons threads on the item and join both; a pipe failure is
 *     reported through error() with errno;
 *   - repl_worker_del(item);
 *   - repl_worker_bcreate(item);
 *   - otherwise report the bad repl type together with item->path.
 * After handling the item, rep_count is atomically decremented (the
 * dec-and-test result is discarded).
 *
 * NOTE(review): xpthread_create only reports a failure, and both
 * pthread_join calls appear to run regardless, which would join a thread
 * that was never created; confirm against the complete source.
 */
157 repl_worker (void *notused ATTRIBUTE_UNUSED
)
165 sem_wait(&queue_sema
);
166 pthread_mutex_lock(&queue_lock
);
168 queue_head
= item
->next
;
172 pthread_mutex_unlock(&queue_lock
);
175 * Do a full initialization here, not just in the rest. It's
176 * necessary in the oddball case where we're re-replicating as
177 * a result of an attribute/policy change, and it's not harmful
178 * in the normal case where we're actually storing a new file.
181 pipe_init_shared(&ms
->pipe
,ms
,1);
182 switch (item
->type
) {
184 if (pipe(item
->pipes
) >= 0) {
185 xpthread_create(&prod
,proxy_repl_prod
,item
,
186 "failed to start producer thread");
187 xpthread_create(&cons
,proxy_repl_cons
,item
,
188 "failed to start consumer thread");
189 pthread_join(prod
,NULL
);
190 pthread_join(cons
,NULL
);
193 error(0,errno
,"pipe");
197 repl_worker_del(item
);
200 repl_worker_bcreate(item
);
203 error(0,0,"bad repl type %d (url=%s) skipped",
204 item
->type
, item
->path
);
206 /* No atomic dec without test? Lame. */
207 (void)g_atomic_int_dec_and_test(&rep_count
);
216 sem_init(&queue_sema
,0,0);
217 pthread_mutex_init(&queue_lock
,NULL
);
218 pthread_create(&tid
,NULL
,repl_worker
,NULL
);
/*
 * Object-attribute getter handed to the policy evaluator (replicate installs
 * it as oget.func).
 *
 * ctx is a query_ctx_t; the attribute named id is looked up for the object
 * identified by cur_bucket/cur_key through meta_get_value.  The status of
 * meta_get_value is deliberately discarded, so cur_value keeps its initial
 * NULL unless the lookup stores a value.
 * NOTE(review): the return statement is not visible here; presumably
 * cur_value is returned.
 */
222 repl_oget (void *ctx
, const char *id
)
224 query_ctx_t
*qctx
= ctx
;
225 char *cur_value
= NULL
;
227 (void)meta_get_value(qctx
->cur_bucket
,qctx
->cur_key
,id
,&cur_value
);
/*
 * Server-attribute getter handed to the policy evaluator (replicate installs
 * it as sget.func).
 *
 * ctx is a query_ctx_t and the provider consulted is its cur_server.  The
 * identifiers name, type, host, key, secret and path are special-cased; of
 * those, key yields prov->username and secret yields prov->password (the
 * values returned for the other four are not visible in this excerpt).
 * Any other id is looked up in the provider attrs hash, returning the stored
 * value or NULL when there is no entry.
 */
233 repl_sget (void *ctx
, const char *id
)
235 query_ctx_t
*qctx
= ctx
;
236 provider_t
*prov
= qctx
->cur_server
;
238 if (!strcmp(id
,"name")) {
241 if (!strcmp(id
,"type")) {
244 if (!strcmp(id
,"host")) {
247 if (!strcmp(id
,"key")) {
248 return prov
->username
;
250 if (!strcmp(id
,"secret")) {
251 return prov
->password
;
253 if (!strcmp(id
,"path")) {
/* The cast only drops const so id can serve as the lookup key. */
258 kv
.key
= (char *) id
;
259 struct kv_pair
*p
= hash_lookup (prov
->attrs
, &kv
);
261 return p
? p
->val
: NULL
;
/*
 * Evaluate a replication policy for the object at url and queue a REPL_PUT
 * item for every provider the policy selects.
 *
 *   url    - bucket/key path; url2 (presumably a writable copy, its creation
 *            is not visible here) is split on slash into cur_bucket and
 *            cur_key with strtok_r.
 *   size   - object size; on a path whose condition is not visible it is
 *            replaced by meta_get_size(bucket, key).
 *   policy - policy expression text, compiled with parse().
 *   ms     - request state; its use is not visible in this excerpt.
 *
 * For each provider from hash_get_first_prov/hash_get_next_prov, the one
 * whose name equals me gets special treatment (body not visible, presumably
 * it is skipped as the local store).  For the others, cur_server is set and
 * the expression is evaluated with repl_oget/repl_sget as attribute getters;
 * depending on the result the provider is either skipped or a repl_item is
 * allocated, given type REPL_PUT and a strdup copy of url, and appended to
 * the queue under queue_lock.  Each enqueue bumps rep_count and posts
 * queue_sema to wake the worker.  Allocation failures are reported through
 * error() with errno.
 */
265 replicate (const char *url
, size_t size
, const char *policy
, my_state
*ms
)
278 error(0,0,"could not parse url %s",url
);
281 qctx
.cur_bucket
= strtok_r(url2
,"/",&stctx
);
282 qctx
.cur_key
= strtok_r(NULL
,"/",&stctx
);
285 size
= meta_get_size(qctx
.cur_bucket
,qctx
.cur_key
);
286 DPRINTF("fetched size %zu for %s\n",size
,url
);
290 DPRINTF("--- policy = %s\n",policy
);
291 expr
= parse(policy
);
297 oget
.func
= repl_oget
;
299 sget
.func
= repl_sget
;
303 for (prov
= hash_get_first_prov (); prov
;
304 prov
= hash_get_next_prov (prov
)) {
305 if (!strcmp(prov
->name
, me
)) {
309 qctx
.cur_server
= prov
;
310 res
= eval(expr
,&oget
,&sget
);
316 DPRINTF("skipping %s for %s\n",prov
->name
,url
);
319 DPRINTF("REPLICATING %s to %s\n",url
,prov
->name
);
320 item
= malloc(sizeof(*item
));
322 error(0,errno
,"could not create repl_item for %s",
326 item
->type
= REPL_PUT
;
327 item
->path
= strdup(url
);
329 error(0,errno
,"could not create repl_item for %s",
336 pthread_mutex_lock(&queue_lock
);
/* Non-empty queue case: link the new item in right after the tail. */
338 item
->next
= queue_tail
->next
;
339 queue_tail
->next
= item
;
346 pthread_mutex_unlock(&queue_lock
);
347 g_atomic_int_inc(&rep_count
);
348 sem_post(&queue_sema
);
/*
 * Queue a namespace operation (action) on name for the configured providers.
 *
 * Walks every provider from hash_get_first_prov/hash_get_next_prov.  The one
 * whose name equals me gets special treatment (body not visible, presumably
 * it is skipped as the local store).  For the others a repl_item is
 * allocated, given a strdup copy of name as its path, and appended to the
 * queue under queue_lock; each enqueue bumps rep_count and posts queue_sema
 * to wake the worker.  The debug trace labels the action as delete when it
 * is REPL_ODELETE and as create otherwise.  Allocation failure is reported
 * through error() with errno.
 * NOTE(review): the assignments of the remaining item fields (type, server,
 * ms) are not visible in this excerpt.
 */
353 replicate_namespace_action (const char *name
, repl_t action
, my_state
*ms
)
356 for (prov
= hash_get_first_prov (); prov
;
357 prov
= hash_get_next_prov (prov
)) {
358 if (!strcmp(prov
->name
, me
)) {
361 DPRINTF("replicating %s(%s) on %s\n",
362 (action
== REPL_ODELETE
? "delete" : "create"),
364 repl_item
*item
= malloc(sizeof(*item
));
366 error(0,errno
,"could not create repl_item for %s",
371 item
->path
= strdup(name
);
377 pthread_mutex_lock(&queue_lock
);
/* Non-empty queue case: link the new item in right after the tail. */
379 item
->next
= queue_tail
->next
;
380 queue_tail
->next
= item
;
387 pthread_mutex_unlock(&queue_lock
);
388 g_atomic_int_inc(&rep_count
);
389 sem_post(&queue_sema
);
/*
 * Queue replication of an object deletion: forwards name and ms to
 * replicate_namespace_action with action REPL_ODELETE.
 */
394 replicate_delete (const char *name
, my_state
*ms
)
396 replicate_namespace_action(name
,REPL_ODELETE
,ms
);
/*
 * Queue replication of a bucket creation: forwards name and ms to
 * replicate_namespace_action with action REPL_BCREATE.
 */
400 replicate_bcreate (const char *name
, my_state
*ms
)
402 replicate_namespace_action(name
,REPL_BCREATE
,ms
);
405 /* Part of our API to the query module.
   Finds the first slash in object, then asks meta_get_value for the
   attribute named key, passing object and the slash pointer as the two
   object-identifying arguments; the lookup status is discarded and the
   outcome is traced with DPRINTF.
   NOTE(review): object is non-const, presumably so it can be split in place
   at the slash; the lines doing that, the handling of a missing slash, and
   the return statement are not visible in this excerpt. */
407 follow_link (char *object
, const char *key
)
412 slash
= strchr(object
,'/');
418 (void)meta_get_value(object
,slash
,key
,&value
);
421 DPRINTF("%s: %s:%s => %s\n",__func__
,object
,key
,value
);
428 return g_atomic_int_get(&rep_count
);