2 * Copyright (c) 2016 Mohamed Aslan <maslan@sce.carleton.ca>
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
19 #include <sys/types.h>
20 #include <sys/event.h>
22 #include <netinet/in.h>
24 #include <arpa/inet.h>
/* Backlog passed to listen(2) for the TCP server socket in dht_init(). */
30 #define TCPBACKLOG 500
/*
 * One-shot EVFILT_TIMER armed by dht_init(): POSTINIT_WAIT is passed as the
 * kevent data field (the timer period) and POSTINIT_TID as the timer ident,
 * which dht_event_loop() compares against to recognise the post-init timer.
 */
32 #define POSTINIT_WAIT 0
33 #define POSTINIT_TID 0
/*
 * NOTE(review): fragment of the hashtable-setup routine; its header and the
 * lines following each hashtab_init() call are not present in this extract.
 * The calloc() result is passed straight to hashtab_init() with no visible
 * NULL check -- consider checking it first.
 */
47 /* allocate two hashtables: one for data and one for timestamps */
48 hts
= (struct hashtab
*)calloc(2, sizeof(struct hashtab
));
49 if (!hashtab_init(&hts
[0], 16, NULL
))
51 if (!hashtab_init(&hts
[1], 16, NULL
))
/*
 * Store `val` under `key` in hts[0] and the sizeof(struct timespec) bytes at
 * `ts` under the same key in hts[1] (presumably copied by hashtab_put --
 * confirm). Key and string-value lengths include the terminating NUL.
 * NOTE(review): if the hts[1] put fails, no rollback of the hts[0] entry is
 * visible here, so the value and timestamp tables could disagree.
 */
58 ht_put(void *ht
, const char *key
, const char *val
, struct timespec
*ts
)
60 struct hashtab
*hts
= (struct hashtab
*)ht
;
62 if (!hashtab_put(&hts
[0], (void *)key
, strlen(key
) + 1, (void *)val
, strlen(val
) + 1))
64 if (!hashtab_put(&hts
[1], (void *)key
, strlen(key
) + 1, (void *)ts
, sizeof(struct timespec
)))
/*
 * Look up `key` (length includes its NUL): the entry from hts[0] is returned
 * through `*val` and the one from hts[1] through `*ts`. `&len` (declared
 * outside this extract) is passed to both hashtab_get() calls.
 * NOTE(review): presumably *val / *ts point at table-owned storage rather
 * than fresh copies, so callers must not free them -- confirm against
 * hashtab_get().
 */
70 ht_get(void *ht
, const char *key
, char **val
, struct timespec
**ts
)
73 struct hashtab
*hts
= (struct hashtab
*)ht
;
75 if (!hashtab_get(&hts
[0], (void *)key
, strlen(key
) + 1, (void **)val
, &len
))
77 if (!hashtab_get(&hts
[1], (void *)key
, strlen(key
) + 1, (void **)ts
, &len
))
/* Remove `key` (length includes its NUL) from both hts[0] and hts[1]. */
83 ht_del(void *ht
, const char *key
)
85 struct hashtab
*hts
= (struct hashtab
*)ht
;
87 if (!hashtab_del(&hts
[0], (void *)key
, strlen(key
) + 1))
89 if (!hashtab_del(&hts
[1], (void *)key
, strlen(key
) + 1))
/*
 * Compute the partition token for `key` with MurmurHash3 x64_128, seed 0,
 * over strlen(key) bytes. `token` and the return statement are outside this
 * extract; presumably one 64-bit half of the 128-bit digest is returned.
 */
95 murmur3_partitioner(const char *key
)
99 murmurhash3_x64_128(key
, strlen(key
), 0, token
);
/*
 * Default options: dht_init() assigns node->opts = &defopts (presumably when
 * no `opts` is supplied). Only the partitioner initializer is visible here.
 */
103 static struct dht_options defopts
= {
104 .partitioner
= murmur3_partitioner
,
/*
 * Insert `token` into peers[0..*n), which must already be sorted ascending by
 * token: scan for the first entry whose token is greater than `token`, shift
 * entries i..*n-1 up one slot, and write `token` into slot i (slot *n when no
 * larger token exists). The caller must ensure room for one more entry
 * (dht_add_peer checks n_peers against MAX_PEERS).
 * NOTE(review): only .token of the new slot is written here; its other
 * fields still hold the copy of the entry that was shifted up, so callers
 * must set them. The *n update and the return value are not visible in this
 * extract; dht_add_peer uses the return value as the new entry's index.
 */
112 * - assume array is already sorted
116 insert_token_sorted(struct dht_peer peers
[], int *n
, uint64_t token
)
120 for (i
= 0 ; i
< *n
; i
++) {
121 if (peers
[i
].token
> token
)
124 for (j
= *n
; j
> i
; j
--) {
125 peers
[j
] = peers
[j
- 1];
127 peers
[i
].token
= token
;
/*
 * Find the index of the peer responsible for `token` in the sorted array
 * peers[0..n). The visible lines are a binary search over [min, max] on
 * peers[].token. A token greater than the last peer's token takes a
 * separate early path; it is not visible here but presumably wraps to
 * index 0.
 * NOTE(review): the min/mid/max printf looks like leftover debug output.
 * No guard for n == 0 is visible; in that case peers[max] reads peers[-1].
 */
133 * - slightly modified binary search with wrapping range
137 find_coordinator(struct dht_peer peers
[], int n
, uint64_t token
)
139 int min
= 0, max
= n
- 1, mid
= 0;
141 if (peers
[max
].token
< token
)
144 mid
= min
+ ((max
- min
) / 2);
145 if (peers
[mid
].token
== token
)
147 if (peers
[mid
].token
< token
)
153 printf("min = %d, mid = %d, max = %d\n", min
, mid
, max
);
/*
 * Print the token of each of the first `n` peers in hex on stdout.
 * NOTE(review): "%llx" assumes .token is unsigned long long; if it is
 * uint64_t, PRIx64 from <inttypes.h> is the portable specifier.
 */
160 print(struct dht_peer peers
[], int n
)
164 for (i
= 0 ; i
< n
; i
++)
165 printf("0x%llx ", peers
[i
].token
);
/*
 * Initialise `node` in these steps:
 *  - copy `id` and record `n` as the replica count;
 *  - select the options (&defopts on the visible path);
 *  - create the kqueue;
 *  - create a socket (SOCK_STREAM unless DHT_USEUDP is set) and bind it to
 *    INADDR_ANY:node->port, listening with TCPBACKLOG in TCP mode;
 *  - arm the one-shot post-init timer and watch the socket for reads;
 *  - allocate a MAX_PEERS peer table and insert this node's own token;
 *  - set up the key/value store through opts->ht_init.
 * Only IPv4 is handled.
 * NOTE(review): many lines between these steps are missing from this
 * extract, so the error handling is not visible. Confirm that node->port,
 * node->flags and node->n_peers are set before they are read here, and that
 * the strdup() and reallocarray() results are checked.
 */
171 dht_init(struct dht_node
*node
, const char *id
, int port
, int n
, uint32_t flags
, struct dht_options
*opts
)
176 if (node
== NULL
|| id
== NULL
)
178 node
->id
= strdup(id
);
180 node
->n_replicas
= n
;
183 node
->opts
= &defopts
;
184 if ((node
->ev_queue
= kqueue()) == -1)
186 if (!(node
->flags
& DHT_USEUDP
)) /* TCP */
187 s_type
= SOCK_STREAM
;
190 if ((node
->socket
= socket(AF_INET
, s_type
, 0)) < 0)
192 /* for now assume ipv4 */
193 memset((void *)&(node
->saddr
), 0, sizeof(struct sockaddr_in
));
194 ((struct sockaddr_in
*)&(node
->saddr
))->sin_family
= AF_INET
;
195 ((struct sockaddr_in
*)&(node
->saddr
))->sin_port
= htons(node
->port
);
196 ((struct sockaddr_in
*)&(node
->saddr
))->sin_addr
.s_addr
= htonl(INADDR_ANY
);
197 if (bind(node
->socket
, (struct sockaddr
*)&(node
->saddr
), sizeof(struct sockaddr
)) == -1)
199 if (!(node
->flags
& DHT_USEUDP
))
200 if (listen(node
->socket
, TCPBACKLOG
) == -1)
/* arm the one-shot post-init timer; dht_event_loop() checks for POSTINIT_TID */
202 EV_SET(&kev
, POSTINIT_TID
, EVFILT_TIMER
, EV_ADD
| EV_ENABLE
| EV_ONESHOT
, 0, POSTINIT_WAIT
, NULL
);
203 if ((kevent(node
->ev_queue
, &kev
, 1, NULL
, 0, NULL
)) == -1)
/* watch the server socket: new TCP connections or incoming UDP datagrams */
205 EV_SET(&kev
, node
->socket
, EVFILT_READ
, EV_ADD
, 0, 0, NULL
);
206 if ((kevent(node
->ev_queue
, &kev
, 1, NULL
, 0, NULL
)) == -1)
/* this node's own token is kept in the sorted peer table alongside peers */
208 node
->peers
= reallocarray(NULL
, MAX_PEERS
, sizeof(struct dht_peer
));
209 node
->token
= node
->opts
->partitioner(node
->id
);
211 insert_token_sorted(node
->peers
, &node
->n_peers
, node
->token
);
212 if (!node
->opts
->ht_init(&node
->ht
))
/*
 * Register a peer in these steps:
 *  - derive its token from `id` with opts->partitioner;
 *  - insert it into the sorted peer table and initialise the new entry
 *    (strdup'ed id, last_recv = 0, ready = 0);
 *  - print the table;
 *  - create the peer's socket (SOCK_STREAM unless DHT_USEUDP is set);
 *  - fill in its IPv4 address from `ip` and `port`.
 * Nothing is added once n_peers reaches MAX_PEERS (the early exit itself is
 * not visible in this extract).
 * NOTE(review): inet_addr() yields INADDR_NONE for a malformed `ip`; no check
 * for that is visible. The print() call dumps the whole table on every add,
 * which looks like debug output.
 */
222 dht_add_peer(struct dht_node
*node
, const char *id
, const char *ip
, int port
)
227 if (node
== NULL
|| id
== NULL
|| ip
== NULL
)
229 if (node
->n_peers
== MAX_PEERS
)
231 token
= node
->opts
->partitioner(id
);
232 i
= insert_token_sorted(node
->peers
, &node
->n_peers
, token
);
233 node
->peers
[i
].id
= strdup(id
);
234 node
->peers
[i
].last_recv
= 0;
235 node
->peers
[i
].ready
= 0;
237 print(node
->peers
, node
->n_peers
);
239 if (!(node
->flags
& DHT_USEUDP
)) /* TCP */
240 s_type
= SOCK_STREAM
;
243 if ((node
->peers
[i
].socket
= socket(AF_INET
, s_type
, 0)) < 0)
245 /* for now assume ipv4 */
246 memset((void *)&(node
->peers
[i
].s_addr
), 0, sizeof(struct sockaddr_in
));
247 ((struct sockaddr_in
*)&(node
->peers
[i
].s_addr
))->sin_family
= AF_INET
;
248 ((struct sockaddr_in
*)&(node
->peers
[i
].s_addr
))->sin_port
= htons(port
);
249 ((struct sockaddr_in
*)&(node
->peers
[i
].s_addr
))->sin_addr
.s_addr
= inet_addr(ip
);
/*
 * Presumably invoked from dht_event_loop() when the POSTINIT_TID timer
 * fires. In TCP mode, connect() to every peer except this node itself
 * (recognised by an equal token). A failed connect() only prints a warning.
 * peers[i].ready is then set to 1, presumably only on success; the lines in
 * between are not visible. No UDP handling is visible here.
 */
253 post_init(struct dht_node
*node
)
257 if (!(node
->flags
& DHT_USEUDP
)) { /* TCP */
258 for (i
= 0 ; i
< node
->n_peers
; i
++) {
259 if (node
->token
== node
->peers
[i
].token
) /* do not connect to oneself */
261 if (connect(node
->peers
[i
].socket
, (struct sockaddr
*)&node
->peers
[i
].s_addr
, sizeof(struct sockaddr
)) < 0) {
262 printf("[%s] warning connection error\n", node
->id
);
264 node
->peers
[i
].ready
= 1;
/*
 * Main event loop: fetch one event at a time from node->ev_queue and
 * dispatch on it:
 *  - EVFILT_TIMER with ident POSTINIT_TID: the post-init timer;
 *  - EVFILT_READ on node->socket in TCP mode: accept() a client and watch it
 *    for reads (EV_CLEAR);
 *  - any other EVFILT_READ: read a request. "PUT <key> <sec>.<nsec> <val>"
 *    is stored via opts->ht_put; "GET <key>" is answered with
 *    "<sec>.<nsec> <val>\n"; other commands are logged as unknown.
 *    In UDP mode the datagram is only received (NOT YET IMPLEMENTED).
 * NOTE(review): accept()'s result goes into EV_SET on the next line with no
 * visible check for -1. read() may fill all sizeof(buf) bytes, leaving no
 * room for a NUL; the lines after read() are not in this extract, so confirm
 * buf is terminated before the sscanf() calls.
 */
272 dht_event_loop(struct dht_node
*node
)
275 char buf
[BUFSIZ
+ 1];
276 char cmd
[BUFSIZ
+ 1], key
[BUFSIZ
+ 1], val
[BUFSIZ
+ 1];
280 struct sockaddr_storage sw_addr
;
281 struct timespec ts
, *tsp
;
282 socklen_t addr_len
= sizeof(struct sockaddr_storage
);
289 n
= kevent(node
->ev_queue
, NULL
, 0, &kev
, 1, NULL
); /* TODO: use bigger eventlist */
294 if (kev
.filter
== EVFILT_TIMER
) {
295 s
= (int)(kev
.ident
); /* uintptr_t != int */
296 if (s
== POSTINIT_TID
)
298 } else if (kev
.filter
== EVFILT_READ
) {
299 s
= (int)(kev
.ident
); /* uintptr_t != int */
300 if ((s
== node
->socket
) && !(node
->flags
& DHT_USEUDP
)) { /* new connection */
301 cl
= accept(s
, (struct sockaddr
*)&sw_addr
, &addr_len
);
302 EV_SET(&kev
, cl
, EVFILT_READ
, EV_ADD
| EV_CLEAR
, 0, 0, NULL
);
303 if ((kevent(node
->ev_queue
, &kev
, 1, NULL
, 0, NULL
)) == -1)
305 } else { /* data received */
306 if (!(node
->flags
& DHT_USEUDP
)) { /* TCP */
307 bytes
= read(s
, buf
, sizeof(buf
));
309 EV_SET(&kev
, s
, EVFILT_READ
, EV_DELETE
, 0, 0, NULL
);
310 if ((kevent(node
->ev_queue
, &kev
, 1, NULL
, 0, NULL
)) == -1)
312 printf("[%s] peer disconnected.\n", node
->id
);
315 printf("[%s] read %zd bytes.\n", node
->id
, bytes
);
317 /* TODO: implement a proper parser */
318 sscanf(buf
, "%s", cmd
);
319 if (!strncmp(cmd
, "PUT", sizeof(cmd
))) {
320 sscanf(buf
+ 4, "%s %lld.%ld %s", key
, &ts
.tv_sec
, &ts
.tv_nsec
, val
); /* unsafe */
321 node
->opts
->ht_put(node
->ht
, key
, val
, &ts
);
323 else if (!strncmp(cmd
, "GET", sizeof(cmd
))) {
324 sscanf(buf
+ 4, "%s", key
); /* unsafe */
325 if (node
->opts
->ht_get(node
->ht
, key
, &vp
, &tsp
)) {
/*
 * NOTE(review): this formats the local `ts` (last filled by a PUT parse and
 * possibly never initialised) instead of the stored timestamp `*tsp` that
 * ht_get() just returned. This looks like a bug; presumably it should use
 * tsp->tv_sec / tsp->tv_nsec. The write() result is also ignored.
 */
326 snprintf(val
, sizeof(val
), "%lld.%.9ld %s\n", ts
.tv_sec
, ts
.tv_nsec
, vp
);
327 write(s
, val
, strlen(val
));
332 printf("[%s] unknown command: %s.\n", node
->id
, cmd
);
336 /* XXX: NOT YET IMPLEMENTED */
337 bytes
= recvfrom(s
, buf
, BUFSIZ
, 0, (struct sockaddr
*)&sw_addr
, &addr_len
);
342 printf("read %zd bytes.\n", bytes
);
/*
 * Replicate `key` = `val` with timestamp `*ts` to node->n_replicas peers,
 * starting at the coordinator for the key's token and taking successive
 * table indices modulo n_peers. Each replica is handled as follows:
 *  - if the replica is this node, write locally via opts->ht_put;
 *  - otherwise skip it when it is not ready (presumably; the branch body is
 *    not visible);
 *  - otherwise send "PUT <key> <sec>.<nsec> <val>\n" with write() (TCP) or
 *    sendto() (UDP).
 * Successes are presumably counted in `success`; the increments are not
 * visible.
 * NOTE(review): several issues are visible in this extract:
 *  - `w` is not used in the visible lines; presumably it is compared with
 *    `success` further down.
 *  - The token comes from murmur3_partitioner() directly, not
 *    node->opts->partitioner as in dht_init() and dht_add_peer(), so a
 *    custom partitioner would place keys inconsistently.
 *  - asprintf()'s return value is not checked.
 *  - If n_replicas > n_peers, the modulo wraps and the same peer is written
 *    more than once.
 */
351 dht_put_tunable(struct dht_node
*node
, const char *key
, const char *val
, struct timespec
*ts
, int w
)
354 int coord
, i
= 0, replica
, success
= 0;
357 if (node
== NULL
|| key
== NULL
|| val
== NULL
|| ts
== NULL
)
359 asprintf(&msg
, "PUT %s %lld.%.9ld %s\n", key
, ts
->tv_sec
, ts
->tv_nsec
, val
);
361 token
= murmur3_partitioner(key
);
362 coord
= find_coordinator(node
->peers
, node
->n_peers
, token
);
364 printf("[%s] coord for \"%s\" 0x%llx is 0x%llx\n", node
->id
, key
, token
, node
->peers
[coord
].token
);
367 while (i
< node
->n_replicas
) {
368 replica
= (coord
+ i
++) % node
->n_peers
;
370 printf("[%s] replica: %d\n", node
->id
, replica
);
372 if (node
->peers
[replica
].token
== node
->token
) {
373 if (node
->opts
->ht_put(node
->ht
, key
, val
, ts
))
377 if (!node
->peers
[replica
].ready
)
379 if (!(node
->flags
& DHT_USEUDP
)) { /* TCP */
380 if (write(node
->peers
[replica
].socket
, msg
, strlen(msg
)) > 0)
383 if (sendto(node
->peers
[replica
].socket
, msg
, strlen(msg
), 0, (struct sockaddr
*)&(node
->peers
[replica
].s_addr
), sizeof(struct sockaddr
)) > 0)
389 printf("[%s] success: %d\n", node
->id
, success
);
397 dht_get_tunable(struct dht_node
*node
, const char *key
, char **val
, int r
)
399 char *msg
, *tmpval
, *recent_val
= NULL
;
400 char buf
[BUFSIZ
+ 1], vbuf
[BUFSIZ
+ 1];
401 int coord
, i
= 0, replica
, success
= 0;
402 struct timespec recent_ts
, ts
, *tsp
;
406 if (node
== NULL
|| key
== NULL
|| val
== NULL
)
408 timespecclear(&recent_ts
);
409 asprintf(&msg
, "GET %s\n", key
);
411 token
= murmur3_partitioner(key
);
412 coord
= find_coordinator(node
->peers
, node
->n_peers
, token
);
414 printf("[%s] coord for \"%s\" 0x%llx is 0x%llx\n", node
->id
, key
, token
, node
->peers
[coord
].token
);
417 while (i
< node
->n_replicas
) {
418 replica
= (coord
+ i
++) % node
->n_peers
;
420 printf("[%s] replica: %d\n", node
->id
, replica
);
422 if (node
->peers
[replica
].token
== node
->token
) {
423 if (node
->opts
->ht_get(node
->ht
, key
, &tmpval
, &tsp
)) {
424 if (timespeccmp(tsp
, &recent_ts
, >=)) {
428 asprintf(&recent_val
, "%s", tmpval
);
434 if (!node
->peers
[replica
].ready
)
436 if (!(node
->flags
& DHT_USEUDP
)) { /* TCP */
437 bytes
= write(node
->peers
[replica
].socket
, msg
, strlen(msg
));
441 printf("[%s] message sent to peer [0x%llx].\n", node
->id
, node
->peers
[replica
].token
);
443 bytes
= read(node
->peers
[replica
].socket
, buf
, sizeof(buf
));
445 printf("[%s] reponse is %zu bytes\n", node
->id
, bytes
);
450 sscanf(buf
, "%lld.%ld %s", &ts
.tv_sec
, &ts
.tv_nsec
, vbuf
);
452 printf("[%s] GOT val: %s for key: %s.\n", node
->id
, vbuf
, key
);
454 if (timespeccmp(&ts
, &recent_ts
, >=)) {
458 asprintf(&recent_val
, "%s", vbuf
);
462 /* XXX: NOT YET IMPLEMENTED */
463 if (sendto(node
->peers
[replica
].socket
, msg
, strlen(msg
), 0, (struct sockaddr
*)&(node
->peers
[replica
].s_addr
), sizeof(struct sockaddr
)) > 0)
471 printf("[%s] success: %d\n", node
->id
, success
);