tools/kq_sendrecv: Fix building on FreeBSD
[dragonfly.git] / tools / tools / netrate / kq_sendrecv / kq_recvserv / kq_recvserv.c
blob7c46c96522a535c243579641cdc75692385a98e0
1 #include <sys/types.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/socket.h>
5 #include <sys/sysctl.h>
7 #include <arpa/inet.h>
8 #include <netinet/in.h>
10 #include <err.h>
11 #include <errno.h>
12 #include <pthread.h>
13 #include <pthread_np.h>
14 #include <signal.h>
15 #include <stdio.h>
16 #include <stdint.h>
17 #include <stdlib.h>
18 #include <string.h>
19 #include <unistd.h>
21 #include "kq_sendrecv_proto.h"
/*
 * Size of the kevent change and event arrays used by each receiver
 * thread, i.e. the most events handled per kevent(2) call.
 */
#define RECV_EVENT_MAX	64

/* Default per-thread read buffer size in bytes; overridden by -b. */
#define RECV_BUFLEN	(128 * 1024)
/*
 * Per-receiver-thread context.
 *
 * main() fills in t_id and t_in (with sin_port cleared) before starting
 * the thread.  The thread stores the data port it bound into
 * t_in.sin_port while holding t_lock and then signals t_cond; main()
 * waits on t_cond until sin_port becomes non-zero.
 */
struct recv_thrctx {
	int			t_id;	/* index of this thread, 0-based */
	struct sockaddr_in	t_in;	/* bind address; sin_port set by thread */

	pthread_mutex_t		t_lock;	/* protects t_in.sin_port handoff */
	pthread_cond_t		t_cond;	/* signalled once the port is chosen */

	pthread_t		t_tid;	/* thread handle from pthread_create() */
};
36 static void *recv_thread(void *);
38 static int recv_buflen = RECV_BUFLEN;
39 static int recv_reuseport = 0;
40 static int recv_bindcpu = 0;
/*
 * Print the command-line synopsis to stderr and terminate the process
 * with exit status 2.  Does not return.
 *
 * cmd: program name to show in the synopsis.
 */
static void
usage(const char *cmd)
{
	fprintf(stderr, "%s [-4 addr4] [-p port] [-t nthreads] [-D] [-R] [-B] "
	    "[-b buflen]\n", cmd);
	exit(2);
}
/*
 * Parse a base-10 integer option argument.
 *
 * Exits via errx(1, "invalid <opt>") when the argument is empty, has
 * trailing garbage, or does not fit in an int.  Range checks beyond
 * that are left to the caller.
 *
 * arg: the option argument text.
 * opt: option name used in the error message, e.g. "-b".
 */
static int
parse_int_opt(const char *arg, const char *opt)
{
	char *end;
	long val;

	errno = 0;
	val = strtol(arg, &end, 10);
	if (end == arg || *end != '\0' || errno == ERANGE ||
	    val != (long)(int)val)
		errx(1, "invalid %s", opt);
	return (int)val;
}

/*
 * Receiver server entry point.
 *
 * Parses the options, binds and listens on the control port, starts
 * nthr receiver threads (default: hw.ncpu), collects the data port each
 * thread selected, and then loops forever accepting control
 * connections and writing the recv_info record (the data port list)
 * to each one.
 */
int
main(int argc, char *argv[])
{
	struct recv_thrctx *ctx_arr;
	struct recv_info *info;
	struct sockaddr_in in;
	sigset_t sigset;
	int opt, s, on, nthr, i, info_sz, do_daemon, port;
	size_t sz;

	/* A peer closing early must not kill the process via SIGPIPE. */
	sigemptyset(&sigset);
	sigaddset(&sigset, SIGPIPE);
	if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
		err(1, "sigprocmask failed");

	/* Default thread count is the number of CPUs. */
	sz = sizeof(nthr);
	if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0)
		err(1, "sysctl hw.ncpu failed");

	memset(&in, 0, sizeof(in));
	in.sin_family = AF_INET;
	in.sin_addr.s_addr = htonl(INADDR_ANY);
	in.sin_port = htons(RECV_PORT);

	do_daemon = 1;

	while ((opt = getopt(argc, argv, "4:BDRb:p:t:")) != -1) {
		switch (opt) {
		case '4':
			if (inet_pton(AF_INET, optarg, &in.sin_addr) <= 0)
				errx(1, "inet_pton failed %s", optarg);
			break;

		case 'B':
			recv_bindcpu = 1;
			break;

		case 'D':
			do_daemon = 0;
			break;

		case 'R':
#ifdef __DragonFly__
			recv_reuseport = 1;
#else
			/* Not supported on other BSDs */
#endif
			break;

		case 'b':
			recv_buflen = parse_int_opt(optarg, "-b");
			if (recv_buflen <= 0)
				errx(1, "invalid -b");
			break;

		case 'p':
			/* Reject values htons() would silently truncate. */
			port = parse_int_opt(optarg, "-p");
			if (port < 0 || port > UINT16_MAX)
				errx(1, "invalid -p");
			in.sin_port = htons((uint16_t)port);
			break;

		case 't':
			nthr = parse_int_opt(optarg, "-t");
			if (nthr <= 0)
				errx(1, "invalid -t");
			break;

		default:
			usage(argv[0]);
		}
	}

	s = socket(AF_INET, SOCK_STREAM, 0);
	if (s < 0)
		err(1, "socket failed");

	on = 1;
	if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
		err(1, "setsockopt(REUSEADDR) failed");

	if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0)
		err(1, "bind failed");

	if (listen(s, -1) < 0)
		err(1, "listen failed");

	ctx_arr = calloc(nthr, sizeof(struct recv_thrctx));
	if (ctx_arr == NULL)
		err(1, "calloc failed");

	/* recv_info ends in a dport[] array; size it for nthr entries. */
	info_sz = __offsetof(struct recv_info, dport[nthr]);
	info = calloc(1, info_sz);
	if (info == NULL)
		err(1, "calloc failed");
	info->ndport = nthr;

	if (do_daemon && daemon(0, 0) < 0)
		err(1, "daemon failed");

	pthread_set_name_np(pthread_self(), "main");

	for (i = 0; i < nthr; ++i) {
		struct recv_thrctx *ctx = &ctx_arr[i];
		int error;

		/* Port 0 means "not chosen yet"; the thread fills it in. */
		ctx->t_in = in;
		ctx->t_in.sin_port = 0;

		ctx->t_id = i;
		pthread_mutex_init(&ctx->t_lock, NULL);
		pthread_cond_init(&ctx->t_cond, NULL);

		/* Start receiver */
		error = pthread_create(&ctx->t_tid, NULL, recv_thread, ctx);
		if (error)
			errc(1, error, "pthread_create %d failed", i);

		/*
		 * Wait for the receiver to select a proper data port
		 * and start a listen socket on the data port.
		 */
		pthread_mutex_lock(&ctx->t_lock);
		while (ctx->t_in.sin_port == 0)
			pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
		pthread_mutex_unlock(&ctx->t_lock);

		/* Stored as-is, i.e. in network byte order. */
		info->dport[i] = ctx->t_in.sin_port;
	}

	/*
	 * Send information, e.g. data ports, back to the clients.
	 */
	for (;;) {
		int s1;

		s1 = accept(s, NULL, NULL);
		if (s1 < 0)
			continue;
		/* Best effort: a client that went away is simply dropped. */
		(void)write(s1, info, info_sz);
		close(s1);
	}

	/* NEVER REACHED */
	exit(0);
}
/*
 * Receiver thread body.
 *
 * Binds a non-blocking listen socket to a data port above RECV_PORT,
 * optionally pins the thread to a CPU, publishes the chosen port to
 * main() through the thread context, and then runs a kqueue loop that
 * accepts data connections, writes a zeroed conn_ack to each new
 * connection, and reads and discards whatever the peers send.
 *
 * xctx: pointer to this thread's struct recv_thrctx.
 * Never returns in practice; fatal errors terminate the process.
 */
static void *
recv_thread(void *xctx)
{
	struct recv_thrctx *ctx = xctx;
	struct kevent change_evt0[RECV_EVENT_MAX];
	struct conn_ack ack;
	uint8_t *buf;
	char name[32];
	u_short port;
	int s, kq, nchange;

	/*
	 * Select a proper data port and create a listen socket on it.
	 */
	if (recv_reuseport)
		port = RECV_PORT;
	else
		port = RECV_PORT + ctx->t_id;
	for (;;) {
		struct sockaddr_in in = ctx->t_in;
		int on;

		/* u_short wrap-around means every candidate port failed. */
		++port;
		if (port < RECV_PORT)
			errx(1, "failed to find a data port");
		in.sin_port = htons(port);

		s = socket(AF_INET, SOCK_STREAM, 0);
		if (s < 0)
			err(1, "socket failed");

		on = 1;
		if (recv_reuseport) {
			if (setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &on,
			    sizeof(on)))
				err(1, "setsockopt(REUSEPORT) failed");
		} else {
			if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on,
			    sizeof(on)))
				err(1, "setsockopt(REUSEADDR) failed");
		}

		/* FIONBIO takes a single int pointer argument. */
		on = 1;
		if (ioctl(s, FIONBIO, &on) < 0)
			err(1, "ioctl(FIONBIO) failed");

		if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0) {
			/* Port unavailable; try the next one. */
			close(s);
			continue;
		}

		if (listen(s, -1) < 0)
			err(1, "listen failed");

		if (recv_bindcpu) {
			int cpu = -1, error;
#ifdef __FreeBSD__
			cpuset_t mask;
#else
			cpu_set_t mask;
#endif

#ifdef __DragonFly__
			if (recv_reuseport) {
				socklen_t olen;

				olen = sizeof(cpu);
				if (getsockopt(s, SOL_SOCKET, SO_CPUHINT,
				    &cpu, &olen) < 0)
					err(1, "getsockopt(CPUHINT) failed");
			}
#endif
			if (cpu < 0) {
				int ncpus;
				size_t len;

				/* No hint available: spread by thread id. */
				len = sizeof(ncpus);
				if (sysctlbyname("hw.ncpu", &ncpus, &len,
				    NULL, 0) < 0)
					err(1, "sysctlbyname hw.ncpu failed");
				cpu = ctx->t_id % ncpus;
			}

			CPU_ZERO(&mask);
			CPU_SET(cpu, &mask);
			error = pthread_setaffinity_np(pthread_self(),
			    sizeof(mask), &mask);
			if (error) {
				errc(1, error, "pthread_setaffinity_np cpu%d "
				    "failed", cpu);
			}
		}
		break;
	}

	kq = kqueue();
	if (kq < 0)
		err(1, "kqueue failed");

	buf = malloc(recv_buflen);
	if (buf == NULL)
		err(1, "malloc %d failed", recv_buflen);

	memset(&ack, 0, sizeof(ack));

	snprintf(name, sizeof(name), "rcv%d %d", ctx->t_id, port);
	pthread_set_name_np(pthread_self(), name);

	/*
	 * Inform the main thread that we are ready.
	 */
	pthread_mutex_lock(&ctx->t_lock);
	ctx->t_in.sin_port = htons(port);
	pthread_mutex_unlock(&ctx->t_lock);
	pthread_cond_signal(&ctx->t_cond);

	/* First kevent() call registers the listen socket. */
	EV_SET(&change_evt0[0], s, EVFILT_READ, EV_ADD, 0, 0, NULL);
	nchange = 1;

	for (;;) {
		const struct kevent *change_evt = NULL;
		struct kevent evt[RECV_EVENT_MAX];
		int nevt, i;

		if (nchange > 0)
			change_evt = change_evt0;

		nevt = kevent(kq, change_evt, nchange, evt, RECV_EVENT_MAX,
		    NULL);
		if (nevt < 0)
			err(1, "kevent failed");
		nchange = 0;

		for (i = 0; i < nevt; ++i) {
			int fd = (int)evt[i].ident;
			ssize_t n;

			/*
			 * A failed changelist entry is reported back with
			 * EV_ERROR set and the error code in data.  Such a
			 * socket is not being monitored, so drop it rather
			 * than leak it.
			 */
			if (evt[i].flags & EV_ERROR) {
				if (fd == s) {
					errc(1, (int)evt[i].data,
					    "kevent listen socket failed");
				}
				close(fd);
				continue;
			}

			if (fd == s) {
				/*
				 * Accept until the queue is drained or the
				 * change array is full; the remainder is
				 * picked up on the next kevent() round.
				 */
				while (nchange < RECV_EVENT_MAX) {
					int s1;

					s1 = accept(s, NULL, NULL);
					if (s1 < 0)
						break;

					/* TODO: keepalive */

					n = write(s1, &ack, sizeof(ack));
					if (n != (ssize_t)sizeof(ack)) {
						close(s1);
						continue;
					}

					EV_SET(&change_evt0[nchange], s1,
					    EVFILT_READ, EV_ADD, 0, 0, NULL);
					++nchange;
				}
			} else {
				/* Data is read and discarded. */
				n = read(fd, buf, recv_buflen);
				if (n <= 0) {
					/* EOF or hard error ends the connection. */
					if (n == 0 ||
					    (errno != EAGAIN && errno != EINTR))
						close(fd);
				}
			}
		}
	}

	/* NEVER REACHED */
	return NULL;
}