/*
 * Lightweight Autonomic Network Architecture
 *
 * LANA packet processing engines. Incoming packets are scheduled onto one
 * of the CPU-affine engines and processed on the Functional Block stack.
 * There are two queues where packets can be added, one from PHY direction
 * for incoming packets (ingress) and one from the socket handler direction
 * for outgoing packets (egress). Support for NUMA-affinity added.
 *
 * Copyright 2011 Daniel Borkmann <dborkma@tik.ee.ethz.ch>,
 * Swiss federal institute of technology (ETH Zurich)
 * Subject to the GPL.
 */
#include <linux/cpu.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/skbuff.h>
#include <linux/wait.h>
#include <linux/kthread.h>
#include <linux/proc_fs.h>
#include <linux/u64_stats_sync.h>
#include <linux/prefetch.h>
#include <linux/sched.h>
#include <linux/hrtimer.h>

#include "xt_engine.h"
#include "xt_skb.h"
#include "xt_fblock.h"
struct worker_engine __percpu *engines;
EXPORT_SYMBOL_GPL(engines);

extern struct proc_dir_entry *lana_proc_dir;

void cleanup_worker_engines(void);
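/*
 * Illustrative sketch only (not part of the original sources): one way a
 * producer, e.g. the PHY rx hook or the socket handler, could hand an skb
 * to the engine of the local CPU. The helper name enqueue_on_local_engine()
 * is hypothetical; it only relies on the per-type queues (inqs.ptrs[]) and
 * the wait queue that engine_thread() below sleeps on.
 */
static inline void enqueue_on_local_engine(struct sk_buff *skb,
					   enum path_type dir)
{
	/* get_cpu() disables preemption, so the CPU cannot change under us */
	struct worker_engine *ppe = per_cpu_ptr(engines, get_cpu());

	skb_queue_tail(&ppe->inqs.ptrs[dir]->queue, skb);
	wake_up_interruptible(&ppe->wait_queue);
	put_cpu();
}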
static inline struct ppe_queue *first_ppe_queue(struct worker_engine *ppe)
{
	return ppe->inqs.head;
}
/* Round-robin over the circular list of per-type queues until a non-empty
 * one is found; callers must ensure that at least one queue has load. */
static inline struct ppe_queue *next_filled_ppe_queue(struct ppe_queue *ppeq)
{
	ppeq = ppeq->next;
testq:
	if (!skb_queue_empty(&ppeq->queue))
		return ppeq;
	if (!skb_queue_empty(&ppeq->next->queue))
		return ppeq->next;
	ppeq = ppeq->next->next;
	goto testq;
}
static inline int ppe_queues_have_load(struct worker_engine *ppe)
{
	/* add new stuff here */
	if (!skb_queue_empty(&ppe->inqs.ptrs[TYPE_INGRESS]->queue))
		return 1;
	if (!skb_queue_empty(&ppe->inqs.ptrs[TYPE_EGRESS]->queue))
		return 1;
	return 0;
}
/* Walk the skb along its functional block chain: read the next IDP from the
 * skb's control block, look up the corresponding fblock and hand the skb to
 * its receive handler, until the chain ends or the packet is dropped. */
static inline int process_packet(struct sk_buff *skb, enum path_type dir)
{
	int ret = PPE_DROPPED;
	idp_t cont;
	struct fblock *fb;

	prefetch(skb->cb);
	while ((cont = read_next_idp_from_skb(skb))) {
		fb = __search_fblock(cont);
		if (unlikely(!fb))
			return PPE_ERROR;
		/* Called in rcu_read_lock context */
		ret = fb->ops->netfb_rx(fb, skb, &dir);
		put_fblock(fb);
		if (ret == PPE_DROPPED)
			return PPE_DROPPED;
		prefetch(skb->cb);
	}
	return ret;
}
static int engine_thread(void *arg)
{
	int ret, need_lock = 0, marked_last;
	unsigned int skb_len;
	struct sk_buff *skb;
	struct ppe_queue *ppeq;
	struct worker_engine *ppe = per_cpu_ptr(engines,
						smp_processor_id());

	if (ppe->cpu != smp_processor_id())
		panic("[lana] Engine scheduled on wrong CPU!\n");
	printk(KERN_INFO "[lana] Packet Processing Engine running "
	       "on CPU%u!\n", smp_processor_id());

	if (!rcu_read_lock_held())
		need_lock = 1;
	ppeq = first_ppe_queue(ppe);

	while (likely(!kthread_should_stop())) {
		if (!ppe_queues_have_load(ppe)) {
			wait_event_interruptible_timeout(ppe->wait_queue,
						(kthread_should_stop() ||
						 ppe_queues_have_load(ppe)), 1);
			continue;
		}

		ppeq = next_filled_ppe_queue(ppeq);
		skb = skb_dequeue(&ppeq->queue);

		/* The fblock chain may free the skb during processing, so
		 * sample everything needed for stats and timing up front. */
		skb_len = skb->len;
		marked_last = skb_is_time_marked_last(skb);
		if (unlikely(skb_is_time_marked_first(skb)))
			ppe->timef = ktime_get();

		if (need_lock)
			rcu_read_lock();
		ret = process_packet(skb, ppeq->type);
		if (need_lock)
			rcu_read_unlock();
		if (unlikely(marked_last))
			ppe->timel = ktime_get();

		u64_stats_update_begin(&ppeq->stats.syncp);
		ppeq->stats.packets++;
		ppeq->stats.bytes += skb_len;
		if (ret == PPE_DROPPED)
			ppeq->stats.dropped++;
		else if (unlikely(ret == PPE_ERROR))
			ppeq->stats.errors++;
		u64_stats_update_end(&ppeq->stats.syncp);
	}

	printk(KERN_INFO "[lana] Packet Processing Engine stopped "
	       "on CPU%u!\n", smp_processor_id());
	return 0;
}
static int engine_procfs_stats(char *page, char **start, off_t offset,
			       int count, int *eof, void *data)
{
	int i;
	off_t len = 0, qlen;
	struct worker_engine *ppe = data;
	unsigned int sstart;

	len += sprintf(page + len, "engine: %p\n", ppe);
	len += sprintf(page + len, "cpu: %u, numa node: %d\n",
		       ppe->cpu, cpu_to_node(ppe->cpu));
	len += sprintf(page + len, "hrt: %llu us\n",
		       ktime_us_delta(ppe->timel, ppe->timef));
	for (i = 0; i < NUM_TYPES; ++i) {
		qlen = len;
		do {
			/* Restart this queue's output if the stats were
			 * updated while we were reading them. */
			len = qlen;
			sstart = u64_stats_fetch_begin(&ppe->inqs.ptrs[i]->stats.syncp);
			len += sprintf(page + len, "queue: %p\n",
				       ppe->inqs.ptrs[i]);
			len += sprintf(page + len, " type: %u\n",
				       ppe->inqs.ptrs[i]->type);
			len += sprintf(page + len, " packets: %llu\n",
				       ppe->inqs.ptrs[i]->stats.packets);
			len += sprintf(page + len, " bytes: %llu\n",
				       ppe->inqs.ptrs[i]->stats.bytes);
			len += sprintf(page + len, " errors: %u\n",
				       ppe->inqs.ptrs[i]->stats.errors);
			len += sprintf(page + len, " drops: %llu\n",
				       ppe->inqs.ptrs[i]->stats.dropped);
		} while (u64_stats_fetch_retry(&ppe->inqs.ptrs[i]->stats.syncp, sstart));
	}

	/* FIXME: fits in page? */
	*eof = 1;
	return len;
}
static inline void add_to_ppe_squeue(struct ppe_squeue *qs,
				     struct ppe_queue *q)
{
	q->next = qs->head;
	qs->head = q;
	qs->ptrs[q->type] = q;
}
/* Close the singly-linked queue list into a ring, so that
 * next_filled_ppe_queue() can cycle over it indefinitely. */
static void finish_ppe_squeue(struct ppe_squeue *qs)
{
	struct ppe_queue *q = qs->head;

	while (q->next)
		q = q->next;
	q->next = qs->head;
}
static int init_ppe_squeue(struct ppe_squeue *queues, unsigned int cpu)
{
	int i;
	struct ppe_queue *tmp;

	for (i = 0; i < NUM_TYPES; ++i) {
		tmp = kzalloc_node(sizeof(*tmp), GFP_KERNEL,
				   cpu_to_node(cpu));
		if (!tmp)
			return -ENOMEM;
		tmp->type = (enum path_type) i;
		tmp->next = NULL;
		skb_queue_head_init(&tmp->queue);
		add_to_ppe_squeue(queues, tmp);
	}

	finish_ppe_squeue(queues);
	return 0;
}
static void cleanup_ppe_squeue(struct ppe_squeue *queues)
{
	int i;

	for (i = 0; i < NUM_TYPES; ++i) {
		if (queues->ptrs[i])
			kfree(queues->ptrs[i]);
		queues->ptrs[i] = NULL;
	}
	queues->head = NULL;
}
int init_worker_engines(void)
{
	int ret = 0;
	unsigned int cpu;
	char name[64];
	struct sched_param param = { .sched_priority = MAX_RT_PRIO - 1 };

	engines = alloc_percpu(struct worker_engine);
	if (!engines)
		return -ENOMEM;

	get_online_cpus();
	for_each_online_cpu(cpu) {
		struct worker_engine *ppe;

		ppe = per_cpu_ptr(engines, cpu);
		ppe->cpu = cpu;
		memset(&ppe->inqs, 0, sizeof(ppe->inqs));
		ret = init_ppe_squeue(&ppe->inqs, ppe->cpu);
		if (ret < 0)
			break;
		memset(name, 0, sizeof(name));
		snprintf(name, sizeof(name), "ppe%u", cpu);
		ppe->proc = create_proc_read_entry(name, 0400, lana_proc_dir,
						   engine_procfs_stats, ppe);
		if (!ppe->proc) {
			ret = -ENOMEM;
			break;
		}

		init_waitqueue_head(&ppe->wait_queue);
		ppe->thread = kthread_create_on_node(engine_thread, NULL,
						     cpu_to_node(cpu), name);
		if (IS_ERR(ppe->thread)) {
			printk(KERN_ERR "[lana] Error creating thread on "
			       "CPU%u!\n", cpu);
			ret = -EIO;
			break;
		}

		kthread_bind(ppe->thread, cpu);
		sched_setscheduler(ppe->thread, SCHED_FIFO, &param);
		wake_up_process(ppe->thread);
	}
	put_online_cpus();

	if (ret < 0)
		cleanup_worker_engines();
	return ret;
}
EXPORT_SYMBOL_GPL(init_worker_engines);
void cleanup_worker_engines(void)
{
	unsigned int cpu;
	char name[64];

	get_online_cpus();
	for_each_online_cpu(cpu) {
		struct worker_engine *ppe;

		memset(name, 0, sizeof(name));
		snprintf(name, sizeof(name), "ppe%u", cpu);
		ppe = per_cpu_ptr(engines, cpu);
		/* On a partial init, threads of some CPUs were never
		 * created; their pointers are still NULL. */
		if (!IS_ERR_OR_NULL(ppe->thread))
			kthread_stop(ppe->thread);
		if (ppe->proc)
			remove_proc_entry(name, lana_proc_dir);
		cleanup_ppe_squeue(&ppe->inqs);
	}
	put_online_cpus();
	free_percpu(engines);
}
EXPORT_SYMBOL_GPL(cleanup_worker_engines);