1 /* Copyright (C) 2010 Red Hat, Inc.
3 This program is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program. If not, see <http://www.gnu.org/licenses/>. */
29 /* Mongo (rather antisocially) tries to define this itself. */
34 #include <mongo/client/dbclient.h>
35 using namespace mongo
;
/* Namespace string passed as the first argument to every client.query, update, insert and remove call visible in this file. */
37 /* TBD: parameterize */
38 #define MAIN_TBL "repo.main"
41 * Since the client isn't inherently MT-safe, we serialize access to it
42 * ourselves. Fortunately, none of our metadata operations should be very
43 * long-lived; if they are it probably means our connection is FUBAR and other
44 * threads will be affected anyway.
47 #define SHOW_CONTENTION
/*
 * client_lock is the mutex taken by CLIENT_LOCK and dropped by CLIENT_UNLOCK.
 * With SHOW_CONTENTION defined, CLIENT_LOCK first calls
 * pthread_mutex_trylock; when that returns non-zero it prints
 * "contention in <function>" to cout and then calls pthread_mutex_lock.
 * NOTE(review): the end of the SHOW_CONTENTION variant and the
 * preprocessor lines separating it from the second CLIENT_LOCK definition
 * are not visible in this listing; the second definition is presumably
 * the non-reporting alternative - confirm.
 */
49 pthread_mutex_t client_lock
= PTHREAD_MUTEX_INITIALIZER
;
50 #if defined(SHOW_CONTENTION)
51 #define CLIENT_LOCK do { \
52 if (pthread_mutex_trylock(&client_lock) != 0) { \
53 cout << "contention in " << __func__ << endl; \
54 pthread_mutex_lock(&client_lock); \
58 #define CLIENT_LOCK pthread_mutex_lock(&client_lock)
60 #define CLIENT_UNLOCK pthread_mutex_unlock(&client_lock)
/*
 * Write the raw in-memory bytes of *foo into optr as lowercase hex, two
 * characters per byte, in memory order.
 *
 * foo   value whose object representation is encoded; exactly
 *       sizeof(double) bytes are read, so the text depends on the host's
 *       byte order and floating-point format.
 * optr  destination buffer; needs room for 2*sizeof(double)+1 characters.
 *
 * The result is always NUL-terminated.  A NULL optr is ignored, and a NULL
 * foo yields an empty string.  Digits come from a lookup table (same
 * output as "%02x") instead of one sprintf call per byte, and the index is
 * a size_t so it matches the type of sizeof.
 */
void
dbl_to_str (double *foo, char *optr)
{
	static const char	hex_digits[] = "0123456789abcdef";
	const unsigned char	*iptr = (const unsigned char *)foo;
	size_t			i;

	if (!optr) {
		return;
	}
	if (!iptr) {
		*optr = '\0';
		return;
	}

	for (i = 0; i < sizeof(*foo); ++i) {
		*(optr++) = hex_digits[iptr[i] >> 4];
		*(optr++) = hex_digits[iptr[i] & 0x0f];
	}
	*optr = '\0';
}
/*
 * Member declarations.  The enclosing class header is not visible in this
 * listing; the out-of-line definitions later in the file use the
 * RepoMeta:: qualifier for these names.  client is the connection object
 * those definitions call query/update/insert/remove/runCommand on.
 * NOTE(review): several prototypes below stop after a trailing comma
 * because their continuation lines are missing from this listing, and no
 * definition of Check appears in the visible part of the file.
 */
82 DBClientConnection client
;
84 char * DidPut (const char *bucket
, const char *key
,
85 const char *loc
, size_t size
);
86 void GotCopy (const char *bucket
, const char *key
,
88 char * HasCopy (const char *bucket
, const char *key
,
90 int SetValue (const char *bucket
, const char *key
,
91 const char *mkey
, const char * mvalue
);
92 int GetValue (const char *bucket
, const char *key
,
93 const char *mkey
, char ** mvalue
);
94 RepoQuery
* NewQuery (const char *bucket
, const char *key
,
96 auto_ptr
<DBClientCursor
> GetCursor (Query
&q
);
97 void Delete (const char *bucket
, const char *key
);
98 size_t GetSize (const char *bucket
, const char *key
);
99 int Check (const char *bucket
, const char *key
,
/*
 * Member declarations of the query object (class header not visible in
 * this listing).  curs is a raw cursor pointer; the constructor defined
 * later fills it from parent.GetCursor(q).release().
 * NOTE(review): the constructor prototype is cut off here; the definition
 * and its call in NewQuery take a fourth argument.
 */
105 DBClientCursor
* curs
;
108 RepoQuery (const char *, const char *, const char *,
/*
 * Constructor: formats "<db_host>:<db_port>" into addr and connects
 * client to that address.
 * NOTE(review): the declaration of addr is not visible in this listing and
 * sprintf is unbounded - confirm addr is large enough for db_host, or
 * use snprintf.
 * NOTE(review): the outcome of client.connect is not examined in the
 * visible lines.
 */
119 RepoMeta::RepoMeta ()
123 sprintf(addr
,"%s:%u",db_host
,db_port
);
124 client
.connect(addr
);
/* Destructor.  NOTE(review): its body is not visible in this listing. */
133 RepoMeta::~RepoMeta ()
/*
 * Run query q against MAIN_TBL; the declared return type is
 * auto_ptr<DBClientCursor>.
 * Visible steps: a first client.query; a "reconnecting" message followed
 * by client.connect("localhost"); a ConnectException handler that prints
 * "server down"; and a second client.query.
 * NOTE(review): the condition guarding the reconnect path and what happens
 * after "server down" are not visible in this listing - presumably the
 * retry only runs when the first query produced no cursor; confirm.
 * NOTE(review): the reconnect targets the literal "localhost", while the
 * constructor connects to db_host:db_port - confirm whether this
 * mismatch is intended.
 */
143 auto_ptr
<DBClientCursor
>
144 RepoMeta::GetCursor (Query
&q
)
146 auto_ptr
<DBClientCursor
> curs
;
148 curs
= client
.query(MAIN_TBL
,q
);
150 cout
<< "reconnecting" << endl
;
152 client
.connect("localhost");
154 catch (ConnectException
&ce
) {
155 cout
<< "server down" << endl
;
158 curs
= client
.query(MAIN_TBL
,q
);
/*
 * Record that bucket/key was stored at loc with the given size.
 * Visible steps: read the time with gettimeofday and turn it into a double
 * number of seconds (now); hex-encode that double into now_str with
 * dbl_to_str (now_str holds two characters per byte plus a terminator);
 * build a bucket+key query; then either update the matching document's
 * "loc", "date", "etag" and "size" fields or insert a new document
 * carrying bucket, key and those four fields.
 * Returns strdup(now_str), i.e. a heap copy of the string stored as "etag".
 * NOTE(review): the conditions choosing between update and insert, and the
 * declarations of bb, now and q, are not visible in this listing.
 * NOTE(review): two update forms appear - one call whose BSON repeats the
 * "$set" key four times, and four separate single-field updates.
 * Presumably only one of them is compiled (the selecting preprocessor
 * lines are not visible) - confirm.  A single "$set" holding all four
 * fields would avoid both the repeated key and the four round trips.
 */
165 RepoMeta::DidPut (const char *bucket
, const char *key
, const char *loc
,
169 struct timeval now_tv
;
171 auto_ptr
<DBClientCursor
> curs
;
173 char now_str
[sizeof(now
)*2+1];
175 gettimeofday(&now_tv
,NULL
);
176 now
= (double)now_tv
.tv_sec
+ (double)now_tv
.tv_usec
/ 1000000.0;
177 dbl_to_str(&now
,now_str
);
178 cout
<< "now_str = " << now_str
<< endl
;
180 q
= QUERY("bucket"<<bucket
<<"key"<<key
);
183 /* Nice functionality, but what an ugly syntax! */
184 client
.update(MAIN_TBL
,q
,BSON(
185 "$set"<<BSON("loc"<<BSON_ARRAY(loc
))
186 << "$set"<<BSON("date"<<now
)
187 << "$set"<<BSON("etag"<<now_str
)
188 << "$set"<<BSON("size"<<(long long)size
)));
190 client
.update(MAIN_TBL
,q
,
191 BSON("$set"<<BSON("loc"<<BSON_ARRAY(loc
))));
192 client
.update(MAIN_TBL
,q
,
193 BSON("$set"<<BSON("date"<<now
)));
194 client
.update(MAIN_TBL
,q
,
195 BSON("$set"<<BSON("etag"<<now_str
)));
196 client
.update(MAIN_TBL
,q
,
197 BSON("$set"<<BSON("size"<<(long long)size
)));
201 bb
<< "bucket" << bucket
<< "key" << key
202 << "loc" << BSON_ARRAY(loc
) << "date" << now
203 << "etag" << now_str
<< "size" << (long long)size
;
204 client
.insert(MAIN_TBL
,bb
.obj());
207 return strdup(now_str
);
/*
 * Free-function wrapper around DidPut: prints
 * "meta_did_put(<bucket>,<key>,<loc>)" to cout and stores the result of
 * it->DidPut(bucket,key,loc,size) in rc.
 * NOTE(review): the declarations of rc and it, any locking around the
 * call, and the return statement are not visible in this listing.
 */
211 meta_did_put (const char *bucket
, const char *key
, const char *loc
, size_t size
)
215 cout
<< "meta_did_put(" << bucket
<< "," << key
<< "," << loc
<< ")"
219 rc
= it
->DidPut(bucket
,key
,loc
,size
);
/*
 * Note an additional copy of bucket/key at loc: issues an update on the
 * bucket+key query with "$addToSet" on the "loc" field.  On the other
 * visible path it prints "<bucket>:<key> not found in GotCopy!" to cerr.
 * NOTE(review): the test that selects between the update and the error
 * message is not visible in this listing.
 */
226 RepoMeta::GotCopy (const char *bucket
, const char *key
, const char *loc
)
229 auto_ptr
<DBClientCursor
> curs
;
232 q
= QUERY("bucket"<<bucket
<<"key"<<key
);
235 /* Nice functionality, but what an ugly syntax! */
236 client
.update(MAIN_TBL
,q
,BSON("$addToSet"<<BSON("loc"<<loc
)));
239 cerr
<< bucket
<< ":" << key
<< " not found in GotCopy!" << endl
;
/*
 * Free-function wrapper: forwards bucket, key and loc to it->GotCopy.
 * NOTE(review): any locking around the call is not visible in this listing.
 */
244 meta_got_copy (const char *bucket
, const char *key
, const char *loc
)
247 it
->GotCopy(bucket
,key
,loc
);
/*
 * Look for a document matching bucket, key and loc.
 * Visible outcomes: prints "<bucket>/<key> not found at <loc>"; otherwise
 * reads the "etag" string field of the next cursor result, prints
 * "<bucket>/<key> no etag at <loc>" when it is NULL or empty, and
 * otherwise prints the etag and returns strdup(value).
 * NOTE(review): the conditions and the return values on the not-found and
 * no-etag paths are not visible in this listing.
 * NOTE(review): value points into the object returned by curs->next() -
 * confirm that storage stays valid until the strdup.
 */
252 RepoMeta::HasCopy (const char *bucket
, const char *key
, const char *loc
)
255 auto_ptr
<DBClientCursor
> curs
;
259 q
= QUERY("bucket"<<bucket
<<"key"<<key
<<"loc"<<loc
);
262 cout
<< bucket
<< "/" << key
<< " not found at " << loc
<< endl
;
266 value
= curs
->next().getStringField("etag");
267 if (!value
|| !*value
) {
268 cout
<< bucket
<< "/" << key
<< " no etag at " << loc
<< endl
;
272 cout
<< bucket
<< "/" << key
<< " etag = " << value
<< endl
;
273 return strdup(value
);
/*
 * Free-function wrapper: stores the result of it->HasCopy(bucket,key,loc)
 * in rc.
 * NOTE(review): rc's declaration, any locking, and the return statement
 * are not visible in this listing.
 */
277 meta_has_copy (const char *bucket
, const char *key
, const char *loc
)
282 rc
= it
->HasCopy(bucket
,key
,loc
);
/*
 * Set field mkey to mvalue on the document matching bucket+key, using a
 * "$set" update.
 * NOTE(review): the trailing argument 1 to client.update is presumably the
 * driver's upsert flag - confirm against the update() signature.
 * NOTE(review): the value returned to the caller is not visible in this
 * listing.
 */
289 RepoMeta::SetValue (const char *bucket
, const char *key
, const char *mkey
,
292 Query q
= QUERY("bucket"<<bucket
<<"key"<<key
);
294 client
.update(MAIN_TBL
,q
,BSON("$set"<<BSON(mkey
<<mvalue
)),1);
295 // TBD: check for and propagate errors.
/*
 * Free-function wrapper: stores the result of
 * it->SetValue(bucket,key,mkey,mvalue) in rc.
 * NOTE(review): the rest of the parameter list, any locking, and the
 * return statement are not visible in this listing.
 */
300 meta_set_value (const char *bucket
, const char *key
, const char *mkey
,
306 rc
= it
->SetValue(bucket
,key
,mkey
,mvalue
);
/*
 * Fetch string field mkey of the bucket+key document.  The visible lines
 * read the field from bo, test it for NULL/empty, and store a strdup'd
 * copy in *mvalue.
 * NOTE(review): how bo is obtained from the cursor, what the NULL/empty
 * branch does, and the returned status codes are not visible in this
 * listing.
 */
313 RepoMeta::GetValue (const char *bucket
, const char *key
, const char *mkey
,
316 auto_ptr
<DBClientCursor
> curs
;
321 q
= QUERY("bucket"<<bucket
<<"key"<<key
);
329 data
= bo
.getStringField(mkey
);
330 if (!data
|| !*data
) {
334 *mvalue
= strdup(data
);
/*
 * Free-function wrapper: stores the result of
 * it->GetValue(bucket,key,mkey,mvalue) in rc.
 * NOTE(review): the rest of the parameter list, any locking, and the
 * return statement are not visible in this listing.
 */
339 meta_get_value (const char *bucket
, const char *key
, const char *mkey
,
345 rc
= it
->GetValue(bucket
,key
,mkey
,mvalue
);
/*
 * Constructor.  Visible steps: log and build q from bucket
 * (QUERY("bucket"<<bucket)) or from key (QUERY("key"<<key)); print
 * "could not parse <qstr>" on a failure path; finally take the cursor
 * from parent.GetCursor(q) and release it from its auto_ptr into the raw
 * curs member.
 * NOTE(review): the conditions selecting the bucket/key branches and the
 * code that parses qstr are not visible in this listing.
 * NOTE(review): release() hands ownership to the raw pointer, so something
 * else must delete curs - the destructor body is not visible; confirm.
 */
351 RepoQuery::RepoQuery (const char *bucket
, const char *key
, const char *qstr
,
356 auto_ptr
<DBClientCursor
> tmp
;
359 cout
<< "bucket is " << bucket
<< " and we don't care" << endl
;
360 q
= QUERY("bucket"<<bucket
);
363 cout
<< "key is " << key
<< " and we don't care" << endl
;
364 q
= QUERY("key"<<key
);
371 * TBD: we should really convert our query into one of Mongo's,
372 * and let them do all the work. Handling the general case
373 * would be pretty messy, but we could handle specific cases
374 * pretty easily. For example, a very high percentage of
375 * queries are likely to be a single field/value comparison.
376 * For now just punt, but revisit later.
385 cout
<< "could not parse " << qstr
<< endl
;
392 curs
= parent
.GetCursor(q
).release();
/*
 * Destructor: prints "in <function name>" to cout.
 * NOTE(review): any cleanup after the message is not visible in this
 * listing.
 */
397 RepoQuery::~RepoQuery ()
399 cout
<< "in " << __func__
<< endl
;
/*
 * Dispose of a query handle: casts qobj to RepoQuery * and deletes it.
 * NOTE(review): qobj is presumably a pointer obtained from meta_query_new -
 * confirm against callers.
 */
408 meta_query_stop (void * qobj
)
411 delete (RepoQuery
*)qobj
;
/*
 * Field-lookup callback: treats ctx as a BSONObj * and returns that
 * object's string field named id, cast to char *.  RepoQuery::Next
 * installs this function as getter.func with ctx pointing at bo.
 */
416 query_getter (void *ctx
, const char *id
)
418 BSONObj
*cur_bo
= (BSONObj
*)ctx
;
420 return (char *)cur_bo
->getStringField(id
);
/*
 * Advance the query.  Loops while curs->more(); for each pass it points
 * getter at query_getter with bo as context, evaluates expr through
 * eval(expr,&getter,NULL), and has a separate branch for results <= 0.
 * The bucket and key members are then set from bo's "bucket" and "key"
 * string fields.
 * NOTE(review): how bo is fetched from the cursor, what the <= 0 branch
 * does (presumably skips the document), and the return values are not
 * visible in this listing.
 * NOTE(review): bucket and key point at storage owned by bo - confirm it
 * outlives their use by the caller.
 */
424 RepoQuery::Next (void)
428 while (curs
->more()) {
431 getter
.func
= query_getter
;
432 getter
.ctx
= (void *)&bo
;
433 if (eval(expr
,&getter
,NULL
) <= 0) {
437 bucket
= (char *)bo
.getStringField("bucket");
438 key
= (char *)bo
.getStringField("key");
/*
 * Allocate a RepoQuery for bucket/key/expr with this object as its parent
 * and return the pointer produced by new.  meta_query_stop in this file
 * deletes a RepoQuery passed to it.
 */
446 RepoMeta::NewQuery (const char *bucket
, const char *key
, const char *expr
)
448 return new RepoQuery(bucket
,key
,expr
,*this);
/*
 * Free-function wrapper around NewQuery.  It first tests for the case
 * where bucket and key are both given or both absent, then stores
 * it->NewQuery(bucket,key,expr) in rc.
 * NOTE(review): the body of the both/neither branch (presumably an early
 * rejection), any locking, and the return statement are not visible in
 * this listing.
 */
452 meta_query_new (const char *bucket
, const char *key
, const char *expr
)
456 if ((bucket
&& key
) || (!bucket
&& !key
)) {
461 rc
= it
->NewQuery(bucket
,key
,expr
);
/*
 * Step a query handle: casts qobj to RepoQuery * and copies rq->bucket
 * into *bucket.
 * NOTE(review): the call that advances the query, the assignment to *key,
 * and the return value are not visible in this listing.
 */
468 meta_query_next (void * qobj
, char ** bucket
, char ** key
)
470 RepoQuery
* rq
= (RepoQuery
*)qobj
;
479 *bucket
= rq
->bucket
;
/*
 * Print the distinct bucket names.  Builds the command object
 * {distinct: "main", key: "bucket"}, runs it against "repo" with
 * client.runCommand, and on success prints the whole reply followed by
 * each element of its "values" field, one per line, to cout.
 * NOTE(review): the declaration of repl and the handling of a failed
 * runCommand are not visible in this listing.
 */
486 RepoMeta::BucketList (void)
489 * TBD: make this return values instead of producing output.
490 * This is just a code fragment showing how to get a list of buckets,
495 BSONObj dist
= BSON("distinct"<<"main"<<"key"<<"bucket");
496 if (client
.runCommand("repo",dist
,repl
)) {
497 cout
<< repl
.toString() << endl
;
498 BSONObj elem
= repl
.getField("values").embeddedObject();
499 for (int i
= 0; i
< elem
.nFields(); ++i
) {
500 cout
<< elem
[i
].str() << endl
;
/*
 * Remove from MAIN_TBL whatever matches the bucket+key query.
 * NOTE(review): the outcome of client.remove is not examined in the
 * visible lines.
 */
507 RepoMeta::Delete (const char *bucket
, const char *key
)
509 Query q
= QUERY("bucket"<<bucket
<<"key"<<key
);
511 client
.remove(MAIN_TBL
,q
);
/*
 * Free-function wrapper: forwards bucket and key to it->Delete.
 * NOTE(review): any locking around the call is not visible in this listing.
 */
516 meta_delete (const char *bucket
, const char *key
)
519 it
->Delete(bucket
,key
);
/*
 * Look up the bucket+key document and return its "size" field, read with
 * numberLong().
 * NOTE(review): how bo is obtained and what is returned when no document
 * matches are not visible in this listing.
 */
524 RepoMeta::GetSize (const char *bucket
, const char *key
)
526 auto_ptr
<DBClientCursor
> curs
;
531 q
= QUERY("bucket"<<bucket
<<"key"<<key
);
539 return bo
.getField("size").numberLong();
/*
 * Free-function wrapper: stores the result of it->GetSize(bucket,key)
 * in rc.
 * NOTE(review): rc's declaration, any locking, and the return statement
 * are not visible in this listing.
 */
544 meta_get_size (const char *bucket
, const char *key
)
549 rc
= it
->GetSize(bucket
,key
);