initial import
[libdht.git] / dht.c
blob73091d35726079934fd393d5564972ded63bf162
1 /*
2 * Copyright (c) 2016 Mohamed Aslan <maslan@sce.carleton.ca>
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 #include <string.h>
18 #include <sys/types.h>
19 #include <sys/event.h>
20 #include <sys/time.h>
21 #include <netinet/in.h>
22 #include <unistd.h>
24 #include <dht.h>
26 #define TCPBACKLOG 500
28 #define POSTINIT_WAIT 0
29 #define POSTINIT_TID 0
31 #include <stdio.h>
33 static struct dht_options defopts = {
34 .partitioner = NULL,
35 .ht_init = NULL,
36 .ht_put = NULL,
37 .ht_get = NULL,
38 .ht_del = NULL
41 int
42 dht_init(struct dht_node *node, int port, int n, uint32_t flags, struct dht_options *opts)
44 int s_type;
45 struct kevent kev;
47 if (node == NULL)
48 return 0;
49 node->port = port;
50 node->n_replica = n;
51 node->flags = flags;
52 if (opts == NULL)
53 node->opts = &defopts;
54 if ((node->ev_queue = kqueue()) == -1)
55 return 0;
56 if (!(node->flags & DHT_USEUDP)) /* TCP */
57 s_type = SOCK_STREAM;
58 else /* UDP */
59 s_type = SOCK_DGRAM;
60 if ((node->socket = socket(AF_INET, s_type, 0)) < 0)
61 return 0;
62 /* for now assume ipv4 */
63 memset((void *)&(node->saddr), 0, sizeof(struct sockaddr_in));
64 ((struct sockaddr_in *)&(node->saddr))->sin_family = AF_INET;
65 ((struct sockaddr_in *)&(node->saddr))->sin_port = htons(node->port);
66 ((struct sockaddr_in *)&(node->saddr))->sin_addr.s_addr = htonl(INADDR_ANY);
67 if (bind(node->socket, (struct sockaddr *)&(node->saddr), sizeof(struct sockaddr)) == -1)
68 goto error;
69 if (!(node->flags & DHT_USEUDP))
70 if (listen(node->socket, TCPBACKLOG) == -1)
71 goto error;
72 EV_SET(&kev, POSTINIT_TID, EVFILT_TIMER, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, POSTINIT_WAIT, NULL);
73 if ((kevent(node->ev_queue, &kev, 1, NULL, 0, NULL)) == -1)
74 goto error;
75 EV_SET(&kev, node->socket, EVFILT_READ, EV_ADD, 0, 0, NULL);
76 if ((kevent(node->ev_queue, &kev, 1, NULL, 0, NULL)) == -1)
77 goto error;
78 node->ready = 1;
79 return 1;
80 error:
81 close(node->socket);
82 return 0;
85 void
86 dht_add_peer(struct dht_node *node, const char *ip, int port)
91 static void
92 post_init(struct dht_node *node)
98 void
99 dht_event_loop(struct dht_node *node)
101 char buf[BUFSIZ + 1];
102 int n, s;
103 int cl;
104 struct kevent kev;
105 struct sockaddr_storage sw_addr;
106 socklen_t addr_len = sizeof(struct sockaddr_storage);
107 ssize_t bytes;
109 if (node == NULL)
110 return;
112 while (1) {
113 n = kevent(node->ev_queue, NULL, 0, &kev, 1, NULL); /* TODO: use bigger eventlist */
114 if (n < 0)
115 break;
116 if (n == 0)
117 continue;
118 if (kev.filter == EVFILT_TIMER) {
119 s = (int)(kev.ident); /* uintptr_t != int */
120 if (s == POSTINIT_TID)
121 post_init(node);
122 } else if (kev.filter == EVFILT_READ) {
123 s = (int)(kev.ident); /* uintptr_t != int */
124 if ((s == node->socket) && !(node->flags & DHT_USEUDP)) { /* new connection */
125 cl = accept(s, (struct sockaddr *)&sw_addr, &addr_len);
126 EV_SET(&kev, cl, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, NULL);
127 if ((kevent(node->ev_queue, &kev, 1, NULL, 0, NULL)) == -1)
128 break;
129 } else { /* data received */
130 if (!(node->flags & DHT_USEUDP)) { /* TCP */
131 bytes = read(s, buf, sizeof(buf));
132 if (bytes == 0) {
133 EV_SET(&kev, s, EVFILT_READ, EV_DELETE, 0, 0, NULL);
134 if ((kevent(node->ev_queue, &kev, 1, NULL, 0, NULL)) == -1)
135 break;
136 } else {
137 printf("read %zd bytes.\n", bytes);
139 } else { /* UDP */
140 bytes = recvfrom(s, buf, BUFSIZ, 0, (struct sockaddr *)&sw_addr, &addr_len);
141 if (bytes == 0)
142 break;
143 else
144 printf("read %zd bytes.\n", bytes);