Added manual re-replication trigger.
[iwhd.git] / meta.cpp
blobd39e6814a7ac366501072dd1130fe19576f2be98
1 #include <errno.h>
2 #include <stdio.h>
3 #include <sys/time.h>
4 #include <iostream>
5 #include "repo.h"
6 #include "meta.h"
7 #include "query.h"
9 using namespace std;
11 /* Mongo (rather antisocially) tries to define this itself. */
12 #if defined(VERSION)
13 #undef VERSION
14 #endif
16 #include <mongo/client/dbclient.h>
17 using namespace mongo;
19 /* TBD: parameterize */
20 #define MAIN_TBL "repo.main"
22 /* TBD: ick! Need to make the parser/query stuff reentrant. */
23 static BSONObj cur_bo;
/*
 * Write the raw in-memory bytes of *foo into optr as lowercase hex, two
 * digits per byte, in memory order (so the text depends on the host's
 * byte order).  The output is NUL-terminated; optr must have room for
 * sizeof(double)*2+1 characters.
 */
void
dbl_to_str (double *foo, char *optr)
{
	static const char		hex_digits[] = "0123456789abcdef";
	const unsigned char *		iptr
		= reinterpret_cast<const unsigned char *>(foo);

	/* size_t index: sizeof yields an unsigned value. */
	for (size_t i = 0; i < sizeof(*foo); ++i) {
		*optr++ = hex_digits[iptr[i] >> 4];
		*optr++ = hex_digits[iptr[i] & 0x0f];
	}
	*optr = '\0';
}
36 class RepoMeta;
37 class RepoQuery;
39 class RepoMeta {
41 public:
42 RepoMeta ();
43 ~RepoMeta ();
45 DBClientConnection client;
47 char * DidPut (char * bucket, char * key, char * loc,
48 size_t size);
49 void GotCopy (char * bucket, char * key, char * loc);
50 char * HasCopy (char * bucket, char * key, char * loc);
51 int SetValue (char * bucket, char * key, char * mkey,
52 char * mvalue);
53 int GetValue (char * bucket, char * key, char * mkey,
54 char ** mvalue);
55 RepoQuery * NewQuery (char * expr);
56 auto_ptr<DBClientCursor> GetCursor (Query &q);
57 void Delete (char * bucket, char * key);
58 size_t GetSize (char * bucket, char * key);
60 private:
61 void BucketList (void); /* just sample code, don't use */
64 class RepoQuery {
65 RepoMeta & parent;
66 DBClientCursor * curs;
67 value_t * expr;
68 public:
69 RepoQuery (char *, RepoMeta &);
70 ~RepoQuery ();
71 bool Next (void);
72 char *bucket;
73 char *key;
77 RepoMeta *it;
79 RepoMeta::RepoMeta ()
81 char addr[128];
83 sprintf(addr,"%s:%u",db_host,db_port);
84 client.connect(addr);
87 extern "C" void
88 meta_init (void)
90 it = new RepoMeta();
93 RepoMeta::~RepoMeta ()
97 extern "C" void
98 meta_fini (void)
100 delete it;
103 auto_ptr<DBClientCursor>
104 RepoMeta::GetCursor (Query &q)
106 auto_ptr<DBClientCursor> curs;
108 curs = client.query(MAIN_TBL,q);
109 if (!curs.get()) {
110 cout << "reconnecting" << endl;
111 try {
112 client.connect("localhost");
114 catch (ConnectException &ce) {
115 cout << "server down" << endl;
116 throw;
118 curs = client.query(MAIN_TBL,q);
121 return curs;
124 char *
125 RepoMeta::DidPut (char * bucket, char * key, char * loc, size_t size)
127 BSONObjBuilder bb;
128 struct timeval now_tv;
129 double now;
130 auto_ptr<DBClientCursor> curs;
131 Query q;
132 char now_str[sizeof(now)*2+1];
134 /* TBD: disambiguate master/slave cases a better way */
135 extern char * master_host;
137 gettimeofday(&now_tv,NULL);
138 now = (double)now_tv.tv_sec + (double)now_tv.tv_usec / 1000000.0;
139 dbl_to_str(&now,now_str);
140 cout << "now_str = " << now_str << endl;
142 q = QUERY("bucket"<<bucket<<"key"<<key);
143 curs = GetCursor(q);
144 if (curs->more()) {
145 /* Nice functionality, but what an ugly syntax! */
146 if (master_host) {
147 client.update(MAIN_TBL,q,
148 BSON("$addToSet"<<BSON("loc"<<loc)));
150 else {
151 client.update(MAIN_TBL,q,
152 BSON("$set"<<BSON("loc"<<BSON_ARRAY(loc))));
154 client.update(MAIN_TBL,q,BSON("$set"<<BSON("date"<<now)));
155 client.update(MAIN_TBL,q,BSON("$set"<<BSON("etag"<<now_str)));
157 else {
158 bb << "bucket" << bucket << "key" << key
159 << "loc" << BSON_ARRAY(loc) << "date" << now
160 << "etag" << now_str << "size" << (long long)size;
161 client.insert(MAIN_TBL,bb.obj());
164 return strdup(now_str);
167 extern "C" char *
168 meta_did_put (char * bucket, char * key, char * loc, size_t size)
170 return it->DidPut(bucket,key,loc,size);
173 void
174 RepoMeta::GotCopy (char * bucket, char * key, char * loc)
176 BSONObjBuilder bb;
177 auto_ptr<DBClientCursor> curs;
178 Query q;
180 q = QUERY("bucket"<<bucket<<"key"<<key);
181 curs = GetCursor(q);
182 if (curs->more()) {
183 /* Nice functionality, but what an ugly syntax! */
184 client.update(MAIN_TBL,q,BSON("$addToSet"<<BSON("loc"<<loc)));
186 else {
187 cerr << bucket << ":" << key << " not found in GotCopy!" << endl;
191 extern "C" void
192 meta_got_copy (char * bucket, char * key, char * loc)
194 it->GotCopy(bucket,key,loc);
197 char *
198 RepoMeta::HasCopy (char * bucket, char * key, char * loc)
200 BSONObjBuilder bb;
201 auto_ptr<DBClientCursor> curs;
202 Query q;
203 const char *value;
205 q = QUERY("bucket"<<bucket<<"key"<<key<<"loc"<<loc);
206 curs = GetCursor(q);
207 if (!curs->more()) {
208 return NULL;
211 value = curs->next().getStringField("etag");
212 if (!value || !*value) {
213 return NULL;
215 return strdup(value);
218 extern "C" char *
219 meta_has_copy (char * bucket, char * key, char * loc)
221 return it->HasCopy(bucket,key,loc);
225 RepoMeta::SetValue (char * bucket, char * key, char * mkey, char * mvalue)
227 Query q = QUERY("bucket"<<bucket<<"key"<<key);
229 client.update(MAIN_TBL,q,BSON("$set"<<BSON(mkey<<mvalue)),1);
230 // TBD: check for and propagate errors.
231 return 0;
234 extern "C" int
235 meta_set_value (char * bucket, char * key, char * mkey, char * mvalue)
237 return it->SetValue(bucket,key,mkey,mvalue);
241 RepoMeta::GetValue (char * bucket, char * key, char * mkey, char ** mvalue)
243 auto_ptr<DBClientCursor> curs;
244 Query q;
245 BSONObj bo;
246 const char * data;
248 q = QUERY("bucket"<<bucket<<"key"<<key);
249 curs = GetCursor(q);
251 if (!curs->more()) {
252 return ENXIO;
255 bo = curs->next();
256 data = bo.getStringField(mkey);
257 if (!data || !*data) {
258 return ENXIO;
261 *mvalue = strdup(data);
262 return 0;
265 extern "C" int
266 meta_get_value (char * bucket, char * key, char * mkey, char ** mvalue)
268 return it->GetValue(bucket,key,mkey,mvalue);
271 RepoQuery::RepoQuery (char *qstr, RepoMeta &p) : parent(p)
273 Query q;
274 auto_ptr<DBClientCursor> tmp;
276 if (*qstr == '/') {
277 expr = NULL;
278 q = QUERY("bucket"<<qstr+1);
280 else {
281 expr = parse(qstr);
282 if (expr) {
283 print_value(expr);
286 * TBD: we should really convert our query into one of Mongo's,
287 * and let them do all the work. Handling the general case
288 * would be pretty messy, but we could handle specific cases
289 * pretty easily. For example, a very high percentage of
290 * queries are likely to be a single field/value comparison.
291 * For now just punt, but revisit later.
293 q = Query();
296 curs = parent.GetCursor(q).release();
297 bucket = NULL;
298 key = NULL;
301 RepoQuery::~RepoQuery ()
303 cout << "in " << __func__ << endl;
304 if (expr) {
305 free_value(expr);
307 delete curs;
309 if (bucket) {
310 free(bucket);
312 if (key) {
313 free(key);
317 extern "C" void
318 meta_query_stop (void * qobj)
320 delete (RepoQuery *)qobj;
323 extern "C" char *
324 query_getter (char *id)
326 return (char *)cur_bo.getStringField(id);
329 bool
330 RepoQuery::Next (void)
332 BSONObj bo;
334 while (curs->more()) {
335 bo = curs->next();
336 if (expr) {
337 cur_bo = bo;
338 if (eval(expr,&query_getter,NULL) <= 0) {
339 continue;
342 if (bucket) { free(bucket); }
343 bucket = strdup(bo.getStringField("bucket"));
344 if (key) { free(key); }
345 key = strdup(bo.getStringField("key"));
346 return true;
349 curs = NULL;
350 return false;
353 RepoQuery *
354 RepoMeta::NewQuery (char *expr)
356 return new RepoQuery(expr,*this);
359 extern "C" void *
360 meta_query_new (char *expr)
362 return it->NewQuery(expr);
365 extern "C" int
366 meta_query_next (void * qobj, char ** bucket, char ** key)
368 RepoQuery * rq = (RepoQuery *)qobj;
370 if (!rq->Next()) {
371 delete rq;
372 return 0;
375 *bucket = rq->bucket;
376 *key = rq->key;
377 return 1;
380 void
381 RepoMeta::BucketList (void)
384 * TBD: make this return values instead of producing output.
385 * This is just a code fragment showing how to get a list of buckets,
386 * in case I forget.
388 BSONObj repl;
390 BSONObj dist = BSON("distinct"<<"main"<<"key"<<"bucket");
391 if (client.runCommand("repo",dist,repl)) {
392 cout << repl.toString() << endl;
393 BSONObj elem = repl.getField("values").embeddedObject();
394 for (int i = 0; i < elem.nFields(); ++i) {
395 cout << elem[i].str() << endl;
400 void
401 RepoMeta::Delete (char * bucket, char * key)
403 Query q = QUERY("bucket"<<bucket<<"key"<<key);
405 client.remove(MAIN_TBL,q);
408 extern "C"
409 void
410 meta_delete (char * bucket, char * key)
412 it->Delete(bucket,key);
415 size_t
416 RepoMeta::GetSize (char * bucket, char * key)
418 auto_ptr<DBClientCursor> curs;
419 Query q;
420 BSONObj bo;
421 const char * data;
423 q = QUERY("bucket"<<bucket<<"key"<<key);
424 curs = GetCursor(q);
426 if (!curs->more()) {
427 return 0;
430 bo = curs->next();
431 return bo.getField("size").numberLong();
434 extern "C"
435 size_t
436 meta_get_size (char * bucket, char * key)
438 return it->GetSize(bucket,key);