1 /* Copyright (C) 2010-2011 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
23 #include <semaphore.h>
31 #include <microhttpd.h> /* for HTTP status values */
40 REPL_PUT
, /* store an object */
41 REPL_ODELETE
, /* delete an object */
42 REPL_BCREATE
, /* create a bucket */
43 /* TBD: bucket deletion, others? */
46 typedef struct _repl_item
{
47 struct _repl_item
*next
;
58 provider_t
*cur_server
;
61 static repl_item
*queue_head
= NULL
;
62 static repl_item
*queue_tail
= NULL
;
63 static pthread_mutex_t queue_lock
;
64 static sem_t queue_sema
;
65 static volatile gint rep_count
= 0;
68 proxy_repl_prod (void *ctx
)
70 repl_item
*item
= ctx
;
71 backend_thunk_t thunk
;
74 thunk
.parent
= item
->ms
;
75 thunk
.prov
= get_main_provider();
77 result
= thunk
.prov
->func_tbl
->get_child_func(&thunk
);
82 proxy_repl_cons (void *ctx
)
84 repl_item
*item
= ctx
;
85 my_state
*ms
= item
->ms
;
88 pp
= pipe_init_private(&ms
->pipe
);
90 pipe_cons_siginit(&ms
->pipe
,-1);
94 pp
->prov
= item
->server
;
97 return item
->server
->func_tbl
->put_child_func(pp
);
101 repl_worker_del (const repl_item
*item
)
107 bucket
= strdup(item
->path
);
109 error(0,errno
,"ran out of memory replicating delete for %s",
114 key
= strchr(bucket
,'/');
116 error(0,0,"invalid path replicating delete for %s",item
->path
);
121 rc
= item
->server
->func_tbl
->delete_func(item
->server
,
122 bucket
, key
, item
->path
);
123 if (rc
!= MHD_HTTP_OK
) {
124 error(0,0,"got status %d replicating delete for %s",
128 DPRINTF("finished replicating delete for %s, rc = %d\n",item
->path
,rc
);
132 repl_worker_bcreate (repl_item
*item
)
136 rc
= item
->server
->func_tbl
->bcreate_func(item
->server
,item
->path
);
137 if (rc
!= MHD_HTTP_OK
) {
138 error(0,0,"got status %d replicating bcreate for %s",
142 DPRINTF("finished replicating bcreate for %s, rc = %d\n",item
->path
,rc
);
145 /* Use this to diagnose failed thread creation. */
146 #define xpthread_create(thread, start_routine, item, msg) \
148 int err = pthread_create (thread, NULL, start_routine, item); \
150 error (0, err, msg); \
156 repl_worker (void *notused ATTRIBUTE_UNUSED
)
164 sem_wait(&queue_sema
);
165 pthread_mutex_lock(&queue_lock
);
167 queue_head
= item
->next
;
171 pthread_mutex_unlock(&queue_lock
);
174 * Do a full initialization here, not just in the rest. It's
175 * necessary in the oddball case where we're re-replicating as
176 * a result of an attribute/policy change, and it's not harmful
177 * in the normal case where we're actually storing a new file.
180 pipe_init_shared(&ms
->pipe
,ms
,1);
181 switch (item
->type
) {
183 xpthread_create(&prod
,proxy_repl_prod
,item
,
184 "failed to start producer thread");
185 xpthread_create(&cons
,proxy_repl_cons
,item
,
186 "failed to start consumer thread");
187 pthread_join(prod
,NULL
);
188 pthread_join(cons
,NULL
);
191 repl_worker_del(item
);
194 repl_worker_bcreate(item
);
197 error(0,0,"bad repl type %d (url=%s) skipped",
198 item
->type
, item
->path
);
200 /* No atomic dec without test? Lame. */
201 (void)g_atomic_int_dec_and_test(&rep_count
);
210 sem_init(&queue_sema
,0,0);
211 pthread_mutex_init(&queue_lock
,NULL
);
212 pthread_create(&tid
,NULL
,repl_worker
,NULL
);
216 repl_oget (void *ctx
, const char *id
)
218 query_ctx_t
*qctx
= ctx
;
219 char *cur_value
= NULL
;
221 (void)meta_get_value(qctx
->cur_bucket
,qctx
->cur_key
,id
,&cur_value
);
227 repl_sget (void *ctx
, const char *id
)
229 query_ctx_t
*qctx
= ctx
;
230 provider_t
*prov
= qctx
->cur_server
;
232 if (!strcmp(id
,"name")) {
235 if (!strcmp(id
,"type")) {
238 if (!strcmp(id
,"host")) {
241 if (!strcmp(id
,"key")) {
242 return prov
->username
;
244 if (!strcmp(id
,"secret")) {
245 return prov
->password
;
247 if (!strcmp(id
,"path")) {
252 kv
.key
= (char *) id
;
253 struct kv_pair
*p
= hash_lookup (prov
->attrs
, &kv
);
255 return p
? p
->val
: NULL
;
259 replicate (const char *url
, size_t size
, const char *policy
, my_state
*ms
)
272 error(0,0,"could not parse url %s",url
);
275 qctx
.cur_bucket
= strtok_r(url2
,"/",&stctx
);
276 qctx
.cur_key
= strtok_r(NULL
,"/",&stctx
);
279 size
= meta_get_size(qctx
.cur_bucket
,qctx
.cur_key
);
280 DPRINTF("fetched size %zu for %s\n",size
,url
);
284 DPRINTF("--- policy = %s\n",policy
);
285 expr
= parse(policy
);
291 oget
.func
= repl_oget
;
293 sget
.func
= repl_sget
;
297 provider_t
**prov_list
= hash_get_prov_list (&n_prov
);
298 if (prov_list
== NULL
) {
299 DPRINTF("failed to allocate space for provider list\n");
304 for (i
= 0; i
< n_prov
; i
++) {
305 provider_t
*prov
= prov_list
[i
];
306 if (!strcmp(prov
->name
, me
)) {
310 qctx
.cur_server
= prov
;
311 res
= eval(expr
,&oget
,&sget
);
317 DPRINTF("skipping %s for %s\n",prov
->name
,url
);
320 DPRINTF("REPLICATING %s to %s\n",url
,prov
->name
);
321 item
= malloc(sizeof(*item
));
323 error(0,errno
,"could not create repl_item for %s",
327 item
->type
= REPL_PUT
;
328 item
->path
= strdup(url
);
330 error(0,errno
,"could not create repl_item for %s",
337 pthread_mutex_lock(&queue_lock
);
339 item
->next
= queue_tail
->next
;
340 queue_tail
->next
= item
;
347 pthread_mutex_unlock(&queue_lock
);
348 g_atomic_int_inc(&rep_count
);
349 sem_post(&queue_sema
);
354 replicate_namespace_action (const char *name
, repl_t action
, my_state
*ms
)
357 provider_t
**prov_list
= hash_get_prov_list (&n_prov
);
358 if (prov_list
== NULL
) {
359 DPRINTF("failed to allocate space for provider list\n");
364 for (i
= 0; i
< n_prov
; i
++) {
365 provider_t
*prov
= prov_list
[i
];
366 if (!strcmp(prov
->name
, me
)) {
369 DPRINTF("replicating %s(%s) on %s\n",
370 (action
== REPL_ODELETE
? "delete" : "create"),
372 repl_item
*item
= malloc(sizeof(*item
));
374 error(0,errno
,"could not create repl_item for %s",
379 item
->path
= strdup(name
);
385 pthread_mutex_lock(&queue_lock
);
387 item
->next
= queue_tail
->next
;
388 queue_tail
->next
= item
;
395 pthread_mutex_unlock(&queue_lock
);
396 g_atomic_int_inc(&rep_count
);
397 sem_post(&queue_sema
);
402 replicate_delete (const char *name
, my_state
*ms
)
404 replicate_namespace_action(name
,REPL_ODELETE
,ms
);
408 replicate_bcreate (const char *name
, my_state
*ms
)
410 replicate_namespace_action(name
,REPL_BCREATE
,ms
);
413 /* Part of our API to the query module. */
415 follow_link (char *object
, const char *key
)
420 slash
= strchr(object
,'/');
426 (void)meta_get_value(object
,slash
,key
,&value
);
429 DPRINTF("%s: %s:%s => %s\n",__func__
,object
,key
,value
);
436 return g_atomic_int_get(&rep_count
);