2 #include <pthread.h> /* threading functions */
3 #include <time.h> /* nanosleep() */
4 #include <errno.h> /* ETIMEDOUT */
5 #include <string.h> /* memcmp() */
6 #include <stdlib.h> /* malloc()/free() */
7 #include <stdio.h> /* snprintf() */
13 #include "net-const.h"
/* Forward declarations of the worker thread routine and the per-operation
 * handler; both have internal (static) linkage. */
20 static void *db_loop(void *arg
);
21 static void process_op(struct db_conn
*db
, struct queue_entry
*e
);
24 /* Used to signal the loop that it should exit when the queue becomes empty.
25 * It's not the cleanest way, but it's simple and effective. */
/* NOTE(review): this is a plain int, initialized to 0 and read by db_loop()
 * from the worker thread. No atomic type or lock around it is visible here;
 * confirm that whoever sets it synchronizes with that reader. */
26 static int loop_should_stop
= 0;
/* Start the database worker thread.
 *
 * Allocates storage for a pthread_t with malloc() and creates a thread with
 * default attributes (NULL) that runs db_loop(), handing it db as its only
 * argument.
 *
 * NOTE(review): the signature returns a pthread_t *, presumably the handle
 * allocated here so that db_loop_stop() can join it; the malloc() failure
 * check, the handling of pthread_create()'s result and the return statement
 * are not shown in this view -- confirm. */
29 pthread_t
*db_loop_start(struct db_conn
*db
)
/* Heap storage for the thread handle; sized for exactly one pthread_t. */
33 thread
= malloc(sizeof(pthread_t
));
/* db is passed through untouched; db_loop() casts it back to
 * struct db_conn *. */
37 pthread_create(thread
, NULL
, db_loop
, (void *) db
);
/* Stop the database worker thread identified by *thread.
 *
 * Blocks in pthread_join() until that thread terminates; the thread's return
 * value is discarded (second argument is NULL).
 *
 * NOTE(review): presumably this also raises loop_should_stop before joining
 * and releases the handle afterwards; neither is shown in this view --
 * confirm. */
42 void db_loop_stop(pthread_t
*thread
)
45 pthread_join(*thread
, NULL
);
/* Worker thread body, used as the start routine given to pthread_create().
 *
 * arg is the struct db_conn * supplied by the creator and is cast back to
 * db below. The routine waits on op_queue with a timed wait while the queue
 * is empty, fetches an entry with queue_get(), unlocks the queue and then
 * checks loop_should_stop.
 *
 * NOTE(review): the exact nesting of these steps, the call that processes
 * the fetched entry and the loop's exit path are not shown in this view --
 * confirm against the full file. */
51 static void *db_loop(void *arg
)
55 struct queue_entry
*e
;
58 db
= (struct db_conn
*) arg
;
61 /* Condition waits are specified with absolute timeouts, see
62 * pthread_cond_timedwait()'s SUSv3 specification for more
63 * information. We need to calculate it each time.
64 * We sleep for 1 sec. There's no real need for it to be too
65 * fast (it's only used so that stop detection doesn't take
66 * long), but we don't want it to be too slow either. */
67 clock_gettime(CLOCK_REALTIME
, &ts
);
/* Keep waiting only while there is nothing queued and the previous wait
 * returned 0; any non-zero rv (including ETIMEDOUT) ends this inner loop. */
72 while (queue_isempty(op_queue
) && rv
== 0) {
73 rv
= queue_timedwait(op_queue
, &ts
);
/* ETIMEDOUT is the expected outcome of an idle wait; only other non-zero
 * results are logged as errors. */
76 if (rv
!= 0 && rv
!= ETIMEDOUT
) {
77 errlog("Error in queue_timedwait()");
78 /* When the timedwait fails the lock is released, so
79 * we need to properly annotate this case. */
80 __release(op_queue
->lock
);
/* Fetch an entry with queue_get(), then release the queue with
 * queue_unlock(). */
84 e
= queue_get(op_queue
);
85 queue_unlock(op_queue
);
/* NOTE(review): loop_should_stop is read here after queue_unlock(), so the
 * queue lock does not appear to cover this read -- confirm. */
88 if (loop_should_stop
) {
97 /* Free the entry that was allocated when tipc queued the
98 * operation. This also frees its components. */
105 static void process_op(struct db_conn
*db
, struct queue_entry
*e
)
108 if (e
->operation
== REQ_SET
) {
109 rv
= db
->set(db
, e
->key
, e
->ksize
, e
->val
, e
->vsize
);
110 if (!(e
->req
->flags
& FLAGS_SYNC
))
114 e
->req
->reply_err(e
->req
, ERR_DB
);
117 e
->req
->reply_mini(e
->req
, REP_OK
);
119 } else if (e
->operation
== REQ_GET
) {
121 size_t vsize
= 64 * 1024;
125 e
->req
->reply_err(e
->req
, ERR_MEM
);
128 rv
= db
->get(db
, e
->key
, e
->ksize
, val
, &vsize
);
130 e
->req
->reply_mini(e
->req
, REP_NOTIN
);
134 e
->req
->reply_long(e
->req
, REP_OK
, val
, vsize
);
137 } else if (e
->operation
== REQ_DEL
) {
138 rv
= db
->del(db
, e
->key
, e
->ksize
);
139 if (!(e
->req
->flags
& FLAGS_SYNC
))
143 e
->req
->reply_mini(e
->req
, REP_NOTIN
);
146 e
->req
->reply_mini(e
->req
, REP_OK
);
148 } else if (e
->operation
== REQ_CAS
) {
149 unsigned char *dbval
;
150 size_t dbvsize
= 64 * 1024;
153 dbval
= malloc(dbvsize
);
155 e
->req
->reply_err(e
->req
, ERR_MEM
);
158 rv
= db
->get(db
, e
->key
, e
->ksize
, dbval
, &dbvsize
);
160 e
->req
->reply_mini(e
->req
, REP_NOTIN
);
165 if (e
->vsize
== dbvsize
&&
166 memcmp(e
->val
, dbval
, dbvsize
) == 0) {
168 rv
= db
->set(db
, e
->key
, e
->ksize
,
169 e
->newval
, e
->nvsize
);
171 e
->req
->reply_err(e
->req
, ERR_DB
);
175 e
->req
->reply_mini(e
->req
, REP_OK
);
180 e
->req
->reply_mini(e
->req
, REP_NOMATCH
);
183 } else if (e
->operation
== REQ_INCR
) {
184 unsigned char *dbval
;
185 size_t dbvsize
= 64 * 1024;
188 dbval
= malloc(dbvsize
);
190 e
->req
->reply_err(e
->req
, ERR_MEM
);
193 rv
= db
->get(db
, e
->key
, e
->ksize
, dbval
, &dbvsize
);
195 e
->req
->reply_mini(e
->req
, REP_NOTIN
);
200 /* val must be NUL terminated; see cache_incr() */
201 if (dbval
&& dbval
[dbvsize
- 1] != '\0') {
202 e
->req
->reply_mini(e
->req
, REP_NOMATCH
);
207 intval
= strtoll((char *) dbval
, NULL
, 10);
208 intval
= intval
+ * (int64_t *) e
->val
;
211 /* We know dbval is long enough because we've
212 * allocated it, so we only change dbvsize */
216 snprintf((char *) dbval
, dbvsize
, "%23lld",
217 (long long int) intval
);
219 rv
= db
->set(db
, e
->key
, e
->ksize
, dbval
, dbvsize
);
221 e
->req
->reply_err(e
->req
, ERR_DB
);
225 intval
= htonll(intval
);
226 e
->req
->reply_long(e
->req
, REP_OK
,
227 (unsigned char *) &intval
, sizeof(intval
));
231 } else if (e
->operation
== REQ_FIRSTKEY
) {
233 size_t ksize
= 64 * 1024;
235 if (db
->firstkey
== NULL
) {
236 e
->req
->reply_err(e
->req
, ERR_DB
);
242 e
->req
->reply_err(e
->req
, ERR_MEM
);
245 rv
= db
->firstkey(db
, key
, &ksize
);
247 e
->req
->reply_mini(e
->req
, REP_NOTIN
);
251 e
->req
->reply_long(e
->req
, REP_OK
, key
, ksize
);
254 } else if (e
->operation
== REQ_NEXTKEY
) {
255 unsigned char *newkey
;
256 size_t nksize
= 64 * 1024;
258 if (db
->nextkey
== NULL
) {
259 e
->req
->reply_err(e
->req
, ERR_DB
);
263 newkey
= malloc(nksize
);
264 if (newkey
== NULL
) {
265 e
->req
->reply_err(e
->req
, ERR_MEM
);
268 rv
= db
->nextkey(db
, e
->key
, e
->ksize
, newkey
, &nksize
);
270 e
->req
->reply_mini(e
->req
, REP_NOTIN
);
274 e
->req
->reply_long(e
->req
, REP_OK
, newkey
, nksize
);
277 wlog("Unknown op 0x%x\n", e
->operation
);