/*
 * Lightweight Autonomic Network Architecture
 *
 * LANA packet processing engines. Incoming packets are scheduled onto one
 * of the CPU-affine engines and processed on the Functional Block stack.
 * There are two queues where packets can be added, one from the PHY
 * direction for incoming packets (ingress) and one from the socket handler
 * direction for outgoing packets (egress). Support for NUMA-affinity added.
 *
 * Copyright 2011 Daniel Borkmann <dborkma@tik.ee.ethz.ch>,
 * Swiss federal institute of technology (ETH Zurich)
 * Subject to the GPL.
 */
#include <linux/cpu.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/skbuff.h>
#include <linux/wait.h>
#include <linux/kthread.h>
#include <linux/proc_fs.h>
#include <linux/u64_stats_sync.h>
#include <linux/prefetch.h>
#include <linux/sched.h>
#include <linux/hrtimer.h>
#include "xt_engine.h"
#include "xt_fblock.h"
struct worker_engine __percpu *engines;
EXPORT_SYMBOL_GPL(engines);
extern struct proc_dir_entry *lana_proc_dir;
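
/*
 * Producers (the PHY rx path for ingress, the socket handler for egress)
 * feed an engine by appending the skb to the matching per-type queue and
 * waking the engine thread. A minimal sketch, assuming the caller already
 * runs on the target CPU; the real entry points live elsewhere in the
 * LANA tree:
 *
 *	struct worker_engine *ppe = per_cpu_ptr(engines, smp_processor_id());
 *
 *	skb_queue_tail(&ppe->inqs.ptrs[TYPE_INGRESS]->queue, skb);
 *	wake_up_interruptible(&ppe->wait_queue);
 */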
void cleanup_worker_engines(void);
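
/* Returns the head of the engine's queue list, the scan starting point. */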
static inline struct ppe_queue *first_ppe_queue(struct worker_engine *ppe)
{
	return ppe->inqs.head;
}
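
/*
 * Round-robin scan over the circular queue list: return the current
 * queue if it holds packets, otherwise its neighbor, otherwise step
 * two entries ahead and retry. Callers must make sure at least one
 * queue is non-empty (see ppe_queues_have_load()), or this spins.
 */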
static inline struct ppe_queue *next_filled_ppe_queue(struct ppe_queue *ppeq)
{
	while (1) {
		if (!skb_queue_empty(&ppeq->queue))
			return ppeq;
		if (!skb_queue_empty(&ppeq->next->queue))
			return ppeq->next;
		ppeq = ppeq->next->next;
	}
}
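
/* True if any of the engine's per-type queues has packets backlogged. */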
static inline int ppe_queues_have_load(struct worker_engine *ppe)
{
	/* add new stuff here */
	if (!skb_queue_empty(&ppe->inqs.ptrs[TYPE_INGRESS]->queue))
		return 1;
	if (!skb_queue_empty(&ppe->inqs.ptrs[TYPE_EGRESS]->queue))
		return 1;
	return 0;
}
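
/*
 * Walks the skb along its Functional Block stack: read the next IDP
 * bound to the skb, look up the corresponding fblock and hand the skb
 * to its receive handler, until the chain ends, a block drops the
 * packet, or an error occurs.
 */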
static inline int process_packet(struct sk_buff *skb, enum path_type dir)
{
	int ret = PPE_DROPPED;
	idp_t cont;
	struct fblock *fb;

	while ((cont = read_next_idp_from_skb(skb))) {
		fb = __search_fblock(cont);
		if (unlikely(!fb)) {
			ret = PPE_ERROR;
			break;
		}
		/* Called in rcu_read_lock context */
		ret = fb->ops->netfb_rx(fb, skb, &dir);
		if (ret == PPE_DROPPED)
			break;
	}
	return ret;
}
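
/*
 * Main loop of a packet processing engine, one kthread per CPU, bound
 * to that CPU and run at SCHED_FIFO priority. It sleeps while both
 * queues are empty, otherwise it picks the next backlogged queue,
 * dequeues one skb, pushes it through the fblock stack and accounts
 * the result under the queue's u64 stats seqlock.
 */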
static int engine_thread(void *arg)
{
	int ret, need_lock = 0;
	struct sk_buff *skb;
	struct ppe_queue *ppeq;
	struct worker_engine *ppe = per_cpu_ptr(engines,
						smp_processor_id());

	if (ppe->cpu != smp_processor_id())
		panic("[lana] Engine scheduled on wrong CPU!\n");
	printk(KERN_INFO "[lana] Packet Processing Engine running "
	       "on CPU%u!\n", smp_processor_id());

	if (!rcu_read_lock_held()) {
		need_lock = 1;
		rcu_read_lock();
	}

	ppeq = first_ppe_queue(ppe);
	while (likely(!kthread_should_stop())) {
		if (!ppe_queues_have_load(ppe)) {
			wait_event_interruptible_timeout(ppe->wait_queue,
						(kthread_should_stop() ||
						 ppe_queues_have_load(ppe)), 1);
			continue;
		}

		ppeq = next_filled_ppe_queue(ppeq);
		skb = skb_dequeue(&ppeq->queue);
		if (unlikely(skb_is_time_marked_first(skb)))
			ppe->timef = ktime_get();

		ret = process_packet(skb, ppeq->type);

		if (unlikely(skb_is_time_marked_last(skb)))
			ppe->timel = ktime_get();

		u64_stats_update_begin(&ppeq->stats.syncp);
		ppeq->stats.packets++;
		ppeq->stats.bytes += skb->len;
		if (ret == PPE_DROPPED)
			ppeq->stats.dropped++;
		else if (unlikely(ret == PPE_ERROR))
			ppeq->stats.errors++;
		u64_stats_update_end(&ppeq->stats.syncp);
	}

	if (need_lock)
		rcu_read_unlock();
	printk(KERN_INFO "[lana] Packet Processing Engine stopped "
	       "on CPU%u!\n", smp_processor_id());
	return 0;
}
static int engine_procfs_stats(char *page, char **start, off_t offset,
			       int count, int *eof, void *data)
{
	int i, qoff;
	int len = 0;
	unsigned int sstart;
	struct worker_engine *ppe = data;

	len += sprintf(page + len, "engine: %p\n", ppe);
	len += sprintf(page + len, "cpu: %u, numa node: %d\n",
		       ppe->cpu, cpu_to_node(ppe->cpu));
	len += sprintf(page + len, "hrt: %llu us\n",
		       ktime_us_delta(ppe->timel, ppe->timef));
	for (i = 0; i < NUM_TYPES; ++i) {
		qoff = len;
		do {
			/* rewind, so a retry does not duplicate output */
			len = qoff;
			sstart = u64_stats_fetch_begin(&ppe->inqs.ptrs[i]->stats.syncp);
			len += sprintf(page + len, "queue: %p\n",
				       ppe->inqs.ptrs[i]);
			len += sprintf(page + len, " type: %u\n",
				       ppe->inqs.ptrs[i]->type);
			len += sprintf(page + len, " packets: %llu\n",
				       ppe->inqs.ptrs[i]->stats.packets);
			len += sprintf(page + len, " bytes: %llu\n",
				       ppe->inqs.ptrs[i]->stats.bytes);
			len += sprintf(page + len, " errors: %u\n",
				       ppe->inqs.ptrs[i]->stats.errors);
			len += sprintf(page + len, " drops: %llu\n",
				       ppe->inqs.ptrs[i]->stats.dropped);
		} while (u64_stats_fetch_retry(&ppe->inqs.ptrs[i]->stats.syncp, sstart));
	}
	/* FIXME: fits in page? */
	*eof = 1;
	return len;
}
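
/* Links a queue into the squeue head list and registers it by type. */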
static inline void add_to_ppe_squeue(struct ppe_squeue *qs,
				     struct ppe_queue *q)
{
	q->next = qs->head;
	qs->head = q;
	qs->ptrs[q->type] = q;
}
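
/*
 * Closes the singly linked queue list into a ring by pointing the tail
 * back at the head, which is what next_filled_ppe_queue() iterates on.
 */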
static void finish_ppe_squeue(struct ppe_squeue *qs)
{
	struct ppe_queue *q = qs->head;

	while (q->next)
		q = q->next;
	q->next = qs->head;
}
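
/*
 * Allocates one queue per path type on the memory node of the engine's
 * CPU (the NUMA-affinity part) and closes them into a ring.
 */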
static int init_ppe_squeue(struct ppe_squeue *queues, unsigned int cpu)
{
	int i;
	struct ppe_queue *tmp;

	for (i = 0; i < NUM_TYPES; ++i) {
		tmp = kzalloc_node(sizeof(*tmp), GFP_KERNEL,
				   cpu_to_node(cpu));
		if (!tmp)
			return -ENOMEM;
		tmp->type = (enum path_type) i;
		tmp->next = NULL;
		skb_queue_head_init(&tmp->queue);
		add_to_ppe_squeue(queues, tmp);
	}

	finish_ppe_squeue(queues);
	return 0;
}
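
/* Frees the per-type queues again; counterpart to init_ppe_squeue(). */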
static void cleanup_ppe_squeue(struct ppe_squeue *queues)
{
	int i;

	for (i = 0; i < NUM_TYPES; ++i) {
		kfree(queues->ptrs[i]);
		queues->ptrs[i] = NULL;
	}
	queues->head = NULL;
}
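
/*
 * Brings up one engine per online CPU: percpu state, NUMA-affine
 * queues, a procfs stats entry and a CPU-bound SCHED_FIFO kthread.
 * On any failure the already initialized engines are torn down again.
 */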
int init_worker_engines(void)
{
	int ret = 0;
	unsigned int cpu;
	char name[64];
	struct sched_param param = { .sched_priority = MAX_RT_PRIO - 1 };

	engines = alloc_percpu(struct worker_engine);
	if (!engines)
		return -ENOMEM;

	for_each_online_cpu(cpu) {
		struct worker_engine *ppe;
		ppe = per_cpu_ptr(engines, cpu);
		ppe->cpu = cpu;
		ppe->inqs.head = NULL;
		memset(&ppe->inqs, 0, sizeof(ppe->inqs));
		ret = init_ppe_squeue(&ppe->inqs, ppe->cpu);
		if (ret < 0)
			break;
		memset(name, 0, sizeof(name));
		snprintf(name, sizeof(name), "ppe%u", cpu);
		ppe->proc = create_proc_read_entry(name, 0400, lana_proc_dir,
						   engine_procfs_stats, ppe);
		if (!ppe->proc) {
			ret = -ENOMEM;
			break;
		}

		init_waitqueue_head(&ppe->wait_queue);
		ppe->thread = kthread_create_on_node(engine_thread, NULL,
						     cpu_to_node(cpu), name);
		if (IS_ERR(ppe->thread)) {
			printk(KERN_ERR "[lana] Error creating thread on "
			       "CPU%u!\n", cpu);
			ret = PTR_ERR(ppe->thread);
			break;
		}

		kthread_bind(ppe->thread, cpu);
		sched_setscheduler(ppe->thread, SCHED_FIFO, &param);
		wake_up_process(ppe->thread);
	}

	if (ret < 0)
		cleanup_worker_engines();
	return ret;
}
EXPORT_SYMBOL_GPL(init_worker_engines);
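
/*
 * Stops the engine threads, removes the procfs entries and releases
 * the queues and percpu state; also used as the error path of a failed
 * init_worker_engines().
 */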
void cleanup_worker_engines(void)
{
	unsigned int cpu;
	char name[64];

	for_each_online_cpu(cpu) {
		struct worker_engine *ppe;
		memset(name, 0, sizeof(name));
		snprintf(name, sizeof(name), "ppe%u", cpu);
		ppe = per_cpu_ptr(engines, cpu);
		if (!IS_ERR_OR_NULL(ppe->thread))
			kthread_stop(ppe->thread);
		if (ppe->proc)
			remove_proc_entry(name, lana_proc_dir);
		cleanup_ppe_squeue(&ppe->inqs);
	}
	free_percpu(engines);
}
EXPORT_SYMBOL_GPL(cleanup_worker_engines);