Put the objects before the libraries when building
[nmdb.git] / nmdb / dbloop.c
blob7a4f4fd43a3e81f3c3b3ed4ee13e8e053c014996
2 #include <pthread.h> /* threading functions */
3 #include <time.h> /* nanosleep() */
4 #include <errno.h> /* ETIMEDOUT */
5 #include <string.h> /* memcmp() */
6 #include <stdlib.h> /* malloc()/free() */
7 #include <stdio.h> /* snprintf() */
9 #include "common.h"
10 #include "dbloop.h"
11 #include "be.h"
12 #include "queue.h"
13 #include "net-const.h"
14 #include "req.h"
15 #include "log.h"
16 #include "netutils.h"
17 #include "sparse.h"
/* Thread entry point and per-operation worker; both are private to this
 * file and defined further down. */
static void process_op(struct db_conn *db, struct queue_entry *e);
static void *db_loop(void *arg);

/* Stop flag for the database loop: db_loop_stop() sets it, and db_loop()
 * exits once it is set and the operation queue yields no entry.
 * Not the cleanest mechanism, but simple and effective. */
static int loop_should_stop = 0;
29 pthread_t *db_loop_start(struct db_conn *db)
31 pthread_t *thread;
33 thread = malloc(sizeof(pthread_t));
34 if (thread == NULL)
35 return NULL;
37 pthread_create(thread, NULL, db_loop, (void *) db);
39 return thread;
42 void db_loop_stop(pthread_t *thread)
44 loop_should_stop = 1;
45 pthread_join(*thread, NULL);
46 free(thread);
47 return;
51 static void *db_loop(void *arg)
53 int rv;
54 struct timespec ts;
55 struct queue_entry *e;
56 struct db_conn *db;
58 db = (struct db_conn *) arg;
60 for (;;) {
61 /* Condition waits are specified with absolute timeouts, see
62 * pthread_cond_timedwait()'s SUSv3 specification for more
63 * information. We need to calculate it each time.
64 * We sleep for 1 sec. There's no real need for it to be too
65 * fast (it's only used so that stop detection doesn't take
66 * long), but we don't want it to be too slow either. */
67 clock_gettime(CLOCK_REALTIME, &ts);
68 ts.tv_sec += 1;
70 rv = 0;
71 queue_lock(op_queue);
72 while (queue_isempty(op_queue) && rv == 0) {
73 rv = queue_timedwait(op_queue, &ts);
76 if (rv != 0 && rv != ETIMEDOUT) {
77 errlog("Error in queue_timedwait()");
78 /* When the timedwait fails the lock is released, so
79 * we need to properly annotate this case. */
80 __release(op_queue->lock);
81 continue;
84 e = queue_get(op_queue);
85 queue_unlock(op_queue);
87 if (e == NULL) {
88 if (loop_should_stop) {
89 break;
90 } else {
91 continue;
95 process_op(db, e);
97 /* Free the entry that was allocated when tipc queued the
98 * operation. This also frees it's components. */
99 queue_entry_free(e);
102 return NULL;
105 static void process_op(struct db_conn *db, struct queue_entry *e)
107 int rv;
108 if (e->operation == REQ_SET) {
109 rv = db->set(db, e->key, e->ksize, e->val, e->vsize);
110 if (!(e->req->flags & FLAGS_SYNC))
111 return;
113 if (!rv) {
114 e->req->reply_err(e->req, ERR_DB);
115 return;
117 e->req->reply_mini(e->req, REP_OK);
119 } else if (e->operation == REQ_GET) {
120 unsigned char *val;
121 size_t vsize = 64 * 1024;
123 val = malloc(vsize);
124 if (val == NULL) {
125 e->req->reply_err(e->req, ERR_MEM);
126 return;
128 rv = db->get(db, e->key, e->ksize, val, &vsize);
129 if (rv == 0) {
130 e->req->reply_mini(e->req, REP_NOTIN);
131 free(val);
132 return;
134 e->req->reply_long(e->req, REP_OK, val, vsize);
135 free(val);
137 } else if (e->operation == REQ_DEL) {
138 rv = db->del(db, e->key, e->ksize);
139 if (!(e->req->flags & FLAGS_SYNC))
140 return;
142 if (rv == 0) {
143 e->req->reply_mini(e->req, REP_NOTIN);
144 return;
146 e->req->reply_mini(e->req, REP_OK);
148 } else if (e->operation == REQ_CAS) {
149 unsigned char *dbval;
150 size_t dbvsize = 64 * 1024;
152 /* Compare */
153 dbval = malloc(dbvsize);
154 if (dbval == NULL) {
155 e->req->reply_err(e->req, ERR_MEM);
156 return;
158 rv = db->get(db, e->key, e->ksize, dbval, &dbvsize);
159 if (rv == 0) {
160 e->req->reply_mini(e->req, REP_NOTIN);
161 free(dbval);
162 return;
165 if (e->vsize == dbvsize &&
166 memcmp(e->val, dbval, dbvsize) == 0) {
167 /* Swap */
168 rv = db->set(db, e->key, e->ksize,
169 e->newval, e->nvsize);
170 if (!rv) {
171 e->req->reply_err(e->req, ERR_DB);
172 return;
175 e->req->reply_mini(e->req, REP_OK);
176 free(dbval);
177 return;
180 e->req->reply_mini(e->req, REP_NOMATCH);
181 free(dbval);
183 } else if (e->operation == REQ_INCR) {
184 unsigned char *dbval;
185 size_t dbvsize = 64 * 1024;
186 int64_t intval;
188 dbval = malloc(dbvsize);
189 if (dbval == NULL) {
190 e->req->reply_err(e->req, ERR_MEM);
191 return;
193 rv = db->get(db, e->key, e->ksize, dbval, &dbvsize);
194 if (rv == 0) {
195 e->req->reply_mini(e->req, REP_NOTIN);
196 free(dbval);
197 return;
200 /* val must be NULL terminated; see cache_incr() */
201 if (dbval && dbval[dbvsize - 1] != '\0') {
202 e->req->reply_mini(e->req, REP_NOMATCH);
203 free(dbval);
204 return;
207 intval = strtoll((char *) dbval, NULL, 10);
208 intval = intval + * (int64_t *) e->val;
210 if (dbvsize < 24) {
211 /* We know dbval is long enough because we've
212 * allocated it, so we only change dbvsize */
213 dbvsize = 24;
216 snprintf((char *) dbval, dbvsize, "%23lld",
217 (long long int) intval);
219 rv = db->set(db, e->key, e->ksize, dbval, dbvsize);
220 if (!rv) {
221 e->req->reply_err(e->req, ERR_DB);
222 return;
225 intval = htonll(intval);
226 e->req->reply_long(e->req, REP_OK,
227 (unsigned char *) &intval, sizeof(intval));
229 free(dbval);
231 } else if (e->operation == REQ_FIRSTKEY) {
232 unsigned char *key;
233 size_t ksize = 64 * 1024;
235 if (db->firstkey == NULL) {
236 e->req->reply_err(e->req, ERR_DB);
237 return;
240 key = malloc(ksize);
241 if (key == NULL) {
242 e->req->reply_err(e->req, ERR_MEM);
243 return;
245 rv = db->firstkey(db, key, &ksize);
246 if (rv == 0) {
247 e->req->reply_mini(e->req, REP_NOTIN);
248 free(key);
249 return;
251 e->req->reply_long(e->req, REP_OK, key, ksize);
252 free(key);
254 } else if (e->operation == REQ_NEXTKEY) {
255 unsigned char *newkey;
256 size_t nksize = 64 * 1024;
258 if (db->nextkey == NULL) {
259 e->req->reply_err(e->req, ERR_DB);
260 return;
263 newkey = malloc(nksize);
264 if (newkey == NULL) {
265 e->req->reply_err(e->req, ERR_MEM);
266 return;
268 rv = db->nextkey(db, e->key, e->ksize, newkey, &nksize);
269 if (rv == 0) {
270 e->req->reply_mini(e->req, REP_NOTIN);
271 free(newkey);
272 return;
274 e->req->reply_long(e->req, REP_OK, newkey, nksize);
275 free(newkey);
276 } else {
277 wlog("Unknown op 0x%x\n", e->operation);