/*
 * Lightweight Autonomic Network Architecture
 *
 * LANA packet processing engines. Incoming packets are scheduled onto one
 * of the CPU-affine engines and processed on the Functional Block stack.
 * There are two queues where packets can be added, one from PHY direction
 * for incoming packets (ingress) and one from the socket handler direction
 * for outgoing packets (egress). Support for NUMA-affinity added. An
 * illustrative producer-side enqueue sketch follows the load helpers below.
 *
 * Copyright 2011 Daniel Borkmann <dborkma@tik.ee.ethz.ch>,
 * Swiss federal institute of technology (ETH Zurich)
 * Subject to the GPL.
 */
#include <linux/cpu.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/skbuff.h>
#include <linux/wait.h>
#include <linux/kthread.h>
#include <linux/proc_fs.h>
#include <linux/u64_stats_sync.h>
#include <linux/prefetch.h>
#include <linux/sched.h>

#include "xt_engine.h"
#include "xt_skb.h"
#include "xt_fblock.h"

struct worker_engine __percpu *engines;
EXPORT_SYMBOL_GPL(engines);
extern struct proc_dir_entry *lana_proc_dir;

void cleanup_worker_engines(void);
static inline struct ppe_queue *first_ppe_queue(struct worker_engine *ppe)
{
        return ppe->inqs.head;
}

/* The per-engine queues form a ring; spin until a non-empty one is found. */
static inline struct ppe_queue *next_filled_ppe_queue(struct ppe_queue *ppeq)
{
        do {
                ppeq = ppeq->next;
                prefetch(ppeq->next);
        } while (skb_queue_empty(&ppeq->queue));

        return ppeq;
}

static inline int ppe_queues_have_load(struct worker_engine *ppe)
{
        return atomic64_read(&ppe->load) != 0;
}

static inline void ppe_queues_reduce_load(struct worker_engine *ppe)
{
        atomic64_dec(&ppe->load);
}
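
/*
 * Illustrative sketch only, based on the consumer loop in engine_thread()
 * below: a producer (PHY ingress or the socket egress handler) would
 * presumably hand an skb to its CPU-local engine roughly like this. The
 * helper name enqueue_on_engine_sketch() is hypothetical; the real enqueue
 * path lives outside this file. Assumes the caller already runs on the
 * target CPU with preemption disabled.
 */
static inline void enqueue_on_engine_sketch(struct sk_buff *skb,
                                            enum path_type dir)
{
        struct worker_engine *ppe = per_cpu_ptr(engines, smp_processor_id());

        /* Append to the per-type (ingress/egress) queue of this engine. */
        skb_queue_tail(&ppe->inqs.ptrs[dir]->queue, skb);
        /* Mirrors ppe_queues_reduce_load() on the consumer side. */
        atomic64_inc(&ppe->load);
        /* Wake the engine thread sleeping in wait_event_interruptible(). */
        wake_up_interruptible(&ppe->wait_queue);
}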
/* Walk the IDP chain stored in the skb, look up each functional block and
 * invoke its receive handler; stop on error or when the packet is dropped. */
static int process_packet(struct sk_buff *skb, enum path_type dir)
{
        int ret = PPE_DROPPED;
        idp_t cont;
        struct fblock *fb;

        while ((cont = read_next_idp_from_skb(skb))) {
                fb = search_fblock(cont);
                if (unlikely(!fb)) {
                        ret = PPE_ERROR;
                        break;
                }
                ret = fb->ops->netfb_rx(fb, skb, &dir);
                put_fblock(fb);
                if (ret == PPE_DROPPED)
                        break;
        }

        return ret;
}
static int engine_thread(void *arg)
{
        int ret;
        struct sk_buff *skb;
        struct ppe_queue *ppeq;
        struct worker_engine *ppe = per_cpu_ptr(engines,
                                                smp_processor_id());

        if (ppe->cpu != smp_processor_id())
                panic("[lana] Engine scheduled on wrong CPU!\n");
        printk(KERN_INFO "[lana] Packet Processing Engine running "
               "on CPU%u!\n", smp_processor_id());

        ppeq = first_ppe_queue(ppe);
        while (1) {
                wait_event_interruptible(ppe->wait_queue,
                                         (kthread_should_stop() ||
                                          ppe_queues_have_load(ppe)));
                if (unlikely(kthread_should_stop()))
                        break;

                ppeq = next_filled_ppe_queue(ppeq);
                ppe_queues_reduce_load(ppe);
                skb = skb_dequeue(&ppeq->queue);
                ret = process_packet(skb, ppeq->type);

                u64_stats_update_begin(&ppeq->stats.syncp);
                ppeq->stats.packets++;
                ppeq->stats.bytes += skb->len;
                u64_stats_update_end(&ppeq->stats.syncp);
                if (unlikely(ret == PPE_DROPPED)) {
                        u64_stats_update_begin(&ppeq->stats.syncp);
                        ppeq->stats.dropped++;
                        u64_stats_update_end(&ppeq->stats.syncp);
                } else if (unlikely(ret == PPE_ERROR)) {
                        ppeq->stats.errors++;
                }

                kfree_skb(skb);
        }

        printk(KERN_INFO "[lana] Packet Processing Engine stopped "
               "on CPU%u!\n", smp_processor_id());
        return 0;
}
static int engine_procfs_stats(char *page, char **start, off_t offset,
                               int count, int *eof, void *data)
{
        int i;
        off_t len = 0;
        struct worker_engine *ppe = data;
        unsigned int sstart;

        len += sprintf(page + len, "engine: %p\n", ppe);
        len += sprintf(page + len, "cpu: %u, numa node: %d\n",
                       ppe->cpu, cpu_to_node(ppe->cpu));
        len += sprintf(page + len, "load: %lld\n",
                       atomic64_read(&ppe->load));
        for (i = 0; i < NUM_TYPES; ++i) {
                do {
                        sstart = u64_stats_fetch_begin(&ppe->inqs.ptrs[i]->stats.syncp);
                        len += sprintf(page + len, "queue: %p\n",
                                       ppe->inqs.ptrs[i]);
                        len += sprintf(page + len, " type: %u\n",
                                       ppe->inqs.ptrs[i]->type);
                        len += sprintf(page + len, " packets: %llu\n",
                                       ppe->inqs.ptrs[i]->stats.packets);
                        len += sprintf(page + len, " bytes: %llu\n",
                                       ppe->inqs.ptrs[i]->stats.bytes);
                        len += sprintf(page + len, " errors: %u\n",
                                       ppe->inqs.ptrs[i]->stats.errors);
                        len += sprintf(page + len, " drops: %llu\n",
                                       ppe->inqs.ptrs[i]->stats.dropped);
                } while (u64_stats_fetch_retry(&ppe->inqs.ptrs[i]->stats.syncp, sstart));
        }

        /* FIXME: fits in page? */
        *eof = 1;
        return len;
}
static inline void add_to_ppe_squeue(struct ppe_squeue *qs,
                                     struct ppe_queue *q)
{
        q->next = qs->head;
        qs->head = q;
        qs->ptrs[q->type] = q;
}

/* Close the singly-linked queue list into a ring for next_filled_ppe_queue(). */
static void finish_ppe_squeue(struct ppe_squeue *qs)
{
        struct ppe_queue *q = qs->head;
        while (q->next)
                q = q->next;
        q->next = qs->head;
}
static int init_ppe_squeue(struct ppe_squeue *queues, unsigned int cpu)
{
        int i;
        struct ppe_queue *tmp;

        for (i = 0; i < NUM_TYPES; ++i) {
                tmp = kzalloc_node(sizeof(*tmp), GFP_KERNEL,
                                   cpu_to_node(cpu));
                if (!tmp)
                        return -ENOMEM;
                tmp->type = (enum path_type) i;
                tmp->next = NULL;
                skb_queue_head_init(&tmp->queue);
                add_to_ppe_squeue(queues, tmp);
        }

        finish_ppe_squeue(queues);
        return 0;
}

static void cleanup_ppe_squeue(struct ppe_squeue *queues)
{
        int i;

        for (i = 0; i < NUM_TYPES; ++i) {
                if (queues->ptrs[i])
                        kfree(queues->ptrs[i]);
                queues->ptrs[i] = NULL;
        }
        queues->head = NULL;
}
int init_worker_engines(void)
{
        int ret = 0;
        unsigned int cpu;
        char name[64];
        struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };

        engines = alloc_percpu(struct worker_engine);
        if (!engines)
                return -ENOMEM;

        get_online_cpus();
        for_each_online_cpu(cpu) {
                struct worker_engine *ppe;
                ppe = per_cpu_ptr(engines, cpu);
                ppe->cpu = cpu;
                ppe->inqs.head = NULL;
                memset(&ppe->inqs, 0, sizeof(ppe->inqs));
                ret = init_ppe_squeue(&ppe->inqs, ppe->cpu);
                if (ret < 0)
                        break;
                atomic64_set(&ppe->load, 0);
                memset(name, 0, sizeof(name));
                snprintf(name, sizeof(name), "ppe%u", cpu);
                ppe->proc = create_proc_read_entry(name, 0400, lana_proc_dir,
                                                   engine_procfs_stats, ppe);
                if (!ppe->proc) {
                        ret = -ENOMEM;
                        break;
                }

                init_waitqueue_head(&ppe->wait_queue);
                ppe->thread = kthread_create_on_node(engine_thread, NULL,
                                                     cpu_to_node(cpu), name);
                if (IS_ERR(ppe->thread)) {
                        printk(KERN_ERR "[lana] Error creating thread on "
                               "CPU%u!\n", cpu);
                        ret = -EIO;
                        break;
                }

                kthread_bind(ppe->thread, cpu);
                sched_setscheduler(ppe->thread, SCHED_FIFO, &param);
                wake_up_process(ppe->thread);
        }
        put_online_cpus();

        if (ret < 0)
                cleanup_worker_engines();
        return ret;
}
EXPORT_SYMBOL_GPL(init_worker_engines);
void cleanup_worker_engines(void)
{
        unsigned int cpu;
        char name[64];

        get_online_cpus();
        for_each_online_cpu(cpu) {
                struct worker_engine *ppe;
                memset(name, 0, sizeof(name));
                snprintf(name, sizeof(name), "ppe%u", cpu);
                ppe = per_cpu_ptr(engines, cpu);
                /* Thread may still be NULL if init_worker_engines() bailed out early. */
                if (!IS_ERR_OR_NULL(ppe->thread))
                        kthread_stop(ppe->thread);
                if (ppe->proc)
                        remove_proc_entry(name, lana_proc_dir);
                cleanup_ppe_squeue(&ppe->inqs);
        }
        put_online_cpus();
        free_percpu(engines);
}
EXPORT_SYMBOL_GPL(cleanup_worker_engines);