4 #include <sys/socket.h>
5 #include <sys/sysctl.h>
8 #include <netinet/in.h>
13 #include <pthread_np.h>
21 #include "kq_sendrecv_proto.h"
23 #define RECV_EVENT_MAX 64
24 #define RECV_BUFLEN (128 * 1024)
28 struct sockaddr_in t_in
;
30 pthread_mutex_t t_lock
;
31 pthread_cond_t t_cond
;
36 static void *recv_thread(void *);
38 static int recv_buflen
= RECV_BUFLEN
;
39 static int recv_reuseport
= 0;
40 static int recv_bindcpu
= 0;
43 usage(const char *cmd
)
45 fprintf(stderr
, "%s [-4 addr4] [-p port] [-t nthreads] [-D] [-R] [-B] "
46 "[-b buflen]\n", cmd
);
51 main(int argc
, char *argv
[])
53 struct recv_thrctx
*ctx_arr
;
54 struct recv_info
*info
;
55 struct sockaddr_in in
;
57 int opt
, s
, on
, nthr
, i
, info_sz
, do_daemon
;
61 sigaddset(&sigset
, SIGPIPE
);
62 if (sigprocmask(SIG_BLOCK
, &sigset
, NULL
) < 0)
63 err(1, "sigprocmask failed");
66 if (sysctlbyname("hw.ncpu", &nthr
, &sz
, NULL
, 0) < 0)
67 err(1, "sysctl hw.ncpu failed");
69 memset(&in
, 0, sizeof(in
));
70 in
.sin_family
= AF_INET
;
71 in
.sin_addr
.s_addr
= htonl(INADDR_ANY
);
72 in
.sin_port
= htons(RECV_PORT
);
76 while ((opt
= getopt(argc
, argv
, "4:BDRb:p:t:")) != -1) {
79 if (inet_pton(AF_INET
, optarg
, &in
.sin_addr
) <= 0)
80 errx(1, "inet_pton failed %s", optarg
);
95 /* Not supported on other BSDs */
100 recv_buflen
= strtol(optarg
, NULL
, 10);
101 if (recv_buflen
<= 0)
102 errx(1, "invalid -b");
106 in
.sin_port
= htons(strtoul(optarg
, NULL
, 10));
110 nthr
= strtol(optarg
, NULL
, 10);
112 errx(1, "invalid -t");
120 s
= socket(AF_INET
, SOCK_STREAM
, 0);
122 err(1, "socket failed");
125 if (setsockopt(s
, SOL_SOCKET
, SO_REUSEADDR
, &on
, sizeof(on
)) < 0)
126 err(1, "setsockopt(REUSEPADDR) failed");
128 if (bind(s
, (const struct sockaddr
*)&in
, sizeof(in
)) < 0)
129 err(1, "bind failed");
131 if (listen(s
, -1) < 0)
132 err(1, "listen failed");
134 ctx_arr
= calloc(nthr
, sizeof(struct recv_thrctx
));
136 err(1, "calloc failed");
138 info_sz
= __offsetof(struct recv_info
, dport
[nthr
]);
139 info
= calloc(1, info_sz
);
141 err(1, "calloc failed");
147 pthread_set_name_np(pthread_self(), "main");
149 for (i
= 0; i
< nthr
; ++i
) {
150 struct recv_thrctx
*ctx
= &ctx_arr
[i
];
154 ctx
->t_in
.sin_port
= 0;
157 pthread_mutex_init(&ctx
->t_lock
, NULL
);
158 pthread_cond_init(&ctx
->t_cond
, NULL
);
161 error
= pthread_create(&ctx
->t_tid
, NULL
, recv_thread
, ctx
);
163 errc(1, error
, "pthread_create %d failed", i
);
166 * Wait for the receiver to select a proper data port
167 * and start a listen socket on the data port.
169 pthread_mutex_lock(&ctx
->t_lock
);
170 while (ctx
->t_in
.sin_port
== 0)
171 pthread_cond_wait(&ctx
->t_cond
, &ctx
->t_lock
);
172 pthread_mutex_unlock(&ctx
->t_lock
);
174 info
->dport
[i
] = ctx
->t_in
.sin_port
;
178 * Send information, e.g. data ports, back to the clients.
183 s1
= accept(s
, NULL
, NULL
);
186 write(s1
, info
, info_sz
);
195 recv_thread(void *xctx
)
197 struct recv_thrctx
*ctx
= xctx
;
198 struct kevent change_evt0
[RECV_EVENT_MAX
];
206 * Select a proper data port and create a listen socket on it.
211 port
= RECV_PORT
+ ctx
->t_id
;
213 struct sockaddr_in in
= ctx
->t_in
;
217 if (port
< RECV_PORT
)
218 errx(1, "failed to find a data port");
219 in
.sin_port
= htons(port
);
221 s
= socket(AF_INET
, SOCK_STREAM
, 0);
223 err(1, "socket failed");
226 if (recv_reuseport
) {
227 if (setsockopt(s
, SOL_SOCKET
, SO_REUSEPORT
, &on
,
229 err(1, "setsockopt(REUSEPORT) failed");
231 if (setsockopt(s
, SOL_SOCKET
, SO_REUSEADDR
, &on
,
233 err(1, "setsockopt(REUSEADDR) failed");
237 if (ioctl(s
, FIONBIO
, &on
, sizeof(on
)) < 0)
238 err(1, "ioctl(FIONBIO) failed");
240 if (bind(s
, (const struct sockaddr
*)&in
, sizeof(in
)) < 0) {
245 if (listen(s
, -1) < 0)
246 err(1, "listen failed");
257 if (recv_reuseport
) {
261 if (getsockopt(s
, SOL_SOCKET
, SO_CPUHINT
,
263 err(1, "getsockopt(CPUHINT) failed");
271 if (sysctlbyname("hw.ncpu", &ncpus
, &len
,
273 err(1, "sysctlbyname hw.ncpu failed");
274 cpu
= ctx
->t_id
% ncpus
;
279 error
= pthread_setaffinity_np(pthread_self(),
280 sizeof(mask
), &mask
);
282 errc(1, error
, "pthread_setaffinity_np cpu%d "
291 err(1, "kqueue failed");
293 buf
= malloc(recv_buflen
);
295 err(1, "malloc %d failed", recv_buflen
);
297 memset(&ack
, 0, sizeof(ack
));
299 snprintf(name
, sizeof(name
), "rcv%d %d", ctx
->t_id
, port
);
300 pthread_set_name_np(pthread_self(), name
);
303 * Inform the main thread that we are ready.
305 pthread_mutex_lock(&ctx
->t_lock
);
306 ctx
->t_in
.sin_port
= htons(port
);
307 pthread_mutex_unlock(&ctx
->t_lock
);
308 pthread_cond_signal(&ctx
->t_cond
);
310 EV_SET(&change_evt0
[0], s
, EVFILT_READ
, EV_ADD
, 0, 0, NULL
);
314 const struct kevent
*change_evt
= NULL
;
315 struct kevent evt
[RECV_EVENT_MAX
];
319 change_evt
= change_evt0
;
321 nevt
= kevent(kq
, change_evt
, nchange
, evt
, RECV_EVENT_MAX
,
324 err(1, "kevent failed");
327 for (i
= 0; i
< nevt
; ++i
) {
330 if (evt
[i
].ident
== (u_int
)s
) {
331 while (nchange
< RECV_EVENT_MAX
) {
334 s1
= accept(s
, NULL
, NULL
);
338 /* TODO: keepalive */
340 n
= write(s1
, &ack
, sizeof(ack
));
341 if (n
!= sizeof(ack
)) {
346 EV_SET(&change_evt0
[nchange
], s1
,
347 EVFILT_READ
, EV_ADD
, 0, 0, NULL
);
351 n
= read(evt
[i
].ident
, buf
, recv_buflen
);
353 if (n
== 0 || errno
!= EAGAIN
)