implement missing function in buffer.c
[libdht.git] / dht.c
blob092082706308828d4c597bc1e7ecf332e694e79a
1 /*
2 * Copyright (c) 2016 Mohamed Aslan <maslan@sce.carleton.ca>
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 #include <stdlib.h>
18 #include <string.h>
19 #include <sys/types.h>
20 #include <sys/event.h>
21 #include <sys/time.h>
22 #include <netinet/in.h>
23 #include <unistd.h>
24 #include <arpa/inet.h>
26 #include <dht.h>
27 #include <murmur3.h>
28 #include <hashtab.h>
30 #define TCPBACKLOG 500
32 #define POSTINIT_WAIT 0
33 #define POSTINIT_TID 0
35 #define MAX_PEERS 64
37 #include <stdio.h>
40 static int
41 ht_init(void **ht)
43 struct hashtab *hts;
45 if (ht == NULL)
46 return 0;
47 /* allocate two hashtabales one for data and one for timestamps */
48 hts = (struct hashtab *)calloc(2, sizeof(struct hashtab));
49 if (!hashtab_init(&hts[0], 16, NULL))
50 return 0;
51 if (!hashtab_init(&hts[1], 16, NULL))
52 return 0;
53 *ht = (void *)(hts);
54 return 1;
57 static int
58 ht_put(void *ht, const char *key, const char *val, struct timespec *ts)
60 struct hashtab *hts = (struct hashtab *)ht;
62 if (!hashtab_put(&hts[0], (void *)key, strlen(key) + 1, (void *)val, strlen(val) + 1))
63 return 0;
64 if (!hashtab_put(&hts[1], (void *)key, strlen(key) + 1, (void *)ts, sizeof(struct timespec)))
65 return 0;
66 return 1;
69 static int
70 ht_get(void *ht, const char *key, char **val, struct timespec **ts)
72 size_t len;
73 struct hashtab *hts = (struct hashtab *)ht;
75 if (!hashtab_get(&hts[0], (void *)key, strlen(key) + 1, (void **)val, &len))
76 return 0;
77 if (!hashtab_get(&hts[1], (void *)key, strlen(key) + 1, (void **)ts, &len))
78 return 0;
79 return 1;
82 static int
83 ht_del(void *ht, const char *key)
85 struct hashtab *hts = (struct hashtab *)ht;
87 if (!hashtab_del(&hts[0], (void *)key, strlen(key) + 1))
88 return 0;
89 if (!hashtab_del(&hts[1], (void *)key, strlen(key) + 1))
90 return 0;
91 return 1;
/*
 * Default partitioner: map a NUL-terminated key to a 64-bit ring token using
 * MurmurHash3 x64/128 with seed 0, keeping the first 64-bit word of the digest.
 */
static uint64_t
murmur3_partitioner(const char *key)
{
	uint64_t digest[2];	/* full 128-bit hash output */

	murmurhash3_x64_128(key, strlen(key), 0, digest);
	return *digest;
}
103 static struct dht_options defopts = {
104 .partitioner = murmur3_partitioner,
105 .ht_init = ht_init,
106 .ht_put = ht_put,
107 .ht_get = ht_get,
108 .ht_del = ht_del
112 * - assume array is already sorted
113 * - O(n)
115 static int
116 insert_token_sorted(struct dht_peer peers[], int *n, uint64_t token)
118 int i, j;
120 for (i = 0 ; i < *n ; i++) {
121 if (peers[i].token > token)
122 break;
124 for (j = *n ; j > i ; j--) {
125 peers[j] = peers[j - 1];
127 peers[i].token = token;
128 ++*n;
129 return i;
133 * - slightly modified binary search with wrapping range
134 * - O(log n)
136 static int
137 find_coordinator(struct dht_peer peers[], int n, uint64_t token)
139 int min = 0, max = n - 1, mid = 0;
141 if (peers[max].token < token)
142 return min;
143 while (min <= max) {
144 mid = min + ((max - min) / 2);
145 if (peers[mid].token == token)
146 return mid;
147 if (peers[mid].token < token)
148 min = mid + 1;
149 else
150 max = mid - 1;
152 #ifdef DEBUG
153 printf("min = %d, mid = %d, max = %d\n", min, mid, max);
154 #endif
155 return min;
#ifdef DEBUG
/* Debug helper: print the tokens of the first n peers, in index order, on one line. */
static void
print(struct dht_peer peers[], int n)
{
	int i;

	for (i = 0 ; i < n ; i++)
		/* cast: uint64_t is not unsigned long long on every platform */
		printf("0x%llx ", (unsigned long long)peers[i].token);
	printf("\n");
}
#endif
/*
 * Initialise a DHT node: bind its listening socket, set up the kqueue and
 * place the node itself on the peer ring.
 *
 * node  - caller-provided structure to fill in
 * id    - this node's identifier; hashed with the partitioner to get its token
 * port  - local port, bound on all IPv4 interfaces (host byte order)
 * n     - replication factor used by dht_put_tunable()/dht_get_tunable()
 * flags - DHT_USEUDP selects UDP, otherwise TCP
 * opts  - partitioner/storage callbacks, or NULL for the built-in defopts
 *
 * Returns 1 on success, 0 on failure.  On failure the id copy, kqueue,
 * socket and peer array acquired here are released.
 */
int
dht_init(struct dht_node *node, const char *id, int port, int n, uint32_t flags, struct dht_options *opts)
{
	int s_type;
	struct kevent kev;
	struct sockaddr_in *sin;

	if (node == NULL || id == NULL)
		return 0;
	/* mark resources as not yet acquired so the error path knows what to undo */
	node->ev_queue = -1;
	node->socket = -1;
	node->peers = NULL;
	node->ready = 0;
	if ((node->id = strdup(id)) == NULL)
		return 0;
	node->port = port;
	node->n_replicas = n;
	node->flags = flags;
	/* honour caller-supplied options (previously dropped, leaving opts unset) */
	node->opts = (opts != NULL) ? opts : &defopts;
	if ((node->ev_queue = kqueue()) == -1)
		goto error;
	s_type = (node->flags & DHT_USEUDP) ? SOCK_DGRAM : SOCK_STREAM;
	if ((node->socket = socket(AF_INET, s_type, 0)) < 0)
		goto error;
	/* for now assume ipv4 */
	memset(&node->saddr, 0, sizeof(struct sockaddr_in));
	sin = (struct sockaddr_in *)&node->saddr;
	sin->sin_family = AF_INET;
	sin->sin_port = htons(node->port);
	sin->sin_addr.s_addr = htonl(INADDR_ANY);
	if (bind(node->socket, (struct sockaddr *)sin, sizeof(*sin)) == -1)
		goto error;
	if (!(node->flags & DHT_USEUDP) && listen(node->socket, TCPBACKLOG) == -1)
		goto error;
	/* one-shot timer; dht_event_loop() calls post_init() when it fires */
	EV_SET(&kev, POSTINIT_TID, EVFILT_TIMER, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, POSTINIT_WAIT, NULL);
	if (kevent(node->ev_queue, &kev, 1, NULL, 0, NULL) == -1)
		goto error;
	EV_SET(&kev, node->socket, EVFILT_READ, EV_ADD, 0, 0, NULL);
	if (kevent(node->ev_queue, &kev, 1, NULL, 0, NULL) == -1)
		goto error;
	/* calloc: overflow-checked size and zeroed entries (incl. our own slot) */
	if ((node->peers = calloc(MAX_PEERS, sizeof(struct dht_peer))) == NULL)
		goto error;
	node->token = node->opts->partitioner(node->id);
	node->n_peers = 0;
	insert_token_sorted(node->peers, &node->n_peers, node->token);
	if (!node->opts->ht_init(&node->ht))
		goto error;
	node->ready = 1;
	return 1;
error:
	free(node->peers);
	node->peers = NULL;
	if (node->socket >= 0)
		close(node->socket);
	node->socket = -1;
	if (node->ev_queue != -1)
		close(node->ev_queue);
	node->ev_queue = -1;
	free(node->id);
	node->id = NULL;
	return 0;
}
221 void
222 dht_add_peer(struct dht_node *node, const char *id, const char *ip, int port)
224 uint64_t token;
225 int i, s_type;
227 if (node == NULL || id == NULL || ip == NULL)
228 return;
229 if (node->n_peers == MAX_PEERS)
230 return;
231 token = node->opts->partitioner(id);
232 i = insert_token_sorted(node->peers, &node->n_peers, token);
233 node->peers[i].id = strdup(id);
234 node->peers[i].last_recv = 0;
235 node->peers[i].ready = 0;
236 #ifdef DEBUG
237 print(node->peers, node->n_peers);
238 #endif
239 if (!(node->flags & DHT_USEUDP)) /* TCP */
240 s_type = SOCK_STREAM;
241 else /* UDP */
242 s_type = SOCK_DGRAM;
243 if ((node->peers[i].socket = socket(AF_INET, s_type, 0)) < 0)
244 return;
245 /* for now assume ipv4 */
246 memset((void *)&(node->peers[i].s_addr), 0, sizeof(struct sockaddr_in));
247 ((struct sockaddr_in *)&(node->peers[i].s_addr))->sin_family = AF_INET;
248 ((struct sockaddr_in *)&(node->peers[i].s_addr))->sin_port = htons(port);
249 ((struct sockaddr_in *)&(node->peers[i].s_addr))->sin_addr.s_addr = inet_addr(ip);
252 static void
253 post_init(struct dht_node *node)
255 int i;
257 if (!(node->flags & DHT_USEUDP)) { /* TCP */
258 for (i = 0 ; i < node->n_peers ; i++) {
259 if (node->token == node->peers[i].token) /* do not connect to oneself */
260 continue;
261 if (connect(node->peers[i].socket, (struct sockaddr *)&node->peers[i].s_addr, sizeof(struct sockaddr)) < 0) {
262 printf("[%s] warning connection error\n", node->id);
263 } else {
264 node->peers[i].ready = 1;
/*
 * Parse and execute one text request received on TCP socket s.
 * buf must be NUL-terminated (at most BUFSIZ characters).
 *   PUT <key> <sec>.<nsec> <val>  -> store val with that timestamp
 *   GET <key>                     -> reply "<sec>.<nsec> <val>\n" if present
 * Malformed requests are ignored.
 */
static void
handle_tcp_request(struct dht_node *node, int s, const char *buf)
{
	/* buf holds <= BUFSIZ chars, so no %s token can overflow these */
	char cmd[BUFSIZ + 1], key[BUFSIZ + 1], val[BUFSIZ + 1];
	char *vp;
	long long sec;
	long nsec;
	struct timespec ts, *tsp;

	/* TODO: implement a proper parser (framing, partial/multiple messages) */
	if (sscanf(buf, "%s", cmd) != 1)
		return;
	if (strcmp(cmd, "PUT") == 0) {
		if (sscanf(buf, "%*s %s %lld.%ld %s", key, &sec, &nsec, val) != 4)
			return;
		/* parse into long long, then assign: time_t need not be long long */
		ts.tv_sec = (time_t)sec;
		ts.tv_nsec = nsec;
		node->opts->ht_put(node->ht, key, val, &ts);
	} else if (strcmp(cmd, "GET") == 0) {
		if (sscanf(buf, "%*s %s", key) != 1)
			return;
		if (node->opts->ht_get(node->ht, key, &vp, &tsp)) {
			/* reply with the stored timestamp, not a local leftover */
			snprintf(val, sizeof(val), "%lld.%.9ld %s\n", (long long)tsp->tv_sec, tsp->tv_nsec, vp);
			write(s, val, strlen(val));
		}
		/* XXX: else — no reply on a miss; the requester's read() will block */
	} else {
		printf("[%s] unknown command: %s.\n", node->id, cmd);
	}
}

/*
 * Run the node's event loop: dispatch the post-init timer, accept TCP
 * connections on the listening socket and serve requests from accepted
 * connections.  Returns only if kevent() fails.
 */
void
dht_event_loop(struct dht_node *node)
{
	char buf[BUFSIZ + 1];
	int n, s, cl;
	struct kevent kev;
	struct sockaddr_storage sw_addr;
	socklen_t addr_len;
	ssize_t bytes;

	if (node == NULL)
		return;
	while (1) {
		n = kevent(node->ev_queue, NULL, 0, &kev, 1, NULL); /* TODO: use bigger eventlist */
		if (n < 0)
			break;
		if (n == 0)
			continue;
		s = (int)(kev.ident); /* uintptr_t != int */
		if (kev.filter == EVFILT_TIMER) {
			if (s == POSTINIT_TID)
				post_init(node);
		} else if (kev.filter == EVFILT_READ) {
			if ((s == node->socket) && !(node->flags & DHT_USEUDP)) { /* new connection */
				addr_len = sizeof(sw_addr);	/* value-result: reset every call */
				cl = accept(s, (struct sockaddr *)&sw_addr, &addr_len);
				if (cl == -1)
					continue;	/* one failed accept must not stop the node */
				/*
				 * NOTE(review): EV_CLEAR plus a single read per event means
				 * data beyond BUFSIZ waits for the next arrival — confirm intent.
				 */
				EV_SET(&kev, cl, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, NULL);
				if (kevent(node->ev_queue, &kev, 1, NULL, 0, NULL) == -1) {
					/* can't watch it: drop this client; a broken queue fails the next wait */
					close(cl);
					continue;
				}
			} else if (!(node->flags & DHT_USEUDP)) { /* TCP data received */
				/* leave room for the terminator sscanf needs */
				bytes = read(s, buf, sizeof(buf) - 1);
				if (bytes <= 0) {	/* EOF or read error */
					EV_SET(&kev, s, EVFILT_READ, EV_DELETE, 0, 0, NULL);
					if (kevent(node->ev_queue, &kev, 1, NULL, 0, NULL) == -1)
						break;
					close(s);	/* previously leaked */
					printf("[%s] peer disconnected.\n", node->id);
					continue;
				}
				buf[bytes] = '\0';
#ifdef DEBUG
				printf("[%s] read %zd bytes.\n", node->id, bytes);
#endif
				handle_tcp_request(node, s, buf);
			} else { /* UDP */
				/* XXX: NOT YET IMPLEMENTED */
				addr_len = sizeof(sw_addr);
				bytes = recvfrom(s, buf, BUFSIZ, 0, (struct sockaddr *)&sw_addr, &addr_len);
				if (bytes == 0)
					break;
#ifdef DEBUG
				else
					printf("read %zd bytes.\n", bytes);
#endif
			}
		}
	}
}
351 dht_put_tunable(struct dht_node *node, const char *key, const char *val, struct timespec *ts, int w)
353 char *msg;
354 int coord, i = 0, replica, success = 0;
355 uint64_t token;
357 if (node == NULL || key == NULL || val == NULL || ts == NULL)
358 return 0;
359 asprintf(&msg, "PUT %s %lld.%.9ld %s\n", key, ts->tv_sec, ts->tv_nsec, val);
361 token = murmur3_partitioner(key);
362 coord = find_coordinator(node->peers, node->n_peers, token);
363 #ifdef DEBUG
364 printf("[%s] coord for \"%s\" 0x%llx is 0x%llx\n", node->id, key, token, node->peers[coord].token);
365 #endif
367 while (i < node->n_replicas) {
368 replica = (coord + i++) % node->n_peers;
369 #ifdef DEBUG
370 printf("[%s] replica: %d\n", node->id, replica);
371 #endif
372 if (node->peers[replica].token == node->token) {
373 if (node->opts->ht_put(node->ht, key, val, ts))
374 success++;
375 continue;
377 if (!node->peers[replica].ready)
378 continue;
379 if (!(node->flags & DHT_USEUDP)) { /* TCP */
380 if (write(node->peers[replica].socket, msg, strlen(msg)) > 0)
381 success++;
382 } else { /* UDP */
383 if (sendto(node->peers[replica].socket, msg, strlen(msg), 0, (struct sockaddr *)&(node->peers[replica].s_addr), sizeof(struct sockaddr)) > 0)
384 success++;
387 free(msg);
388 #ifdef DEBUG
389 printf("[%s] success: %d\n", node->id, success);
390 #endif
391 if (success >= w)
392 return 1;
393 return 0;
397 dht_get_tunable(struct dht_node *node, const char *key, char **val, int r)
399 char *msg, *tmpval, *recent_val = NULL;
400 char buf[BUFSIZ + 1], vbuf[BUFSIZ + 1];
401 int coord, i = 0, replica, success = 0;
402 struct timespec recent_ts, ts, *tsp;
403 ssize_t bytes;
404 uint64_t token;
406 if (node == NULL || key == NULL || val == NULL)
407 return 0;
408 timespecclear(&recent_ts);
409 asprintf(&msg, "GET %s\n", key);
411 token = murmur3_partitioner(key);
412 coord = find_coordinator(node->peers, node->n_peers, token);
413 #ifdef DEBUG
414 printf("[%s] coord for \"%s\" 0x%llx is 0x%llx\n", node->id, key, token, node->peers[coord].token);
415 #endif
417 while (i < node->n_replicas) {
418 replica = (coord + i++) % node->n_peers;
419 #ifdef DEBUG
420 printf("[%s] replica: %d\n", node->id, replica);
421 #endif
422 if (node->peers[replica].token == node->token) {
423 if (node->opts->ht_get(node->ht, key, &tmpval, &tsp)) {
424 if (timespeccmp(tsp, &recent_ts, >=)) {
425 recent_ts = *tsp;
426 if (recent_val)
427 free(recent_val);
428 asprintf(&recent_val, "%s", tmpval);
430 success++;
432 continue;
434 if (!node->peers[replica].ready)
435 continue;
436 if (!(node->flags & DHT_USEUDP)) { /* TCP */
437 bytes = write(node->peers[replica].socket, msg, strlen(msg));
438 if (bytes <= 0)
439 continue;
440 #ifdef DEBUG
441 printf("[%s] message sent to peer [0x%llx].\n", node->id, node->peers[replica].token);
442 #endif
443 bytes = read(node->peers[replica].socket, buf, sizeof(buf));
444 #ifdef DEBUG
445 printf("[%s] reponse is %zu bytes\n", node->id, bytes);
446 #endif
447 if (bytes <= 0)
448 continue;
449 buf[bytes - 1] = 0;
450 sscanf(buf, "%lld.%ld %s", &ts.tv_sec, &ts.tv_nsec, vbuf);
451 #ifdef DEBUG
452 printf("[%s] GOT val: %s for key: %s.\n", node->id, vbuf, key);
453 #endif
454 if (timespeccmp(&ts, &recent_ts, >=)) {
455 recent_ts = ts;
456 if (recent_val)
457 free(recent_val);
458 asprintf(&recent_val, "%s", vbuf);
460 success++;
461 } else { /* UDP */
462 /* XXX: NOT YET IMPLEMENTED */
463 if (sendto(node->peers[replica].socket, msg, strlen(msg), 0, (struct sockaddr *)&(node->peers[replica].s_addr), sizeof(struct sockaddr)) > 0)
464 success++;
467 if (success)
468 *val = recent_val;
469 free(msg);
470 #ifdef DEBUG
471 printf("[%s] success: %d\n", node->id, success);
472 #endif
473 if (success >= r)
474 return 1;
475 return 0;