scripts: moved important shell / perl scripts to scripts/ instead of contrib
[netsniff-ng.git] / contrib / patches / trafgen-multithreaded.patch
blob bfcb88f176830fc75a8e894456e8cf96d7e091c3
1 diff --git a/src/trafgen.c b/src/trafgen.c
2 index 2980cff..04b30d0 100644
3 --- a/src/trafgen.c
4 +++ b/src/trafgen.c
5 @@ -20,6 +20,7 @@
6 * Chapter 'The Stairs of Cirith Ungol'.
7 */
9 +#define _GNU_SOURCE
10 #include <stdio.h>
11 #include <string.h>
12 #include <getopt.h>
13 @@ -35,6 +36,7 @@
14 #include <assert.h>
15 #include <fcntl.h>
16 #include <time.h>
17 +#include <pthread.h>
18 #include <net/ethernet.h>
20 #include "xmalloc.h"
21 @@ -88,7 +90,7 @@ struct stats {
24 struct mode {
25 - struct stats stats;
26 + struct stats stats; /* for: tgap */
27 char *device;
28 int cpu;
29 int rand;
30 @@ -101,10 +103,25 @@ struct mode {
31 #define CPU_UNKNOWN -1
32 #define CPU_NOTOUCH -2
34 +struct worker_struct {
35 + pthread_t trid;
36 + unsigned int cpu;
37 + int sock;
38 + struct itimerval itimer;
39 + unsigned long interval;
40 + struct pktconf *cfg;
41 + struct mode *mode;
42 + struct stats stats;
43 +};
45 sig_atomic_t sigint = 0;
47 static const char *short_options = "d:c:n:t:vJhS:HQb:B:rk:";
49 +static struct worker_struct *threadpool;
51 +static unsigned int cpus __read_mostly = 0;
53 static struct option long_options[] = {
54 {"dev", required_argument, 0, 'd'},
55 {"conf", required_argument, 0, 'c'},
56 @@ -124,8 +141,7 @@ static struct option long_options[] = {
59 static struct itimerval itimer;
60 -static int sock;
61 -static unsigned long interval = TX_KERNEL_PULL_INT;
62 +static unsigned long interval __read_mostly = TX_KERNEL_PULL_INT;
64 static inline uint8_t lcrand(uint8_t val)
66 @@ -147,12 +163,16 @@ static void signal_handler(int number)
68 static void timer_elapsed(int number)
70 + int i;
71 itimer.it_interval.tv_sec = 0;
72 itimer.it_interval.tv_usec = interval;
73 itimer.it_value.tv_sec = 0;
74 itimer.it_value.tv_usec = interval;
76 - pull_and_flush_tx_ring(sock);
77 + for (i = 0; i < cpus; ++i) {
78 + pull_and_flush_tx_ring(threadpool[i].sock);
79 + }
81 setitimer(ITIMER_REAL, &itimer, NULL);
84 @@ -235,7 +255,7 @@ static void version(void)
86 static void tx_tgap_or_die(struct mode *mode, struct pktconf *cfg)
88 - int ifindex, mtu, ret;
89 + int ifindex, mtu, ret, sock;
90 size_t l, c, r;
91 struct sockaddr_ll s_addr;
92 unsigned long num = 1;
93 @@ -323,7 +343,7 @@ static void tx_tgap_or_die(struct mode *mode, struct pktconf *cfg)
94 printf("\r%lu bytes outgoing\n", mode->stats.tx_bytes);
97 -static void tx_fire_or_die(struct mode *mode, struct pktconf *cfg)
98 +static void *tx_fire_or_die(void *tr_self)
100 int irq, ifindex, mtu;
101 unsigned int size, it = 0;
102 @@ -334,59 +354,68 @@ static void tx_fire_or_die(struct mode *mode, struct pktconf *cfg)
103 struct frame_map *hdr;
104 struct counter *cnt;
105 struct randomizer *rnd;
106 + struct worker_struct *tr = tr_self;
108 - if (!mode || !cfg)
109 + if (!tr->mode || !tr->cfg)
110 panic("Panic over invalid args for TX trigger!\n");
111 - if (cfg->len == 0)
112 + if (tr->cfg->len == 0)
113 panic("Panic over invalid args for TX trigger!\n");
114 - if (!device_up_and_running(mode->device))
115 + if (!device_up_and_running(tr->mode->device))
116 panic("Device not up and running!\n");
118 - mtu = device_mtu(mode->device);
119 - for (l = 0; l < cfg->len; ++l) {
120 + mtu = device_mtu(tr->mode->device);
121 + for (l = 0; l < tr->cfg->len; ++l) {
122 /* eth src + eth dst + type == 14, fcs added by driver */
123 - if (cfg->pkts[l].plen > mtu + 14)
124 + if (tr->cfg->pkts[l].plen > mtu + 14)
125 panic("Device MTU < than your packet size!\n");
126 - if (cfg->pkts[l].plen <= 14)
127 + if (tr->cfg->pkts[l].plen <= 14)
128 panic("Device packet size too short!\n");
131 set_memcpy();
132 - sock = pf_socket();
133 + tr->sock = pf_socket();
135 memset(&tx_ring, 0, sizeof(tx_ring));
137 - ifindex = device_ifindex(mode->device);
138 - size = ring_size(mode->device, mode->reserve_size);
139 + ifindex = device_ifindex(tr->mode->device);
140 + size = ring_size(tr->mode->device, tr->mode->reserve_size);
142 - set_packet_loss_discard(sock);
143 - setup_tx_ring_layout(sock, &tx_ring, size, mode->jumbo_support);
144 - create_tx_ring(sock, &tx_ring);
145 - mmap_tx_ring(sock, &tx_ring);
146 + set_packet_loss_discard(tr->sock);
147 + setup_tx_ring_layout(tr->sock, &tx_ring, size, tr->mode->jumbo_support);
148 + create_tx_ring(tr->sock, &tx_ring);
149 + mmap_tx_ring(tr->sock, &tx_ring);
150 alloc_tx_ring_frames(&tx_ring);
151 - bind_tx_ring(sock, &tx_ring, ifindex);
152 + bind_tx_ring(tr->sock, &tx_ring, ifindex);
153 mt_init_by_seed_time();
155 - if (mode->cpu >= 0 && ifindex > 0) {
156 - irq = device_irq_number(mode->device);
157 - device_bind_irq_to_cpu(mode->cpu, irq);
158 - printf("IRQ: %s:%d > CPU%d\n", mode->device, irq,
159 - mode->cpu);
160 + if (tr->cpu == 0 && ifindex > 0) {
161 + irq = device_irq_number(tr->mode->device);
162 + device_bind_irq_to_cpu(tr->cpu, irq);
163 + printf("IRQ: %s:%d > CPU%d\n", tr->mode->device, irq,
164 + tr->cpu);
167 - if (mode->kpull)
168 - interval = mode->kpull;
169 - if (cfg->num > 0)
170 - num = cfg->num;
171 + assert(cpus > 0);
172 + if (tr->cpu == 0 && tr->mode->kpull)
173 + interval = tr->mode->kpull;
174 + if (tr->cfg->num > 0) {
175 + num = tr->cfg->num / cpus;
176 + if (tr->cfg->num % cpus > 0 && tr->cpu == 0) {
177 + num += tr->cfg->num % cpus;
181 - printf("MD: FIRE %s %luus\n\n", mode->rand ? "RND" : "RR", interval);
182 - printf("Running! Hang up with ^C!\n\n");
183 + if (tr->cpu == 0) {
184 + printf("MD: FIRE %s %luus\n\n", tr->mode->rand ?
185 + "RND" : "RR", interval);
186 + printf("Running! Hang up with ^C!\n\n");
188 - itimer.it_interval.tv_sec = 0;
189 - itimer.it_interval.tv_usec = interval;
190 - itimer.it_value.tv_sec = 0;
191 - itimer.it_value.tv_usec = interval;
192 - setitimer(ITIMER_REAL, &itimer, NULL);
193 + itimer.it_interval.tv_sec = 0;
194 + itimer.it_interval.tv_usec = interval;
195 + itimer.it_value.tv_sec = 0;
196 + itimer.it_value.tv_usec = interval;
197 + setitimer(ITIMER_REAL, &itimer, NULL);
200 l = 0;
201 while (likely(sigint == 0) && likely(num > 0)) {
202 @@ -398,53 +427,56 @@ static void tx_fire_or_die(struct mode *mode, struct pktconf *cfg)
203 out = ((uint8_t *) hdr) + TPACKET_HDRLEN -
204 sizeof(struct sockaddr_ll);
206 - hdr->tp_h.tp_snaplen = cfg->pkts[l].plen;
207 - hdr->tp_h.tp_len = cfg->pkts[l].plen;
208 + hdr->tp_h.tp_snaplen = tr->cfg->pkts[l].plen;
209 + hdr->tp_h.tp_len = tr->cfg->pkts[l].plen;
211 - for (c = 0; c < cfg->pkts[l].clen; ++c) {
212 - cnt = &(cfg->pkts[l].cnt[c]);
213 + for (c = 0; c < tr->cfg->pkts[l].clen; ++c) {
214 + cnt = &(tr->cfg->pkts[l].cnt[c]);
215 cnt->val -= cnt->min;
216 cnt->val = (cnt->val + cnt->inc) %
217 (cnt->max - cnt->min + 1);
218 cnt->val += cnt->min;
219 - cfg->pkts[l].payload[cnt->off] = cnt->val;
220 + tr->cfg->pkts[l].payload[cnt->off] = cnt->val;
223 - for (r = 0; r < cfg->pkts[l].rlen; ++r) {
224 - rnd = &(cfg->pkts[l].rnd[r]);
225 + for (r = 0; r < tr->cfg->pkts[l].rlen; ++r) {
226 + rnd = &(tr->cfg->pkts[l].rnd[r]);
227 rnd->val = lcrand(rnd->val);
228 - cfg->pkts[l].payload[rnd->off] = rnd->val;
229 + tr->cfg->pkts[l].payload[rnd->off] = rnd->val;
232 - __memcpy(out, cfg->pkts[l].payload, cfg->pkts[l].plen);
233 - mode->stats.tx_bytes += cfg->pkts[l].plen;
234 - mode->stats.tx_packets++;
235 + __memcpy(out, tr->cfg->pkts[l].payload,
236 + tr->cfg->pkts[l].plen);
237 + tr->stats.tx_bytes += tr->cfg->pkts[l].plen;
238 + tr->stats.tx_packets++;
240 - if (mode->rand)
241 - l = mt_rand_int32() % cfg->len;
242 + if (tr->mode->rand)
243 + l = mt_rand_int32() % tr->cfg->len;
244 else {
245 l++;
246 - if (l >= cfg->len)
247 + if (l >= tr->cfg->len)
248 l = 0;
251 kernel_may_pull_from_tx(&hdr->tp_h);
252 next_slot(&it, &tx_ring);
254 - if (cfg->num > 0)
255 + if (tr->cfg->num > 0)
256 num--;
257 if (unlikely(sigint == 1))
258 break;
262 - destroy_tx_ring(sock, &tx_ring);
263 - close(sock);
264 + destroy_tx_ring(tr->sock, &tx_ring);
265 + close(tr->sock);
267 fflush(stdout);
268 printf("\n");
269 - printf("\r%lu frames outgoing\n", mode->stats.tx_packets);
270 - printf("\r%lu bytes outgoing\n", mode->stats.tx_bytes);
271 + printf("\r%lu frames outgoing on CPU%d\n", tr->stats.tx_packets, tr->cpu);
272 + printf("\r%lu bytes outgoing on CPU%d\n", tr->stats.tx_bytes, tr->cpu);
273 + fflush(stdout);
274 + pthread_exit(0);
277 #define TYPE_NUM 0
278 @@ -659,6 +691,75 @@ static void cleanup_cfg(struct pktconf *cfg)
279 xfree(cfg->pkts);
282 +static struct pktconf *clone_cfg(struct pktconf *orig)
284 + int i;
285 + struct pktconf *ret = xzmalloc(sizeof(*ret));
286 + ret->num = orig->num;
287 + ret->gap = orig->gap;
288 + ret->len = orig->len;
289 + ret->pkts = xzmalloc(sizeof(struct packet) * orig->len);
290 + for (i = 0; i < ret->len; ++i) {
291 + ret->pkts[i].plen = orig->pkts[i].plen;
292 + ret->pkts[i].clen = orig->pkts[i].clen;
293 + ret->pkts[i].rlen = orig->pkts[i].rlen;
294 + ret->pkts[i].payload = xzmalloc(ret->pkts[i].plen);
295 + if (orig->pkts[i].clen > 0) {
296 + ret->pkts[i].cnt = xzmalloc(sizeof(struct counter) *
297 + orig->pkts[i].clen);
298 + memcpy(ret->pkts[i].cnt, orig->pkts[i].cnt,
299 + sizeof(struct counter) * orig->pkts[i].clen);
301 + if (orig->pkts[i].rlen > 0) {
302 + ret->pkts[i].rnd = xzmalloc(sizeof(struct randomizer) *
303 + orig->pkts[i].rlen);
304 + memcpy(ret->pkts[i].rnd, orig->pkts[i].rnd,
305 + sizeof(struct randomizer) * orig->pkts[i].rlen);
308 + return ret;
311 +static void thread_spawn_or_panic(unsigned int cpus, struct mode *mode,
312 + struct pktconf *cfg)
314 + int i, ret;
315 + cpu_set_t cpuset;
317 + for (i = 0; i < cpus; ++i) {
318 + CPU_ZERO(&cpuset);
319 + threadpool[i].cpu = i % cpus;
320 + CPU_SET(threadpool[i].cpu, &cpuset);
321 + threadpool[i].interval = TX_KERNEL_PULL_INT;
322 + threadpool[i].cfg = clone_cfg(cfg);
323 + threadpool[i].mode = mode;
324 + threadpool[i].sock = 0;
326 + ret = pthread_create(&threadpool[i].trid, NULL,
327 + tx_fire_or_die, &threadpool[i]);
328 + if (ret < 0)
329 + panic("Thread creation failed!\n");
330 + ret = pthread_setaffinity_np(threadpool[i].trid,
331 + sizeof(cpuset), &cpuset);
332 + if (ret < 0)
333 + panic("Thread CPU migration failed!\n");
334 + pthread_detach(threadpool[i].trid);
335 + info("Thread on CPU%d up and running!\n", threadpool[i].cpu);
337 + sleep(1);
340 +static void thread_finish(unsigned int cpus)
342 + int i;
343 + for (i = 0; i < cpus; ++i) {
344 + while (pthread_join(threadpool[i].trid, NULL) < 0)
346 + barrier();
347 + cleanup_cfg(threadpool[i].cfg);
351 static int main_loop(struct mode *mode, char *confname, unsigned long pkts,
352 unsigned long gap)
354 @@ -667,12 +768,20 @@ static int main_loop(struct mode *mode, char *confname, unsigned long pkts,
355 .gap = gap,
356 .len = 0,
358 + cpus = get_number_cpus_online();
360 parse_conf_or_die(confname, &cfg);
361 if (gap > 0)
362 tx_tgap_or_die(mode, &cfg);
363 - else
364 - tx_fire_or_die(mode, &cfg);
365 + else {
366 + threadpool = xzmalloc(sizeof(*threadpool) * cpus);
367 + thread_spawn_or_panic(cpus, mode, &cfg);
368 + while (!sigint)
369 + sleep(1);
370 + thread_finish(cpus);
371 + barrier();
372 + xfree(threadpool);
374 cleanup_cfg(&cfg);
376 return 0;
377 diff --git a/src/trafgen/Makefile b/src/trafgen/Makefile
378 index ef1f1c6..151365a 100644
379 --- a/src/trafgen/Makefile
380 +++ b/src/trafgen/Makefile
381 @@ -7,7 +7,7 @@ include ../definitions.mk
383 INCLUDE = -I..
384 CFLAGS +=
385 -LIBS =
386 +LIBS = -lpthread
388 core-objs = trafgen.o
389 lib-objs = xmalloc.o \