/*
 * Lightweight Autonomic Network Architecture
 *
 * LANA packet processing engines. Incoming packets are scheduled onto one
 * of the CPU-affine engines and processed on the Functional Block stack.
 * There are two queues where packets can be added, one from PHY direction
 * for incoming packets (ingress) and one from the socket handler direction
 * for outgoing packets (egress). Support for NUMA-affinity added.
 *
 * Copyright 2011 Daniel Borkmann <dborkma@tik.ee.ethz.ch>,
 * Swiss federal institute of technology (ETH Zurich)
 */
#include <linux/module.h>
#include <linux/cpu.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/skbuff.h>
#include <linux/wait.h>
#include <linux/kthread.h>
#include <linux/proc_fs.h>
#include <linux/u64_stats_sync.h>
#include <linux/prefetch.h>
#include <linux/sched.h>

#include "xt_engine.h"
#include "xt_fblock.h"
30 struct worker_engine __percpu
*engines
;
31 EXPORT_SYMBOL_GPL(engines
);
32 extern struct proc_dir_entry
*lana_proc_dir
;
34 void cleanup_worker_engines(void);
36 static inline struct ppe_queue
*first_ppe_queue(struct worker_engine
*ppe
)
38 return ppe
->inqs
.head
;
41 static inline struct ppe_queue
*next_filled_ppe_queue(struct ppe_queue
*ppeq
)
46 } while (skb_queue_empty(&ppeq
->queue
));
51 static inline int ppe_queues_have_load(struct worker_engine
*ppe
)
53 return atomic64_read(&ppe
->load
) != 0;
56 static inline void ppe_queues_reduce_load(struct worker_engine
*ppe
)
58 atomic64_dec(&ppe
->load
);
61 static int process_packet(struct sk_buff
*skb
, enum path_type dir
)
63 int ret
= PPE_DROPPED
;
67 while ((cont
= read_next_idp_from_skb(skb
))) {
68 fb
= search_fblock(cont
);
73 ret
= fb
->ops
->netfb_rx(fb
, skb
, &dir
);
75 if (ret
== PPE_DROPPED
)
82 static int engine_thread(void *arg
)
86 struct ppe_queue
*ppeq
;
87 struct worker_engine
*ppe
= per_cpu_ptr(engines
,
90 if (ppe
->cpu
!= smp_processor_id())
91 panic("[lana] Engine scheduled on wrong CPU!\n");
92 printk(KERN_INFO
"[lana] Packet Processing Engine running "
93 "on CPU%u!\n", smp_processor_id());
95 ppeq
= first_ppe_queue(ppe
);
97 wait_event_interruptible(ppe
->wait_queue
,
98 (kthread_should_stop() ||
99 ppe_queues_have_load(ppe
)));
100 if (unlikely(kthread_should_stop()))
103 ppeq
= next_filled_ppe_queue(ppeq
);
104 ppe_queues_reduce_load(ppe
);
105 skb
= skb_dequeue(&ppeq
->queue
);
106 ret
= process_packet(skb
, ppeq
->type
);
108 u64_stats_update_begin(&ppeq
->stats
.syncp
);
109 ppeq
->stats
.packets
++;
110 ppeq
->stats
.bytes
+= skb
->len
;
111 u64_stats_update_end(&ppeq
->stats
.syncp
);
112 if (unlikely(ret
== PPE_DROPPED
)) {
113 u64_stats_update_begin(&ppeq
->stats
.syncp
);
114 ppeq
->stats
.dropped
++;
115 u64_stats_update_end(&ppeq
->stats
.syncp
);
116 } else if (unlikely(ret
== PPE_ERROR
)) {
117 ppeq
->stats
.errors
++;
123 printk(KERN_INFO
"[lana] Packet Processing Engine stopped "
124 "on CPU%u!\n", smp_processor_id());
128 static int engine_procfs_stats(char *page
, char **start
, off_t offset
,
129 int count
, int *eof
, void *data
)
133 struct worker_engine
*ppe
= data
;
136 len
+= sprintf(page
+ len
, "engine: %p\n", ppe
);
137 len
+= sprintf(page
+ len
, "cpu: %u, numa node: %d\n",
138 ppe
->cpu
, cpu_to_node(ppe
->cpu
));
139 len
+= sprintf(page
+ len
, "load: %lld\n",
140 atomic64_read(&ppe
->load
));
141 for (i
= 0; i
< NUM_TYPES
; ++i
) {
143 sstart
= u64_stats_fetch_begin(&ppe
->inqs
.ptrs
[i
]->stats
.syncp
);
144 len
+= sprintf(page
+ len
, "queue: %p\n",
146 len
+= sprintf(page
+ len
, " type: %u\n",
147 ppe
->inqs
.ptrs
[i
]->type
);
148 len
+= sprintf(page
+ len
, " packets: %llu\n",
149 ppe
->inqs
.ptrs
[i
]->stats
.packets
);
150 len
+= sprintf(page
+ len
, " bytes: %llu\n",
151 ppe
->inqs
.ptrs
[i
]->stats
.bytes
);
152 len
+= sprintf(page
+ len
, " errors: %u\n",
153 ppe
->inqs
.ptrs
[i
]->stats
.errors
);
154 len
+= sprintf(page
+ len
, " drops: %llu\n",
155 ppe
->inqs
.ptrs
[i
]->stats
.dropped
);
156 } while (u64_stats_fetch_retry(&ppe
->inqs
.ptrs
[i
]->stats
.syncp
, sstart
));
158 /* FIXME: fits in page? */
163 static inline void add_to_ppe_squeue(struct ppe_squeue
*qs
,
168 qs
->ptrs
[q
->type
] = q
;
171 static void finish_ppe_squeue(struct ppe_squeue
*qs
)
173 struct ppe_queue
*q
= qs
->head
;
179 static int init_ppe_squeue(struct ppe_squeue
*queues
, unsigned int cpu
)
182 struct ppe_queue
*tmp
;
184 for (i
= 0; i
< NUM_TYPES
; ++i
) {
185 tmp
= kzalloc_node(sizeof(*tmp
), GFP_KERNEL
,
189 tmp
->type
= (enum path_type
) i
;
191 skb_queue_head_init(&tmp
->queue
);
192 add_to_ppe_squeue(queues
, tmp
);
195 finish_ppe_squeue(queues
);
199 static void cleanup_ppe_squeue(struct ppe_squeue
*queues
)
203 for (i
= 0; i
< NUM_TYPES
; ++i
) {
205 kfree(queues
->ptrs
[i
]);
206 queues
->ptrs
[i
] = NULL
;
211 int init_worker_engines(void)
216 struct sched_param param
= { .sched_priority
= MAX_RT_PRIO
-1 };
218 engines
= alloc_percpu(struct worker_engine
);
223 for_each_online_cpu(cpu
) {
224 struct worker_engine
*ppe
;
225 ppe
= per_cpu_ptr(engines
, cpu
);
227 ppe
->inqs
.head
= NULL
;
228 memset(&ppe
->inqs
, 0, sizeof(ppe
->inqs
));
229 ret
= init_ppe_squeue(&ppe
->inqs
, ppe
->cpu
);
232 atomic64_set(&ppe
->load
, 0);
233 memset(name
, 0, sizeof(name
));
234 snprintf(name
, sizeof(name
), "ppe%u", cpu
);
235 ppe
->proc
= create_proc_read_entry(name
, 0400, lana_proc_dir
,
236 engine_procfs_stats
, ppe
);
242 init_waitqueue_head(&ppe
->wait_queue
);
243 ppe
->thread
= kthread_create_on_node(engine_thread
, NULL
,
244 cpu_to_node(cpu
), name
);
245 if (IS_ERR(ppe
->thread
)) {
246 printk(KERN_ERR
"[lana] Error creationg thread on "
252 kthread_bind(ppe
->thread
, cpu
);
253 sched_setscheduler(ppe
->thread
, SCHED_FIFO
, ¶m
);
254 wake_up_process(ppe
->thread
);
259 cleanup_worker_engines();
262 EXPORT_SYMBOL_GPL(init_worker_engines
);
264 void cleanup_worker_engines(void)
270 for_each_online_cpu(cpu
) {
271 struct worker_engine
*ppe
;
272 memset(name
, 0, sizeof(name
));
273 snprintf(name
, sizeof(name
), "ppe%u", cpu
);
274 ppe
= per_cpu_ptr(engines
, cpu
);
275 if (!IS_ERR(ppe
->thread
))
276 kthread_stop(ppe
->thread
);
278 remove_proc_entry(name
, lana_proc_dir
);
279 cleanup_ppe_squeue(&ppe
->inqs
);
282 free_percpu(engines
);
284 EXPORT_SYMBOL_GPL(cleanup_worker_engines
);