1 diff --git a/src/trafgen.c b/src/trafgen.c
2 index 2980cff..04b30d0 100644
6 * Chapter 'The Stairs of Cirith Ungol'.
18 #include <net/ethernet.h>
21 @@ -88,7 +90,7 @@ struct stats {
26 + struct stats stats; /* for: tgap */
30 @@ -101,10 +103,25 @@ struct mode {
31 #define CPU_UNKNOWN -1
32 #define CPU_NOTOUCH -2
34 +struct worker_struct {
38 + struct itimerval itimer;
39 + unsigned long interval;
40 + struct pktconf *cfg;
45 sig_atomic_t sigint = 0;
47 static const char *short_options = "d:c:n:t:vJhS:HQb:B:rk:";
49 +static struct worker_struct *threadpool;
51 +static unsigned int cpus __read_mostly = 0;
53 static struct option long_options[] = {
54 {"dev", required_argument, 0, 'd'},
55 {"conf", required_argument, 0, 'c'},
56 @@ -124,8 +141,7 @@ static struct option long_options[] = {
59 static struct itimerval itimer;
61 -static unsigned long interval = TX_KERNEL_PULL_INT;
62 +static unsigned long interval __read_mostly = TX_KERNEL_PULL_INT;
64 static inline uint8_t lcrand(uint8_t val)
66 @@ -147,12 +163,16 @@ static void signal_handler(int number)
68 static void timer_elapsed(int number)
71 itimer.it_interval.tv_sec = 0;
72 itimer.it_interval.tv_usec = interval;
73 itimer.it_value.tv_sec = 0;
74 itimer.it_value.tv_usec = interval;
76 - pull_and_flush_tx_ring(sock);
77 + for (i = 0; i < cpus; ++i) {
78 + pull_and_flush_tx_ring(threadpool[i].sock);
81 setitimer(ITIMER_REAL, &itimer, NULL);
84 @@ -235,7 +255,7 @@ static void version(void)
86 static void tx_tgap_or_die(struct mode *mode, struct pktconf *cfg)
88 - int ifindex, mtu, ret;
89 + int ifindex, mtu, ret, sock;
91 struct sockaddr_ll s_addr;
92 unsigned long num = 1;
93 @@ -323,7 +343,7 @@ static void tx_tgap_or_die(struct mode *mode, struct pktconf *cfg)
94 printf("\r%lu bytes outgoing\n", mode->stats.tx_bytes);
97 -static void tx_fire_or_die(struct mode *mode, struct pktconf *cfg)
98 +static void *tx_fire_or_die(void *tr_self)
100 int irq, ifindex, mtu;
101 unsigned int size, it = 0;
102 @@ -334,59 +354,68 @@ static void tx_fire_or_die(struct mode *mode, struct pktconf *cfg)
103 struct frame_map *hdr;
105 struct randomizer *rnd;
106 + struct worker_struct *tr = tr_self;
109 + if (!tr->mode || !tr->cfg)
110 panic("Panic over invalid args for TX trigger!\n");
112 + if (tr->cfg->len == 0)
113 panic("Panic over invalid args for TX trigger!\n");
114 - if (!device_up_and_running(mode->device))
115 + if (!device_up_and_running(tr->mode->device))
116 panic("Device not up and running!\n");
118 - mtu = device_mtu(mode->device);
119 - for (l = 0; l < cfg->len; ++l) {
120 + mtu = device_mtu(tr->mode->device);
121 + for (l = 0; l < tr->cfg->len; ++l) {
122 /* eth src + eth dst + type == 14, fcs added by driver */
123 - if (cfg->pkts[l].plen > mtu + 14)
124 + if (tr->cfg->pkts[l].plen > mtu + 14)
125 panic("Device MTU < than your packet size!\n");
126 - if (cfg->pkts[l].plen <= 14)
127 + if (tr->cfg->pkts[l].plen <= 14)
128 panic("Device packet size too short!\n");
132 - sock = pf_socket();
133 + tr->sock = pf_socket();
135 memset(&tx_ring, 0, sizeof(tx_ring));
137 - ifindex = device_ifindex(mode->device);
138 - size = ring_size(mode->device, mode->reserve_size);
139 + ifindex = device_ifindex(tr->mode->device);
140 + size = ring_size(tr->mode->device, tr->mode->reserve_size);
142 - set_packet_loss_discard(sock);
143 - setup_tx_ring_layout(sock, &tx_ring, size, mode->jumbo_support);
144 - create_tx_ring(sock, &tx_ring);
145 - mmap_tx_ring(sock, &tx_ring);
146 + set_packet_loss_discard(tr->sock);
147 + setup_tx_ring_layout(tr->sock, &tx_ring, size, tr->mode->jumbo_support);
148 + create_tx_ring(tr->sock, &tx_ring);
149 + mmap_tx_ring(tr->sock, &tx_ring);
150 alloc_tx_ring_frames(&tx_ring);
151 - bind_tx_ring(sock, &tx_ring, ifindex);
152 + bind_tx_ring(tr->sock, &tx_ring, ifindex);
153 mt_init_by_seed_time();
155 - if (mode->cpu >= 0 && ifindex > 0) {
156 - irq = device_irq_number(mode->device);
157 - device_bind_irq_to_cpu(mode->cpu, irq);
158 - printf("IRQ: %s:%d > CPU%d\n", mode->device, irq,
160 + if (tr->cpu == 0 && ifindex > 0) {
161 + irq = device_irq_number(tr->mode->device);
162 + device_bind_irq_to_cpu(tr->cpu, irq);
163 + printf("IRQ: %s:%d > CPU%d\n", tr->mode->device, irq,
168 - interval = mode->kpull;
172 + if (tr->cpu == 0 && tr->mode->kpull)
173 + interval = tr->mode->kpull;
174 + if (tr->cfg->num > 0) {
175 + num = tr->cfg->num / cpus;
176 + if (tr->cfg->num % cpus > 0 && tr->cpu == 0) {
177 + num += tr->cfg->num % cpus;
181 - printf("MD: FIRE %s %luus\n\n", mode->rand ? "RND" : "RR", interval);
182 - printf("Running! Hang up with ^C!\n\n");
183 + if (tr->cpu == 0) {
184 + printf("MD: FIRE %s %luus\n\n", tr->mode->rand ?
185 + "RND" : "RR", interval);
186 + printf("Running! Hang up with ^C!\n\n");
188 - itimer.it_interval.tv_sec = 0;
189 - itimer.it_interval.tv_usec = interval;
190 - itimer.it_value.tv_sec = 0;
191 - itimer.it_value.tv_usec = interval;
192 - setitimer(ITIMER_REAL, &itimer, NULL);
193 + itimer.it_interval.tv_sec = 0;
194 + itimer.it_interval.tv_usec = interval;
195 + itimer.it_value.tv_sec = 0;
196 + itimer.it_value.tv_usec = interval;
197 + setitimer(ITIMER_REAL, &itimer, NULL);
201 while (likely(sigint == 0) && likely(num > 0)) {
202 @@ -398,53 +427,56 @@ static void tx_fire_or_die(struct mode *mode, struct pktconf *cfg)
203 out = ((uint8_t *) hdr) + TPACKET_HDRLEN -
204 sizeof(struct sockaddr_ll);
206 - hdr->tp_h.tp_snaplen = cfg->pkts[l].plen;
207 - hdr->tp_h.tp_len = cfg->pkts[l].plen;
208 + hdr->tp_h.tp_snaplen = tr->cfg->pkts[l].plen;
209 + hdr->tp_h.tp_len = tr->cfg->pkts[l].plen;
211 - for (c = 0; c < cfg->pkts[l].clen; ++c) {
212 - cnt = &(cfg->pkts[l].cnt[c]);
213 + for (c = 0; c < tr->cfg->pkts[l].clen; ++c) {
214 + cnt = &(tr->cfg->pkts[l].cnt[c]);
215 cnt->val -= cnt->min;
216 cnt->val = (cnt->val + cnt->inc) %
217 (cnt->max - cnt->min + 1);
218 cnt->val += cnt->min;
219 - cfg->pkts[l].payload[cnt->off] = cnt->val;
220 + tr->cfg->pkts[l].payload[cnt->off] = cnt->val;
223 - for (r = 0; r < cfg->pkts[l].rlen; ++r) {
224 - rnd = &(cfg->pkts[l].rnd[r]);
225 + for (r = 0; r < tr->cfg->pkts[l].rlen; ++r) {
226 + rnd = &(tr->cfg->pkts[l].rnd[r]);
227 rnd->val = lcrand(rnd->val);
228 - cfg->pkts[l].payload[rnd->off] = rnd->val;
229 + tr->cfg->pkts[l].payload[rnd->off] = rnd->val;
232 - __memcpy(out, cfg->pkts[l].payload, cfg->pkts[l].plen);
233 - mode->stats.tx_bytes += cfg->pkts[l].plen;
234 - mode->stats.tx_packets++;
235 + __memcpy(out, tr->cfg->pkts[l].payload,
236 + tr->cfg->pkts[l].plen);
237 + tr->stats.tx_bytes += tr->cfg->pkts[l].plen;
238 + tr->stats.tx_packets++;
241 - l = mt_rand_int32() % cfg->len;
242 + if (tr->mode->rand)
243 + l = mt_rand_int32() % tr->cfg->len;
247 + if (l >= tr->cfg->len)
251 kernel_may_pull_from_tx(&hdr->tp_h);
252 next_slot(&it, &tx_ring);
255 + if (tr->cfg->num > 0)
257 if (unlikely(sigint == 1))
262 - destroy_tx_ring(sock, &tx_ring);
264 + destroy_tx_ring(tr->sock, &tx_ring);
269 - printf("\r%lu frames outgoing\n", mode->stats.tx_packets);
270 - printf("\r%lu bytes outgoing\n", mode->stats.tx_bytes);
271 + printf("\r%lu frames outgoing on CPU%d\n", tr->stats.tx_packets, tr->cpu);
272 + printf("\r%lu bytes outgoing on CPU%d\n", tr->stats.tx_bytes, tr->cpu);
278 @@ -659,6 +691,75 @@ static void cleanup_cfg(struct pktconf *cfg)
282 +static struct pktconf *clone_cfg(struct pktconf *orig)
285 + struct pktconf *ret = xzmalloc(sizeof(*ret));
286 + ret->num = orig->num;
287 + ret->gap = orig->gap;
288 + ret->len = orig->len;
289 + ret->pkts = xzmalloc(sizeof(struct packet) * orig->len);
290 + for (i = 0; i < ret->len; ++i) {
291 + ret->pkts[i].plen = orig->pkts[i].plen;
292 + ret->pkts[i].clen = orig->pkts[i].clen;
293 + ret->pkts[i].rlen = orig->pkts[i].rlen;
294 + ret->pkts[i].payload = xzmalloc(ret->pkts[i].plen);
295 + if (orig->pkts[i].clen > 0) {
296 + ret->pkts[i].cnt = xzmalloc(sizeof(struct counter) *
297 + orig->pkts[i].clen);
298 + memcpy(ret->pkts[i].cnt, orig->pkts[i].cnt,
299 + sizeof(struct counter) * orig->pkts[i].clen);
301 + if (orig->pkts[i].rlen > 0) {
302 + ret->pkts[i].rnd = xzmalloc(sizeof(struct randomizer) *
303 + orig->pkts[i].rlen);
304 + memcpy(ret->pkts[i].rnd, orig->pkts[i].rnd,
305 + sizeof(struct randomizer) * orig->pkts[i].rlen);
311 +static void thread_spawn_or_panic(unsigned int cpus, struct mode *mode,
312 + struct pktconf *cfg)
317 + for (i = 0; i < cpus; ++i) {
319 + threadpool[i].cpu = i % cpus;
320 + CPU_SET(threadpool[i].cpu, &cpuset);
321 + threadpool[i].interval = TX_KERNEL_PULL_INT;
322 + threadpool[i].cfg = clone_cfg(cfg);
323 + threadpool[i].mode = mode;
324 + threadpool[i].sock = 0;
326 + ret = pthread_create(&threadpool[i].trid, NULL,
327 + tx_fire_or_die, &threadpool[i]);
329 + panic("Thread creation failed!\n");
330 + ret = pthread_setaffinity_np(threadpool[i].trid,
331 + sizeof(cpuset), &cpuset);
333 + panic("Thread CPU migration failed!\n");
334 + pthread_detach(threadpool[i].trid);
335 + info("Thread on CPU%d up and running!\n", threadpool[i].cpu);
340 +static void thread_finish(unsigned int cpus)
343 + for (i = 0; i < cpus; ++i) {
344 + while (pthread_join(threadpool[i].trid, NULL) < 0)
347 + cleanup_cfg(threadpool[i].cfg);
351 static int main_loop(struct mode *mode, char *confname, unsigned long pkts,
354 @@ -667,12 +768,20 @@ static int main_loop(struct mode *mode, char *confname, unsigned long pkts,
358 + cpus = get_number_cpus_online();
360 parse_conf_or_die(confname, &cfg);
362 tx_tgap_or_die(mode, &cfg);
364 - tx_fire_or_die(mode, &cfg);
366 + threadpool = xzmalloc(sizeof(*threadpool) * cpus);
367 + thread_spawn_or_panic(cpus, mode, &cfg);
370 + thread_finish(cpus);
377 diff --git a/src/trafgen/Makefile b/src/trafgen/Makefile
378 index ef1f1c6..151365a 100644
379 --- a/src/trafgen/Makefile
380 +++ b/src/trafgen/Makefile
381 @@ -7,7 +7,7 @@ include ../definitions.mk
388 core-objs = trafgen.o
389 lib-objs = xmalloc.o \