2 * Lightweight Autonomic Network Architecture
4 * LANA packet processing engines. Incoming packets are scheduled onto one
5 * of the CPU-affine engines and processed on the Functional Block stack.
6 * There are two queues where packets can be added, one from PHY direction
7 * for incoming packets (ingress) and one from the socket handler direction
8 * for outgoing packets (egress). Support for NUMA-affinity added.
10 * Copyright 2011 Daniel Borkmann <dborkma@tik.ee.ethz.ch>,
11 * Swiss Federal Institute of Technology (ETH Zurich)
15 #include <linux/cpu.h>
16 #include <linux/kernel.h>
17 #include <linux/slab.h>
18 #include <linux/skbuff.h>
19 #include <linux/kthread.h>
20 #include <linux/proc_fs.h>
21 #include <linux/u64_stats_sync.h>
22 #include <linux/prefetch.h>
23 #include <linux/sched.h>
24 #include <linux/hrtimer.h>
25 #include <linux/jiffies.h>
26 #include <linux/kernel_stat.h>
27 #include <linux/interrupt.h>
29 #include "xt_engine.h"
31 #include "xt_fblock.h"
/* Per-CPU array of packet processing engines, one per online CPU;
 * allocated with alloc_percpu() in init_worker_engines() and exported
 * for other LANA modules that enqueue skbs onto an engine. */
33 struct worker_engine __percpu
*engines
;
34 EXPORT_SYMBOL_GPL(engines
);
/* procfs directory for LANA, defined in another compilation unit;
 * per-engine read-only stats entries are created beneath it. */
35 extern struct proc_dir_entry
*lana_proc_dir
;
/* Forward declaration: init_worker_engines() calls this on its error path. */
37 void cleanup_worker_engines(void);
/* ppe_queues_have_load - test whether an engine has packets pending.
 * @ppe: the per-CPU worker engine to inspect
 *
 * Checks the ingress queue first, then the egress queue.
 * NOTE(review): the return statements of this function are not visible in
 * this chunk.  From the caller in engine_thread() -- where
 * "(queue = ppe_queues_have_load(ppe)) < 0" means idle and the result is
 * then used to index ppe->inqs[] -- it presumably returns the queue type
 * that has load, or a negative value when both queues are empty.  Confirm
 * against the full file.
 */
39 static inline int ppe_queues_have_load(struct worker_engine
*ppe
)
41 /* add new stuff here */
/* Ingress: packets arriving from the PHY direction. */
42 if (!skb_queue_empty(&ppe
->inqs
[TYPE_INGRESS
].queue
))
/* Egress: packets coming down from the socket handler direction. */
44 if (!skb_queue_empty(&ppe
->inqs
[TYPE_EGRESS
].queue
))
/* process_packet - walk one skb through its Functional Block stack.
 * @skb: the packet to process
 * @dir: traversal direction (ingress/egress); passed by address to
 *       netfb_rx(), so a functional block may alter the path type for
 *       subsequent blocks
 *
 * Repeatedly reads the next IDP tag from the skb, resolves it to a
 * functional block and hands the packet to that block's receive handler,
 * until no further IDP is present or the packet is dropped.
 * NOTE(review): local declarations (ret, cont, fb), the lookup-failure
 * branch, and the return statements are not visible in this chunk --
 * confirm the error/return behavior against the full file.
 */
49 static inline int process_packet(struct sk_buff
*skb
, enum path_type dir
)
55 while ((cont
= read_next_idp_from_skb(skb
))) {
/* Resolve the IDP to its functional block instance. */
56 fb
= __search_fblock(cont
);
59 /* Called in rcu_read_lock context */
60 ret
= fb
->netfb_rx(fb
, skb
, &dir
);
/* A PPE_DROPPED result means the block consumed/freed the skb; it must
 * not be touched again after this point. */
62 if (ret
== PPE_DROPPED
)
63 /* fblock freed skb */
/* engine_thread - main loop of a CPU-bound packet processing engine.
 * @arg: unused; the engine is looked up via the per-CPU 'engines' array
 *
 * Sleeps in TASK_INTERRUPTIBLE until woken (e.g. by the engine hrtimer),
 * then drains whichever queue ppe_queues_have_load() reports, pushing each
 * skb through its functional block stack via process_packet() and
 * accounting per-queue statistics under u64_stats syncp protection.
 * NOTE(review): several lines (the schedule() call, RCU lock/unlock,
 * preempt_disable pairing, skb declaration, loop braces) are not visible
 * in this chunk; the notes below are inferred from visible code only --
 * confirm against the full file.
 */
70 static int engine_thread(void *arg
)
72 int ret
, queue
, need_lock
= 0;
74 unsigned long cpu
= smp_processor_id();
75 struct worker_engine
*ppe
= per_cpu_ptr(engines
, cpu
);
/* The engine is strictly CPU-affine (kthread_bind() in init); running on
 * any other CPU indicates a fatal setup error. */
77 panic("[lana] Engine scheduled on wrong CPU!\n");
78 printk(KERN_INFO
"[lana] Packet Processing Engine running "
/* NOTE(review): the body of this branch is not visible; presumably it
 * records that an explicit rcu_read_lock() is needed (need_lock). */
80 if (!rcu_read_lock_held())
82 set_current_state(TASK_INTERRUPTIBLE
);
83 while (likely(!kthread_should_stop())) {
/* No load on either queue: re-enable preemption and go back to sleep
 * (the sleep/schedule itself is in lines not visible here). */
85 if ((queue
= ppe_queues_have_load(ppe
)) < 0) {
86 preempt_enable_no_resched();
90 __set_current_state(TASK_RUNNING
);
/* Drain the loaded queue completely before sleeping again. */
91 while ((skb
= skb_dequeue(&ppe
->inqs
[queue
].queue
)) != NULL
) {
/* Time-marked first/last skbs bracket a measurement interval; the
 * delta is exposed via procfs as "hrt". */
92 if (unlikely(skb_is_time_marked_first(skb
)))
93 ppe
->timef
= ktime_get();
96 ret
= process_packet(skb
, ppe
->inqs
[queue
].type
);
99 if (unlikely(skb_is_time_marked_last(skb
)))
100 ppe
->timel
= ktime_get();
/* Per-queue stats are updated under the u64_stats write section so
 * 32-bit readers see consistent 64-bit counters. */
102 u64_stats_update_begin(&ppe
->inqs
[queue
].stats
.syncp
);
103 ppe
->inqs
[queue
].stats
.packets
++;
104 ppe
->inqs
[queue
].stats
.bytes
+= skb
->len
;
105 if (ret
== PPE_DROPPED
)
106 ppe
->inqs
[queue
].stats
.dropped
++;
107 else if (unlikely(ret
== PPE_ERROR
)) {
108 ppe
->inqs
[queue
].stats
.errors
++;
111 u64_stats_update_end(&ppe
->inqs
[queue
].stats
.syncp
);
112 preempt_enable_no_resched();
117 set_current_state(TASK_INTERRUPTIBLE
);
/* kthread_should_stop() became true: leave the run state clean and exit. */
119 __set_current_state(TASK_RUNNING
);
120 printk(KERN_INFO
"[lana] Packet Processing Engine stopped "
121 "on CPU%u!\n", smp_processor_id());
/* engine_procfs_stats - procfs read handler for one engine's statistics.
 * @page:   buffer provided by procfs to print into
 * @start:  unused here (classic create_proc_read_entry() signature)
 * @offset: read offset, unused here
 * @count:  buffer size, not checked against (see FIXME at the end)
 * @eof:    end-of-file flag for procfs
 * @data:   the struct worker_engine registered at entry creation
 *
 * Prints the engine pointer, CPU/NUMA placement, the first/last time-mark
 * delta, and per-queue packet/byte/error/drop counters, reading the 64-bit
 * counters under the u64_stats seqcount retry loop.
 * NOTE(review): inside the fetch_begin/fetch_retry loop, 'len' is never
 * rewound on a retry, so a concurrent writer would cause the queue block
 * to be appended twice to the output -- 'len' should be saved before the
 * loop and restored on retry.  Not fixable here because the surrounding
 * lines (do {, len init, return) are not visible in this chunk.
 */
125 static int engine_procfs_stats(char *page
, char **start
, off_t offset
,
126 int count
, int *eof
, void *data
)
130 struct worker_engine
*ppe
= data
;
133 len
+= sprintf(page
+ len
, "engine: %p\n", ppe
);
134 len
+= sprintf(page
+ len
, "cpu: %u, numa node: %d\n",
135 ppe
->cpu
, cpu_to_node(ppe
->cpu
));
/* Duration between the first and last time-marked skbs seen by the
 * engine thread (microseconds). */
136 len
+= sprintf(page
+ len
, "hrt: %llu us\n",
137 ktime_us_delta(ppe
->timel
, ppe
->timef
));
138 for (i
= 0; i
< NUM_TYPES
; ++i
) {
/* Begin a consistent 64-bit stats snapshot; retried below if a writer
 * raced with us. */
140 sstart
= u64_stats_fetch_begin(&ppe
->inqs
[i
].stats
.syncp
);
141 len
+= sprintf(page
+ len
, "queue: %p\n", &ppe
->inqs
[i
]);
142 len
+= sprintf(page
+ len
, " type: %u\n",
144 len
+= sprintf(page
+ len
, " packets: %llu\n",
145 ppe
->inqs
[i
].stats
.packets
);
146 len
+= sprintf(page
+ len
, " bytes: %llu\n",
147 ppe
->inqs
[i
].stats
.bytes
);
148 len
+= sprintf(page
+ len
, " errors: %u\n",
149 ppe
->inqs
[i
].stats
.errors
);
150 len
+= sprintf(page
+ len
, " drops: %llu\n",
151 ppe
->inqs
[i
].stats
.dropped
);
152 } while (u64_stats_fetch_retry(&ppe
->inqs
[i
].stats
.syncp
, sstart
));
154 /* FIXME: fits in page? */
159 static enum hrtimer_restart
engine_timer_handler(struct hrtimer
*self
)
161 struct tasklet_hrtimer
*thr
= container_of(self
, struct tasklet_hrtimer
, timer
);
162 struct worker_engine
*ppe
= container_of(thr
, struct worker_engine
, htimer
);
163 if (ppe
->thread
->state
!= TASK_RUNNING
)
164 wake_up_process(ppe
->thread
);
165 ppe
->ppe_timer_set
= 0;
167 return HRTIMER_NORESTART
;
/* init_worker_engines - allocate and start one engine per online CPU.
 *
 * For each online CPU (optionally skipping the dedicated userspace CPU
 * when __MIGRATE is set): zeroes the queues, creates the per-engine
 * procfs stats entry, spawns a NUMA-node-local kthread bound to that CPU,
 * and initializes the engine's wakeup tasklet-hrtimer.
 * Returns 0 on success; on failure the error path (not fully visible
 * here) calls cleanup_worker_engines().
 * NOTE(review): allocation-failure checks, the error-path 'goto'/break,
 * and the return statements are not visible in this chunk -- confirm
 * against the full file.
 */
170 int init_worker_engines(void)
176 engines
= alloc_percpu(struct worker_engine
);
181 for_each_online_cpu(cpu
) {
182 struct worker_engine
*ppe
;
/* Skip the CPU reserved for userspace when CPU migration support is
 * compiled in (the #ifdef opening this block is not visible here). */
184 if (cpu
== USERSPACECPU
)
186 #endif /* __MIGRATE */
187 ppe
= per_cpu_ptr(engines
, cpu
);
189 memset(&ppe
->inqs
, 0, sizeof(ppe
->inqs
));
190 for (i
= 0; i
< NUM_QUEUES
; ++i
)
191 skb_queue_head_init(&ppe
->inqs
[i
].queue
);
/* procfs entry name "ppe<cpu>", also reused as the kthread name. */
192 memset(name
, 0, sizeof(name
));
193 snprintf(name
, sizeof(name
), "ppe%u", cpu
);
194 ppe
->proc
= create_proc_read_entry(name
, 0400, lana_proc_dir
,
195 engine_procfs_stats
, ppe
);
201 ppe
->ppe_timer_set
= 0;
/* Create the thread on the CPU's own NUMA node so its stack and
 * per-thread data are node-local. */
202 ppe
->thread
= kthread_create_on_node(engine_thread
, NULL
,
203 cpu_to_node(cpu
), name
);
204 if (IS_ERR(ppe
->thread
)) {
/* NOTE(review): typo in the log message -- "creationg" should read
 * "creating"; left untouched here since this edit changes comments
 * only. */
205 printk(KERN_ERR
"[lana] Error creationg thread on "
210 kthread_bind(ppe
->thread
, cpu
);
211 wake_up_process(ppe
->thread
);
/* NOTE(review): CLOCK_REALTIME-based timers are affected by wall-clock
 * jumps (settimeofday/NTP); for a pure wakeup timer CLOCK_MONOTONIC is
 * usually preferred -- confirm whether absolute realtime expiry is
 * intentional here. */
212 tasklet_hrtimer_init(&ppe
->htimer
, engine_timer_handler
,
213 CLOCK_REALTIME
, HRTIMER_MODE_ABS
);
/* Error path: tear down everything created so far. */
218 cleanup_worker_engines();
221 EXPORT_SYMBOL_GPL(init_worker_engines
);
/* cleanup_worker_engines - stop all engines and release their resources.
 *
 * Mirrors init_worker_engines(): for each online CPU (again skipping the
 * userspace CPU under __MIGRATE) it cancels the wakeup timer, stops the
 * engine kthread if it was successfully created, and removes the procfs
 * entry; finally the per-CPU engine array itself is freed.
 * Also used as the error-path teardown of init_worker_engines(), hence
 * the IS_ERR() guard on ppe->thread.
 * NOTE(review): local declarations (cpu, name) and some braces are not
 * visible in this chunk -- confirm against the full file.
 */
223 void cleanup_worker_engines(void)
229 for_each_online_cpu(cpu
) {
230 struct worker_engine
*ppe
;
/* Skip the CPU reserved for userspace when __MIGRATE is compiled in. */
232 if (cpu
== USERSPACECPU
)
234 #endif /* __MIGRATE */
/* Rebuild the same "ppe<cpu>" name used at creation time so the right
 * procfs entry is removed below. */
235 memset(name
, 0, sizeof(name
));
236 snprintf(name
, sizeof(name
), "ppe%u", cpu
);
237 ppe
= per_cpu_ptr(engines
, cpu
);
/* Thread creation may have failed mid-init; only stop a live thread. */
238 if (!IS_ERR(ppe
->thread
)) {
/* Cancel the timer first so it cannot wake a thread being stopped. */
239 tasklet_hrtimer_cancel(&ppe
->htimer
);
240 kthread_stop(ppe
->thread
);
243 remove_proc_entry(name
, lana_proc_dir
);
246 free_percpu(engines
);
248 EXPORT_SYMBOL_GPL(cleanup_worker_engines
);