1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 #include <semaphore.h>
31 #include <microhttpd.h> /* for HTTP status values */
40 REPL_PUT
, /* store an object */
41 REPL_ODELETE
, /* delete an object */
42 REPL_BCREATE
, /* create a bucket */
43 /* TBD: bucket deletion, others? */
46 typedef struct _repl_item
{
47 struct _repl_item
*next
;
59 provider_t
*cur_server
;
62 static repl_item
*queue_head
= NULL
;
63 static repl_item
*queue_tail
= NULL
;
64 static pthread_mutex_t queue_lock
;
65 static sem_t queue_sema
;
66 static volatile gint rep_count
= 0;
69 proxy_repl_prod (void *ctx
)
71 repl_item
*item
= ctx
;
72 backend_thunk_t thunk
;
75 thunk
.parent
= item
->ms
;
76 thunk
.prov
= get_main_provider();
78 result
= thunk
.prov
->func_tbl
->get_child_func(&thunk
);
83 proxy_repl_cons (void *ctx
)
85 repl_item
*item
= ctx
;
86 my_state
*ms
= item
->ms
;
89 pp
= pipe_init_private(&ms
->pipe
);
91 pipe_cons_siginit(&ms
->pipe
,-1);
95 pp
->prov
= item
->server
;
98 return item
->server
->func_tbl
->put_child_func(pp
);
102 repl_worker_del (const repl_item
*item
)
108 bucket
= strdup(item
->path
);
110 error(0,errno
,"ran out of memory replicating delete for %s",
115 key
= strchr(bucket
,'/');
117 error(0,0,"invalid path replicating delete for %s",item
->path
);
123 rc
= item
->server
->func_tbl
->delete_func(item
->server
,
124 bucket
, key
, item
->path
);
125 if (rc
!= MHD_HTTP_OK
) {
126 error(0,0,"got status %d replicating delete for %s",
131 DPRINTF("finished replicating delete for %s, rc = %d\n",item
->path
,rc
);
135 repl_worker_bcreate (repl_item
*item
)
139 rc
= item
->server
->func_tbl
->bcreate_func(item
->server
,item
->path
);
140 if (rc
!= MHD_HTTP_OK
) {
141 error(0,0,"got status %d replicating bcreate for %s",
145 DPRINTF("finished replicating bcreate for %s, rc = %d\n",item
->path
,rc
);
148 /* Use this to diagnose failed thread creation. */
149 #define xpthread_create(thread, start_routine, item, msg) \
151 int err = pthread_create (thread, NULL, start_routine, item); \
153 error (0, err, msg); \
159 repl_worker (void *notused ATTRIBUTE_UNUSED
)
167 sem_wait(&queue_sema
);
168 pthread_mutex_lock(&queue_lock
);
170 queue_head
= item
->next
;
174 pthread_mutex_unlock(&queue_lock
);
177 * Do a full initialization here, not just in the rest. It's
178 * necessary in the oddball case where we're re-replicating as
179 * a result of an attribute/policy change, and it's not harmful
180 * in the normal case where we're actually storing a new file.
183 pipe_init_shared(&ms
->pipe
,ms
,1);
184 switch (item
->type
) {
186 if (pipe(item
->pipes
) >= 0) {
187 xpthread_create(&prod
,proxy_repl_prod
,item
,
188 "failed to start producer thread");
189 xpthread_create(&cons
,proxy_repl_cons
,item
,
190 "failed to start consumer thread");
191 pthread_join(prod
,NULL
);
192 pthread_join(cons
,NULL
);
195 error(0,errno
,"pipe");
199 repl_worker_del(item
);
202 repl_worker_bcreate(item
);
205 error(0,0,"bad repl type %d (url=%s) skipped",
206 item
->type
, item
->path
);
210 /* No atomic dec without test? Lame. */
211 (void)g_atomic_int_dec_and_test(&rep_count
);
220 sem_init(&queue_sema
,0,0);
221 pthread_mutex_init(&queue_lock
,NULL
);
222 pthread_create(&tid
,NULL
,repl_worker
,NULL
);
226 repl_oget (void *ctx
, const char *id
)
228 query_ctx_t
*qctx
= ctx
;
229 char *cur_value
= NULL
;
231 (void)meta_get_value(qctx
->cur_bucket
,qctx
->cur_key
,id
,&cur_value
);
237 repl_sget (void *ctx
, const char *id
)
239 query_ctx_t
*qctx
= ctx
;
240 provider_t
*prov
= qctx
->cur_server
;
242 if (!strcmp(id
,"name")) {
245 if (!strcmp(id
,"type")) {
248 if (!strcmp(id
,"host")) {
251 if (!strcmp(id
,"key")) {
252 return prov
->username
;
254 if (!strcmp(id
,"secret")) {
255 return prov
->password
;
257 if (!strcmp(id
,"path")) {
261 return g_hash_table_lookup(prov
->attrs
,id
);
265 replicate (const char *url
, size_t size
, const char *policy
, my_state
*ms
)
282 error(0,0,"could not parse url %s",url
);
285 qctx
.cur_bucket
= strtok_r(url2
,"/",&stctx
);
286 qctx
.cur_key
= strtok_r(NULL
,"/",&stctx
);
289 size
= meta_get_size(qctx
.cur_bucket
,qctx
.cur_key
);
290 DPRINTF("fetched size %zu for %s\n",size
,url
);
294 DPRINTF("--- policy = %s\n",policy
);
295 expr
= parse(policy
);
301 oget
.func
= repl_oget
;
303 sget
.func
= repl_sget
;
306 init_prov_iter(&iter
);
307 while (g_hash_table_iter_next(&iter
,&key
,&value
)) {
308 if (!strcmp(key
,me
)) {
311 prov
= (provider_t
*)value
;
313 qctx
.cur_server
= prov
;
314 res
= eval(expr
,&oget
,&sget
);
320 DPRINTF("skipping %s for %s\n",prov
->name
,url
);
323 DPRINTF("REPLICATING %s to %s\n",url
,prov
->name
);
324 item
= malloc(sizeof(*item
));
326 error(0,errno
,"could not create repl_item for %s",
330 item
->type
= REPL_PUT
;
331 item
->path
= strdup(url
);
333 error(0,errno
,"could not create repl_item for %s",
340 pthread_mutex_lock(&queue_lock
);
342 item
->next
= queue_tail
->next
;
343 queue_tail
->next
= item
;
350 pthread_mutex_unlock(&queue_lock
);
351 g_atomic_int_inc(&rep_count
);
352 sem_post(&queue_sema
);
359 replicate_namespace_action (const char *name
, repl_t action
, my_state
*ms
)
366 init_prov_iter(&iter
);
367 while (g_hash_table_iter_next(&iter
,&key
,&value
)) {
368 if (!strcmp(key
,me
)) {
371 DPRINTF("replicating %s(%s) on %s\n",
372 (action
== REPL_ODELETE
? "delete" : "create"),
374 ((provider_t
*)value
)->name
);
375 item
= malloc(sizeof(*item
));
377 error(0,errno
,"could not create repl_item for %s",
382 item
->path
= strdup(name
);
387 item
->server
= (provider_t
*)value
;
389 pthread_mutex_lock(&queue_lock
);
391 item
->next
= queue_tail
->next
;
392 queue_tail
->next
= item
;
399 pthread_mutex_unlock(&queue_lock
);
400 g_atomic_int_inc(&rep_count
);
401 sem_post(&queue_sema
);
406 replicate_delete (const char *name
, my_state
*ms
)
408 replicate_namespace_action(name
,REPL_ODELETE
,ms
);
412 replicate_bcreate (const char *name
, my_state
*ms
)
414 replicate_namespace_action(name
,REPL_BCREATE
,ms
);
417 /* Part of our API to the query module. */
419 follow_link (char *object
, const char *key
)
424 slash
= strchr(object
,'/');
430 (void)meta_get_value(object
,slash
,key
,&value
);
433 DPRINTF("%s: %s:%s => %s\n",__func__
,object
,key
,value
);
440 return g_atomic_int_get(&rep_count
);