/*
 * Lightweight Autonomic Network Architecture
 *
 * LANA packet processing engines. Incoming packets are scheduled onto one
 * of the CPU-affine engines and processed on the Functional Block stack.
 * There are two queues where packets can be added, one from the PHY
 * direction for incoming packets (ingress) and one from the socket handler
 * direction for outgoing packets (egress). Support for NUMA-affinity added.
 *
 * Copyright 2011 Daniel Borkmann <dborkma@tik.ee.ethz.ch>,
 * Swiss Federal Institute of Technology (ETH Zurich)
 * Subject to the GPL.
 */
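/*
 * Rough packet flow, as described above:
 *
 *   PHY direction    -> inqs[TYPE_INGRESS].queue --+
 *                                                  +-> engine_thread()
 *   socket direction -> inqs[TYPE_EGRESS].queue  --+        |
 *                                                           v
 *                                         process_packet() on the FB stack
 *
 * One worker_engine exists per online CPU (see init_worker_engines()), so a
 * packet is processed on the CPU whose queue it was enqueued on.
 */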
#include <linux/cpu.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/skbuff.h>
#include <linux/kthread.h>
#include <linux/proc_fs.h>
#include <linux/u64_stats_sync.h>
#include <linux/prefetch.h>
#include <linux/sched.h>
#include <linux/hrtimer.h>
#include <linux/jiffies.h>
#include <linux/kernel_stat.h>
#include <linux/interrupt.h>

#include "xt_engine.h"
#include "xt_skb.h"
#include "xt_fblock.h"
struct worker_engine __percpu *engines;
EXPORT_SYMBOL_GPL(engines);
extern struct proc_dir_entry *lana_proc_dir;

void cleanup_worker_engines(void);
static inline int ppe_queues_have_load(struct worker_engine *ppe)
{
	/* Checked in priority order; add further queue types here. */
	if (!skb_queue_empty(&ppe->inqs[TYPE_INGRESS].queue))
		return TYPE_INGRESS;
	if (!skb_queue_empty(&ppe->inqs[TYPE_EGRESS].queue))
		return TYPE_EGRESS;
	return -EAGAIN;
}
static inline int process_packet(struct sk_buff *skb, enum path_type dir)
{
	int ret = PPE_ERROR;
	idp_t cont;
	struct fblock *fb;

	prefetch(skb->cb);
	while ((cont = read_next_idp_from_skb(skb))) {
		fb = __search_fblock(cont);
		if (unlikely(!fb))
			return PPE_ERROR;
		/* Called in rcu_read_lock context */
		ret = fb->netfb_rx(fb, skb, &dir);
		put_fblock(fb);
		if (ret == PPE_DROPPED)
			/* fblock freed skb */
			return PPE_DROPPED;
		prefetch(skb->cb);
	}
	return ret;
}
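/*
 * For reference: fb->netfb_rx() above is the functional block's receive
 * hook, called with the block, the skb and a pointer to the traversal
 * direction. A minimal handler might look roughly like the sketch below
 * (illustrative only; the exact prototype lives in xt_fblock.h, the helper
 * foo_should_drop() is hypothetical, and PPE_SUCCESS is assumed to be the
 * "pass on" return code from xt_engine.h):
 *
 *	static int foo_netfb_rx(struct fblock *fb, struct sk_buff *skb,
 *				enum path_type *dir)
 *	{
 *		if (foo_should_drop(skb)) {
 *			kfree_skb(skb);
 *			return PPE_DROPPED;	// engine must not touch skb again
 *		}
 *		// otherwise let the engine read the next IDP from the skb
 *		return PPE_SUCCESS;
 *	}
 */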
static int engine_thread(void *arg)
{
	int ret, queue, need_lock = 0;
	bool time_last;
	unsigned int skb_len;
	struct sk_buff *skb;
	unsigned long cpu = smp_processor_id();
	struct worker_engine *ppe = per_cpu_ptr(engines, cpu);

	if (ppe->cpu != cpu)
		panic("[lana] Engine scheduled on wrong CPU!\n");
	printk(KERN_INFO "[lana] Packet Processing Engine running "
	       "on CPU%lu!\n", cpu);

	if (!rcu_read_lock_held())
		need_lock = 1;
	set_current_state(TASK_INTERRUPTIBLE);

	while (likely(!kthread_should_stop())) {
		preempt_disable();
		if ((queue = ppe_queues_have_load(ppe)) < 0) {
			preempt_enable_no_resched();
			schedule();
			preempt_disable();
			/* Re-evaluate after being woken up; a stale negative
			 * index must not be used for dequeueing below. */
			queue = ppe_queues_have_load(ppe);
		}
		__set_current_state(TASK_RUNNING);

		while (queue >= 0 &&
		       (skb = skb_dequeue(&ppe->inqs[queue].queue)) != NULL) {
			/* The FB stack may consume or free the skb, so read
			 * everything we need from it before processing. */
			if (unlikely(skb_is_time_marked_first(skb)))
				ppe->timef = ktime_get();
			time_last = skb_is_time_marked_last(skb);
			skb_len = skb->len;

			if (need_lock)
				rcu_read_lock();
			ret = process_packet(skb, ppe->inqs[queue].type);
			if (need_lock)
				rcu_read_unlock();
			if (unlikely(time_last))
				ppe->timel = ktime_get();
			ppe->pkts++;

			u64_stats_update_begin(&ppe->inqs[queue].stats.syncp);
			ppe->inqs[queue].stats.packets++;
			ppe->inqs[queue].stats.bytes += skb_len;
			if (ret == PPE_DROPPED)
				ppe->inqs[queue].stats.dropped++;
			else if (unlikely(ret == PPE_ERROR)) {
				ppe->inqs[queue].stats.errors++;
				kfree_skb(skb);
			}
			u64_stats_update_end(&ppe->inqs[queue].stats.syncp);

			preempt_enable_no_resched();
			cond_resched();
			preempt_disable();
		}

		preempt_enable();
		set_current_state(TASK_INTERRUPTIBLE);
	}

	__set_current_state(TASK_RUNNING);
	printk(KERN_INFO "[lana] Packet Processing Engine stopped "
	       "on CPU%u!\n", smp_processor_id());
	return 0;
}
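/*
 * The enqueue side lives outside this file. Based only on the structures
 * used above, a producer on the RX path would do something along these
 * lines (a sketch, not the actual LANA enqueue API):
 *
 *	struct worker_engine *ppe = per_cpu_ptr(engines, smp_processor_id());
 *
 *	skb_queue_tail(&ppe->inqs[TYPE_INGRESS].queue, skb);
 *	wake_up_process(ppe->thread);
 *
 * i.e. the packet is appended to the per-CPU ingress queue and the engine
 * thread is woken so that ppe_queues_have_load() sees the new work.
 */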
static int engine_procfs_stats(char *page, char **start, off_t offset,
			       int count, int *eof, void *data)
{
	int i;
	off_t len = 0, qstart;
	struct worker_engine *ppe = data;
	unsigned int sstart;

	len += sprintf(page + len, "engine: %p\n", ppe);
	len += sprintf(page + len, "cpu: %u, numa node: %d\n",
		       ppe->cpu, cpu_to_node(ppe->cpu));
	len += sprintf(page + len, "hrt: %llu us\n",
		       ktime_us_delta(ppe->timel, ppe->timef));
	for (i = 0; i < NUM_TYPES; ++i) {
		qstart = len;
		do {
			/* Restart this queue's output if the stats were
			 * updated while we read them. */
			len = qstart;
			sstart = u64_stats_fetch_begin(&ppe->inqs[i].stats.syncp);
			len += sprintf(page + len, "queue: %p\n", &ppe->inqs[i]);
			len += sprintf(page + len, " type: %u\n",
				       ppe->inqs[i].type);
			len += sprintf(page + len, " packets: %llu\n",
				       ppe->inqs[i].stats.packets);
			len += sprintf(page + len, " bytes: %llu\n",
				       ppe->inqs[i].stats.bytes);
			len += sprintf(page + len, " errors: %u\n",
				       ppe->inqs[i].stats.errors);
			len += sprintf(page + len, " drops: %llu\n",
				       ppe->inqs[i].stats.dropped);
		} while (u64_stats_fetch_retry(&ppe->inqs[i].stats.syncp, sstart));
	}
	/* FIXME: fits in page? */
	*eof = 1;
	return len;
}
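/*
 * Example: reading the per-engine proc file ("ppe<cpu>" under lana_proc_dir,
 * whose parent directory is created elsewhere) yields output of roughly this
 * shape (values purely illustrative):
 *
 *	engine: ffff88003646e0c0
 *	cpu: 0, numa node: 0
 *	hrt: 42 us
 *	queue: ffff88003646e100
 *	 type: 0
 *	 packets: 1024
 *	 bytes: 131072
 *	 errors: 0
 *	 drops: 0
 *	...
 */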
static enum hrtimer_restart engine_timer_handler(struct hrtimer *self)
{
	struct tasklet_hrtimer *thr = container_of(self, struct tasklet_hrtimer, timer);
	struct worker_engine *ppe = container_of(thr, struct worker_engine, htimer);

	if (ppe->thread->state != TASK_RUNNING)
		wake_up_process(ppe->thread);
	ppe->ppe_timer_set = 0;
	ppe->pkts = 0;
	return HRTIMER_NORESTART;
}
int init_worker_engines(void)
{
	int i, ret = 0;
	unsigned int cpu;
	char name[64];

	engines = alloc_percpu(struct worker_engine);
	if (!engines)
		return -ENOMEM;

	get_online_cpus();
	for_each_online_cpu(cpu) {
		struct worker_engine *ppe;
#ifdef __MIGRATE
		if (cpu == USERSPACECPU)
			continue;
#endif /* __MIGRATE */
		ppe = per_cpu_ptr(engines, cpu);
		ppe->cpu = cpu;
		memset(&ppe->inqs, 0, sizeof(ppe->inqs));
		for (i = 0; i < NUM_QUEUES; ++i)
			skb_queue_head_init(&ppe->inqs[i].queue);
		memset(name, 0, sizeof(name));
		snprintf(name, sizeof(name), "ppe%u", cpu);
		ppe->proc = create_proc_read_entry(name, 0400, lana_proc_dir,
						   engine_procfs_stats, ppe);
		if (!ppe->proc) {
			ret = -ENOMEM;
			break;
		}
		ppe->pkts = 0;
		ppe->ppe_timer_set = 0;
		ppe->thread = kthread_create_on_node(engine_thread, NULL,
						     cpu_to_node(cpu), name);
		if (IS_ERR(ppe->thread)) {
			printk(KERN_ERR "[lana] Error creating thread on "
			       "CPU%u!\n", cpu);
			ret = -EIO;
			break;
		}
		kthread_bind(ppe->thread, cpu);
		wake_up_process(ppe->thread);
		tasklet_hrtimer_init(&ppe->htimer, engine_timer_handler,
				     CLOCK_REALTIME, HRTIMER_MODE_ABS);
	}
	put_online_cpus();

	if (ret < 0)
		cleanup_worker_engines();
	return ret;
}
EXPORT_SYMBOL_GPL(init_worker_engines);
void cleanup_worker_engines(void)
{
	unsigned int cpu;
	char name[64];

	get_online_cpus();
	for_each_online_cpu(cpu) {
		struct worker_engine *ppe;
#ifdef __MIGRATE
		if (cpu == USERSPACECPU)
			continue;
#endif /* __MIGRATE */
		memset(name, 0, sizeof(name));
		snprintf(name, sizeof(name), "ppe%u", cpu);
		ppe = per_cpu_ptr(engines, cpu);
		if (!IS_ERR(ppe->thread)) {
			tasklet_hrtimer_cancel(&ppe->htimer);
			kthread_stop(ppe->thread);
		}
		if (ppe->proc)
			remove_proc_entry(name, lana_proc_dir);
	}
	put_online_cpus();
	free_percpu(engines);
}
EXPORT_SYMBOL_GPL(cleanup_worker_engines);