1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
29 /* Mongo (rather antisocially) tries to define this itself. */
34 #include <mongo/client/dbclient.h>
35 using namespace mongo
;
/* Namespace handed to the client query, update, insert and remove calls in this file. */
37 /* TBD: parameterize */
38 #define MAIN_TBL "repo.main"
41 * Since the client isn't inherently MT-safe, we serialize access to it
42 * ourselves. Fortunately, none of our metadata operations should be very
43 * long-lived; if they are it probably means our connection is FUBAR and other
44 * threads will be affected anyway.
/*
 * Locking helpers around the single client_lock mutex.
 *
 * With SHOW_CONTENTION defined, CLIENT_LOCK first attempts
 * pthread_mutex_trylock; when that fails it prints a contention message
 * naming the current function and then blocks in pthread_mutex_lock.
 * The second CLIENT_LOCK definition is a plain pthread_mutex_lock;
 * presumably it belongs to the alternative preprocessor branch -- TODO confirm.
 * CLIENT_UNLOCK maps directly to pthread_mutex_unlock.
 */
47 #define SHOW_CONTENTION
49 pthread_mutex_t client_lock
= PTHREAD_MUTEX_INITIALIZER
;
50 #if defined(SHOW_CONTENTION)
51 #define CLIENT_LOCK do { \
52 if (pthread_mutex_trylock(&client_lock) != 0) { \
53 cout << "contention in " << __func__ << endl; \
54 pthread_mutex_lock(&client_lock); \
58 #define CLIENT_LOCK pthread_mutex_lock(&client_lock)
60 #define CLIENT_UNLOCK pthread_mutex_unlock(&client_lock)
/*
 * Render the raw bytes of *foo as text: two lowercase hex digits per byte,
 * in memory order, written to optr.
 *
 * foo   value whose object representation is dumped
 * optr  output buffer; needs 2*sizeof(double) characters plus the
 *       terminating NUL that sprintf appends
 */
63 dbl_to_str (double *foo
, char *optr
)
66 unsigned char *iptr
= (unsigned char *)foo
;
68 for (i
= 0; i
< sizeof(*foo
); ++i
) {
/* sprintf returns the count of characters written, so optr moves past each byte pair. */
69 optr
+= sprintf(optr
,"%02x",*(iptr
++));
/*
 * RepoMeta members: the Mongo connection object followed by the metadata
 * operations that are defined further down as RepoMeta::DidPut,
 * RepoMeta::GotCopy, RepoMeta::HasCopy, and so on.
 * All operations address an entry by its bucket and key strings.
 */
82 DBClientConnection client
;
85 char * DidPut (const char *bucket
, const char *key
,
86 const char *loc
, size_t size
);
87 void GotCopy (const char *bucket
, const char *key
,
89 char * HasCopy (const char *bucket
, const char *key
,
91 int SetValue (const char *bucket
, const char *key
,
92 const char *mkey
, const char * mvalue
);
93 int GetValue (const char *bucket
, const char *key
,
94 const char *mkey
, char ** mvalue
);
95 RepoQuery
* NewQuery (const char *bucket
, const char *key
,
97 auto_ptr
<DBClientCursor
> GetCursor (Query
&q
);
98 void Delete (const char *bucket
, const char *key
);
99 size_t GetSize (const char *bucket
, const char *key
);
100 int Check (const char *bucket
, const char *key
,
102 void * GetAttrList (const char *bucket
, const char *key
);
/*
 * RepoQuery members.  curs is a raw cursor pointer: the constructor fills
 * it from the auto_ptr returned by GetCursor via release(), so this object
 * is responsible for the cursor from then on.
 */
107 DBClientCursor
* curs
;
110 RepoQuery (const char *, const char *, const char *,
/*
 * Constructor: formats db_host and db_port as host:port into addr and
 * connects the Mongo client to that address.  A ConnectException from the
 * connect call is caught and reported on cerr; it is not rethrown in the
 * visible handler.
 */
121 RepoMeta::RepoMeta ()
125 cout
<< "bite me" << endl
;
128 // TBD: assemble this string properly
/* NOTE(review): sprintf is unbounded; confirm addr is large enough for any db_host. */
129 sprintf(addr
,"%s:%u",db_host
,db_port
);
131 client
.connect(addr
);
133 catch (ConnectException
&ce
) {
134 cerr
<< "server down, no metadata access" << endl
;
144 RepoMeta::~RepoMeta ()
/*
 * Run query q against MAIN_TBL and hand back the resulting cursor.
 *
 * When the client does not report a failed state the query is issued
 * directly.  The remaining visible code reconnects to addr and reports a
 * ConnectException from that reconnect on cerr.
 * NOTE(review): looping presumably limits this to one reconnect attempt
 * per call -- confirm it cannot retry forever.
 */
154 auto_ptr
<DBClientCursor
>
155 RepoMeta::GetCursor (Query
&q
)
157 auto_ptr
<DBClientCursor
> curs
;
158 bool looping
= false;
161 if (!client
.isFailed()) {
162 curs
= client
.query(MAIN_TBL
,q
);
171 client
.connect(addr
);
173 catch (ConnectException
&ce
) {
174 cerr
<< "reconnection to " << addr
<< " failed"
/*
 * Record a completed put of bucket/key at location loc with the given size.
 *
 * The current time (seconds plus microseconds, as a double) becomes the
 * _date field; the hex dump of that double produced by dbl_to_str becomes
 * the _etag field.  Visible code both updates fields on an entry matched by
 * _bucket and _key and inserts a fresh entry carrying all six fields;
 * presumably the choice depends on whether the lookup found an entry.
 *
 * Returns a strdup copy of the etag string, which the caller must free.
 */
184 RepoMeta::DidPut (const char *bucket
, const char *key
, const char *loc
,
188 struct timeval now_tv
;
190 auto_ptr
<DBClientCursor
> curs
;
/* Two hex digits per byte of the double, plus the terminating NUL. */
192 char now_str
[sizeof(now
)*2+1];
194 gettimeofday(&now_tv
,NULL
);
195 now
= (double)now_tv
.tv_sec
+ (double)now_tv
.tv_usec
/ 1000000.0;
196 dbl_to_str(&now
,now_str
);
197 cout
<< "now_str = " << now_str
<< endl
;
199 q
= QUERY("_bucket"<<bucket
<<"_key"<<key
);
202 cerr
<< "DidPut failed for " << bucket
<< "/" << key
<< endl
;
206 /* Nice functionality, but what an ugly syntax! */
/*
 * NOTE(review): this single update object repeats the $set field name four
 * times.  The one-field-per-call updates that follow look like the working
 * alternative; presumably only one of the two forms is compiled -- confirm.
 */
207 client
.update(MAIN_TBL
,q
,BSON(
208 "$set"<<BSON("_loc"<<BSON_ARRAY(loc
))
209 << "$set"<<BSON("_date"<<now
)
210 << "$set"<<BSON("_etag"<<now_str
)
211 << "$set"<<BSON("_size"<<(long long)size
)));
/* Same four fields, one $set update per call; _loc is reset to a one-element array. */
213 client
.update(MAIN_TBL
,q
,
214 BSON("$set"<<BSON("_loc"<<BSON_ARRAY(loc
))));
215 client
.update(MAIN_TBL
,q
,
216 BSON("$set"<<BSON("_date"<<now
)));
217 client
.update(MAIN_TBL
,q
,
218 BSON("$set"<<BSON("_etag"<<now_str
)));
219 client
.update(MAIN_TBL
,q
,
220 BSON("$set"<<BSON("_size"<<(long long)size
)));
/* Insert path: build the complete entry in one object. */
224 bb
<< "_bucket" << bucket
<< "_key" << key
225 << "_loc" << BSON_ARRAY(loc
) << "_date" << now
226 << "_etag" << now_str
<< "_size" << (long long)size
;
227 client
.insert(MAIN_TBL
,bb
.obj());
/* now_str is a local array, so a heap copy is returned. */
230 return strdup(now_str
);
/*
 * Free-function wrapper around RepoMeta::DidPut: logs bucket, key and loc
 * to cout, then forwards all four arguments through the it pointer and
 * keeps the result in rc.
 */
234 meta_did_put (const char *bucket
, const char *key
, const char *loc
, size_t size
)
238 cout
<< "meta_did_put(" << bucket
<< "," << key
<< "," << loc
<< ")"
242 rc
= it
->DidPut(bucket
,key
,loc
,size
);
/*
 * Record that location loc now also holds a copy of bucket/key.
 *
 * The entry is matched on _bucket and _key; the update uses $addToSet so
 * loc is added to the _loc array without duplicating an existing value.
 * Failures and a missing entry are reported on cerr; no status is
 * returned through the visible code.
 */
249 RepoMeta::GotCopy (const char *bucket
, const char *key
, const char *loc
)
252 auto_ptr
<DBClientCursor
> curs
;
255 q
= QUERY("_bucket"<<bucket
<<"_key"<<key
);
258 cerr
<< "GotCopy failed for " << bucket
<< "/" << key
<< endl
;
262 /* Nice functionality, but what an ugly syntax! */
263 client
.update(MAIN_TBL
,q
,BSON("$addToSet"<<BSON("_loc"<<loc
)));
266 cerr
<< bucket
<< "/" << key
<< " not found in GotCopy!" << endl
;
/* Free-function wrapper: forwards bucket, key and loc to RepoMeta::GotCopy through the it pointer. */
271 meta_got_copy (const char *bucket
, const char *key
, const char *loc
)
274 it
->GotCopy(bucket
,key
,loc
);
/*
 * Check whether location loc holds a copy of bucket/key.
 *
 * The query matches on _bucket, _key and _loc together.  When a matching
 * entry has a non-empty _etag string, a strdup copy of it is returned and
 * the caller must free it.  The other outcomes (lookup failure, no match,
 * empty _etag) are logged; presumably they return NULL -- TODO confirm.
 */
279 RepoMeta::HasCopy (const char *bucket
, const char *key
, const char *loc
)
282 auto_ptr
<DBClientCursor
> curs
;
286 q
= QUERY("_bucket"<<bucket
<<"_key"<<key
<<"_loc"<<loc
);
289 cerr
<< "HasCopy failed for " << bucket
<< "/" << key
<< endl
;
293 cout
<< bucket
<< "/" << key
<< " not found at " << loc
<< endl
;
/*
 * NOTE(review): value points into the object returned by next(), which is a
 * temporary here; confirm the underlying buffer outlives the uses below.
 */
297 value
= curs
->next().getStringField("_etag");
298 if (!value
|| !*value
) {
299 cout
<< bucket
<< "/" << key
<< " no _etag at " << loc
<< endl
;
303 cout
<< bucket
<< "/" << key
<< " _etag = " << value
<< endl
;
304 return strdup(value
);
/* Free-function wrapper: forwards to RepoMeta::HasCopy through the it pointer and keeps the result in rc. */
308 meta_has_copy (const char *bucket
, const char *key
, const char *loc
)
313 rc
= it
->HasCopy(bucket
,key
,loc
);
/*
 * Set attribute mkey to the string mvalue on the entry matched by
 * _bucket and _key, using a $set update.
 *
 * NOTE(review): the trailing 1 passed to update is presumably the upsert
 * flag of the driver -- confirm against the driver version in use.
 * A ConnectException is caught and reported on cerr.
 */
320 RepoMeta::SetValue (const char *bucket
, const char *key
, const char *mkey
,
323 Query q
= QUERY("_bucket"<<bucket
<<"_key"<<key
);
326 client
.update(MAIN_TBL
,q
,BSON("$set"<<BSON(mkey
<<mvalue
)),1);
328 catch (ConnectException
&ce
) {
329 cerr
<< "SetValue failed for " << bucket
<< "/" << key
<< ":"
334 // TBD: check for and propagate errors.
/* Free-function wrapper: forwards to RepoMeta::SetValue through the it pointer and keeps the result in rc. */
339 meta_set_value (const char *bucket
, const char *key
, const char *mkey
,
345 rc
= it
->SetValue(bucket
,key
,mkey
,mvalue
);
/*
 * Fetch string attribute mkey of the entry matched by _bucket and _key.
 *
 * On success *mvalue receives a strdup copy of the field, which the caller
 * must free.  A null or empty field takes the separate branch guarded by
 * the data check below.  Lookup failures are reported on cerr.
 */
352 RepoMeta::GetValue (const char *bucket
, const char *key
, const char *mkey
,
355 auto_ptr
<DBClientCursor
> curs
;
360 q
= QUERY("_bucket"<<bucket
<<"_key"<<key
);
363 cerr
<< "GetValue failed for " << bucket
<< "/" << key
<< ":"
372 data
= bo
.getStringField(mkey
);
/* Treat a null pointer and an empty string the same way. */
373 if (!data
|| !*data
) {
377 *mvalue
= strdup(data
);
/* Free-function wrapper: forwards to RepoMeta::GetValue through the it pointer and keeps the result in rc. */
382 meta_get_value (const char *bucket
, const char *key
, const char *mkey
,
388 rc
= it
->GetValue(bucket
,key
,mkey
,mvalue
);
/*
 * Constructor: builds a Mongo query that filters on a single field, either
 * _bucket or _key (each visible assignment to q uses only one of them),
 * logs which argument was used, and reports on cout when qstr cannot be
 * parsed.  Finally the cursor obtained from parent.GetCursor is detached
 * from its auto_ptr with release() and stored in curs, so this object is
 * responsible for it afterwards.
 */
394 RepoQuery::RepoQuery (const char *bucket
, const char *key
, const char *qstr
,
399 auto_ptr
<DBClientCursor
> tmp
;
402 cout
<< "bucket is " << bucket
<< " and we don't care" << endl
;
403 q
= QUERY("_bucket"<<bucket
);
406 cout
<< "key is " << key
<< " and we don't care" << endl
;
407 q
= QUERY("_key"<<key
);
414 * TBD: we should really convert our query into one of Mongo's,
415 * and let them do all the work. Handling the general case
416 * would be pretty messy, but we could handle specific cases
417 * pretty easily. For example, a very high percentage of
418 * queries are likely to be a single field/value comparison.
419 * For now just punt, but revisit later.
428 cout
<< "could not parse " << qstr
<< endl
;
435 curs
= parent
.GetCursor(q
).release();
/*
 * Destructor; the visible statement logs the function name to cout.
 * NOTE(review): curs was detached with release() in the constructor, so it
 * has to be deleted here -- confirm the full body does so.
 */
440 RepoQuery::~RepoQuery ()
442 cout
<< "in " << __func__
<< endl
;
/*
 * End a query: qobj is cast back to RepoQuery and deleted.  It must be a
 * pointer produced by RepoMeta::NewQuery, which allocates with new.
 */
451 meta_query_stop (void * qobj
)
454 delete (RepoQuery
*)qobj
;
/*
 * Field-lookup callback with C linkage, installed as getter.func in
 * RepoQuery::Next.
 *
 * ctx  pointer to the BSONObj currently being evaluated
 * id   field name to look up
 *
 * Returns the string field named id from that object.  The pointer is
 * whatever getStringField hands back; no copy is made here.
 */
458 extern "C" const char *
459 query_getter (void *ctx
, const char *id
)
461 BSONObj
*cur_bo
= (BSONObj
*)ctx
;
463 return (char *)cur_bo
->getStringField(id
);
/*
 * Advance through the cursor to the next entry accepted by the parsed
 * expression.
 *
 * For each candidate, query_getter is bound to the candidate object and
 * eval is called on expr.  A result of zero or less selects the guarded
 * branch; presumably that skips the entry -- TODO confirm.  For an accepted
 * entry the _bucket and _key string fields are stored in the bucket and
 * key members.
 * NOTE(review): those members point at data obtained from bo without a
 * copy; confirm the buffer stays valid until the caller is done with them.
 */
467 RepoQuery::Next (void)
475 while (curs
->more()) {
478 getter
.func
= query_getter
;
479 getter
.ctx
= (void *)&bo
;
480 if (eval(expr
,&getter
,NULL
) <= 0) {
484 bucket
= (char *)bo
.getStringField("_bucket");
485 key
= (char *)bo
.getStringField("_key");
/*
 * Allocate a RepoQuery for the given bucket, key and expression, passing
 * this RepoMeta as its parent.  The object is created with new;
 * meta_query_stop is the visible place where such an object is deleted.
 */
493 RepoMeta::NewQuery (const char *bucket
, const char *key
, const char *expr
)
495 return new RepoQuery(bucket
,key
,expr
,*this);
/*
 * Free-function wrapper that starts a query through the it pointer.
 *
 * The guard below fires when bucket and key are both set or both null.
 * Presumably that case is rejected, since the RepoQuery constructor filters
 * on a single field -- TODO confirm.
 */
499 meta_query_new (const char *bucket
, const char *key
, const char *expr
)
503 if ((bucket
&& key
) || (!bucket
&& !key
)) {
508 rc
= it
->NewQuery(bucket
,key
,expr
);
/*
 * Step a query started by meta_query_new.  qobj is cast back to RepoQuery;
 * the visible assignment hands the bucket member of the query to the caller
 * through *bucket.  The pointer is passed on as is, not copied.
 */
515 meta_query_next (void * qobj
, char ** bucket
, char ** key
)
517 RepoQuery
* rq
= (RepoQuery
*)qobj
;
526 *bucket
= rq
->bucket
;
/*
 * Print the distinct bucket names: runs a distinct command against
 * collection main of database repo, dumps the whole reply to cout, then
 * prints each element of the values field of the reply on its own line.
 *
 * NOTE(review): the Mongo distinct command normally names its field
 * selector key (no underscore); here it is spelled _key -- confirm the
 * server accepts this form.
 */
533 RepoMeta::BucketList (void)
536 * TBD: make this return values instead of producing output.
537 * This is just a code fragment showing how to get a list of buckets,
542 BSONObj dist
= BSON("distinct"<<"main"<<"_key"<<"_bucket");
543 if (client
.runCommand("repo",dist
,repl
)) {
544 cout
<< repl
.toString() << endl
;
545 BSONObj elem
= repl
.getField("values").embeddedObject();
546 for (int i
= 0; i
< elem
.nFields(); ++i
) {
547 cout
<< elem
[i
].str() << endl
;
/*
 * Remove the entry matched by _bucket and _key from MAIN_TBL.
 * A ConnectException from the remove call is caught and reported on cerr.
 */
554 RepoMeta::Delete (const char *bucket
, const char *key
)
556 Query q
= QUERY("_bucket"<<bucket
<<"_key"<<key
);
559 client
.remove(MAIN_TBL
,q
);
561 catch (ConnectException
&ce
) {
562 cerr
<< "Delete failed for " << bucket
<< "/" << key
<< endl
;
/* Free-function wrapper: forwards bucket and key to RepoMeta::Delete through the it pointer. */
568 meta_delete (const char *bucket
, const char *key
)
571 it
->Delete(bucket
,key
);
/*
 * Look up the entry matched by _bucket and _key and return its _size field
 * read as a long integer.  DidPut stores that field as a long long.
 */
576 RepoMeta::GetSize (const char *bucket
, const char *key
)
578 auto_ptr
<DBClientCursor
> curs
;
585 q
= QUERY("_bucket"<<bucket
<<"_key"<<key
);
593 return bo
.getField("_size").numberLong();
/* Free-function wrapper: forwards to RepoMeta::GetSize through the it pointer and keeps the result in rc. */
598 meta_get_size (const char *bucket
, const char *key
)
603 rc
= it
->GetSize(bucket
,key
);
/*
 * AttrList members: a constructor taking the source BSON object, a Next
 * method that yields one name and value pair per call through its two
 * output pointers, and vec, the element vector that Next walks.
 */
611 AttrList (BSONObj
&);
612 int Next (const char **, const char **);
614 vector
<BSONElement
> vec
;
618 AttrList::AttrList (BSONObj
&bo
)
/*
 * Yield the next string-typed attribute.
 *
 * Walks vec starting at idx; for an element whose type is String, *name
 * receives the field name and *value the string contents.
 *
 * NOTE(review): if BSONElement::String() returns a std::string by value,
 * the c_str() pointer stored in *value dangles as soon as the statement
 * ends.  An accessor that points into the element itself (valuestr in the
 * legacy driver) would avoid that -- confirm against the driver headers.
 */
626 AttrList::Next (const char **name
, const char **value
)
630 while (idx
< vec
.size()) {
/* Only string-typed elements are reported through this branch. */
632 if (elem
.type() == String
) {
633 *name
= elem
.fieldName();
634 *value
= elem
.String().c_str();
/*
 * Look up the entry matched by _bucket and _key and return a new AttrList
 * built from it.  The list is allocated with new and returned as an opaque
 * pointer (the declaration uses void *).
 */
643 RepoMeta::GetAttrList (const char *bucket
, const char *key
)
645 auto_ptr
<DBClientCursor
> curs
;
649 q
= QUERY("_bucket"<<bucket
<<"_key"<<key
);
657 return new AttrList(bo
);
/* Free-function wrapper: forwards to RepoMeta::GetAttrList through the it pointer and keeps the result in poc. */
661 meta_get_attrs (const char *bucket
, const char *key
)
666 poc
= it
->GetAttrList(bucket
,key
);
/*
 * Step an attribute list: ctx is cast back to AttrList and the call is
 * forwarded to AttrList::Next, whose return value is passed through.
 */
674 meta_attr_next (void *ctx
, const char **name
, const char **value
)
676 AttrList
*poc
= (AttrList
*)ctx
;
678 return poc
->Next(name
,value
);
/*
 * End an attribute iteration: ctx is cast back to AttrList.
 * NOTE(review): presumably the list is deleted after the cast, mirroring
 * meta_query_stop -- TODO confirm.
 */
683 meta_attr_stop (void *ctx
)
685 AttrList
*poc
= (AttrList
*)ctx
;