net/cor/rcv.c
/*
 * Connection oriented routing
 * Copyright (C) 2007-2008 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/module.h>
#include <linux/version.h>
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/in.h>

#include "cor.h"

atomic_t packets_in_workqueue = ATOMIC_INIT(0);

atomic_t ooo_packets = ATOMIC_INIT(0);

static struct workqueue_struct *packet_wq;
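
/*
 * Deliver packets that have been waiting in the reorder queue. The queue
 * head is handed to receive_skb() as long as its sequence number matches
 * next_seqno; the first gap, or a packet that receive_skb() rejects,
 * stops the drain.
 */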
void drain_ooo_queue(struct conn *rconn)
{
        struct sk_buff *skb;

        BUG_ON(SOURCE_IN != rconn->sourcetype);

        skb = rconn->source.in.reorder_queue.next;

        while ((void *) skb != (void *) &(rconn->source.in.reorder_queue)) {
                struct skb_procstate *ps = skb_pstate(skb);
                int drop;

                if (rconn->source.in.next_seqno != ps->funcstate.rcv2.seqno)
                        break;

                drop = receive_skb(rconn, skb);
                if (drop)
                        break;

                skb_unlink(skb, &(rconn->source.in.reorder_queue));
                rconn->source.in.ooo_packets--;
                atomic_dec(&(rconn->source.in.nb->ooo_packets));
                atomic_dec(&ooo_packets);

                rconn->source.in.next_seqno += skb->len;

                /* advance to the new queue head; otherwise the loop would
                 * spin on the skb that was just unlinked */
                skb = rconn->source.in.reorder_queue.next;
        }
}
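
/*
 * Buffer an out-of-order packet in the connection's reorder queue,
 * sorted by sequence number. Three counters bound the amount of
 * buffered data: per connection, per neighbor and globally. Returns 1
 * if the packet has to be dropped, 0 if it was queued.
 */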
static int _conn_rcv_ooo(struct conn *rconn, struct sk_buff *skb)
{
        struct skb_procstate *ps = skb_pstate(skb);
        struct sk_buff_head *reorder_queue = &(rconn->source.in.reorder_queue);
        struct sk_buff *curr = reorder_queue->next;

        long ooo;

        rconn->source.in.ooo_packets++;
        if (rconn->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN)
                goto drop_ooo3;

        ooo = atomic_inc_return(&(rconn->source.in.nb->ooo_packets));
        if (ooo > MAX_TOTAL_OOO_PER_NEIGH)
                goto drop_ooo2;

        ooo = atomic_inc_return(&ooo_packets);
        if (ooo > MAX_TOTAL_OOO_PACKETS)
                goto drop_ooo1;

        while (1) {
                struct skb_procstate *ps2 = skb_pstate(curr);

                if ((void *) curr == (void *) reorder_queue) {
                        skb_queue_tail(reorder_queue, skb);
                        break;
                }

                if (ps->funcstate.rcv2.seqno > ps2->funcstate.rcv2.seqno) {
                        skb_insert(curr, skb, reorder_queue);
                        break;
                }

                /* neither the end of the queue nor the insertion point;
                 * without this step the loop would never terminate */
                curr = curr->next;
        }

        if (0) {
drop_ooo1:
                atomic_dec(&ooo_packets);
drop_ooo2:
                atomic_dec(&(rconn->source.in.nb->ooo_packets));
drop_ooo3:
                rconn->source.in.ooo_packets--;

                return 1;
        }

        return 0;
}
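
/*
 * Receive one data packet for a connection. In-order packets go
 * directly to receive_skb(), out-of-order ones through _conn_rcv_ooo().
 * Only accepted packets are acked; dropped packets are freed silently,
 * leaving retransmission to the sender. An in-order packet may close a
 * gap, so the reorder queue is drained afterwards.
 */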
static void _conn_rcv(struct conn *rconn, struct sk_buff *skb)
{
        struct skb_procstate *ps = skb_pstate(skb);
        struct control_msg_out *cm = alloc_control_msg();

        int in_order;
        int drop = 1;

        BUG_ON(rconn->sourcetype != SOURCE_IN);

        if (unlikely(cm == 0)) {
                kfree_skb(skb);
                return;
        }

        mutex_lock(&(rconn->rcv_lock));

        in_order = (rconn->source.in.next_seqno == ps->funcstate.rcv2.seqno);

        if (in_order == 0) {
                drop = _conn_rcv_ooo(rconn, skb);
        } else {
                rconn->source.in.next_seqno += skb->len;
                drop = receive_skb(rconn, skb);
        }

        if (drop) {
                kfree_skb(skb);
                free_control_msg(cm);
        } else {
                send_ack(cm, rconn->source.in.nb, rconn->source.in.conn_id,
                                ps->funcstate.rcv2.seqno);
        }

        if (in_order)
                drain_ooo_queue(rconn);

        mutex_unlock(&(rconn->rcv_lock));
}
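
/*
 * Look up the target connection of a data packet and pass the skb on
 * to _conn_rcv(). The sequence number is stored in the skb's
 * per-packet state first.
 */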
static void conn_rcv(struct sk_buff *skb, __u32 conn_id, __u32 seqno)
{
        struct conn *rconn;
        struct skb_procstate *ps = skb_pstate(skb);

        ps->funcstate.rcv2.seqno = seqno;

        rconn = get_conn(conn_id);

        if (unlikely(rconn == 0)) {
                printk(KERN_ERR "unknown conn_id when receiving: %u\n",
                                conn_id);
                kfree_skb(skb);
                return;
        }

        _conn_rcv(rconn, skb);
        kref_put(&(rconn->ref), free_conn);
}
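
/*
 * Feed a flat data buffer through the connection receive path by
 * copying it into a freshly allocated skb.
 */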
void conn_rcv_buildskb(char *data, __u32 datalen, __u32 conn_id, __u32 seqno)
{
        struct sk_buff *skb = alloc_skb(datalen, GFP_KERNEL);
        char *dst;

        if (unlikely(skb == 0))
                return; /* out of memory, drop the data */

        dst = skb_put(skb, datalen);
        memcpy(dst, data, datalen);
        conn_rcv(skb, conn_id, seqno);
}
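
/*
 * Parse the header of a data packet: a 4 byte connection id followed by
 * a 4 byte sequence number, both big endian. conn_id 0 marks a kernel
 * (control) packet, which is dispatched to kernel_packet() for the
 * neighbor it came from rather than to a connection.
 */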
static void rcv_data(struct sk_buff *skb)
{
        __u32 conn_id;
        __u32 seqno;

        char *connid_p = cor_pull_skb(skb, 4);
        char *seqno_p = cor_pull_skb(skb, 4);

        /* the packet may be too short to contain the full header */
        if (unlikely(connid_p == 0 || seqno_p == 0))
                goto drop;

        memcpy(&conn_id, connid_p, 4);
        memcpy(&seqno, seqno_p, 4);

        conn_id = be32_to_cpu(conn_id);
        seqno = be32_to_cpu(seqno);

        if (conn_id == 0) {
                struct neighbor *nb = get_neigh_by_mac(skb);
                if (unlikely(nb == 0))
                        goto drop;
                kernel_packet(nb, skb, seqno);
                kref_put(&(nb->ref), neighbor_free);
        } else {
                conn_rcv(skb, conn_id, seqno);
        }

        if (0) {
drop:
                kfree_skb(skb);
        }
}
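
/*
 * Work queue handler for one received packet: the first byte selects
 * the packet type; anything other than announce or data is dropped.
 */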
static void rcv(struct work_struct *work)
{
        struct sk_buff *skb = skb_from_pstate(container_of(work,
                        struct skb_procstate, funcstate.rcv.work));

        __u8 packet_type;
        char *packet_type_p;

        atomic_dec(&packets_in_workqueue);

        packet_type_p = cor_pull_skb(skb, 1);

        if (unlikely(packet_type_p == 0))
                goto drop;

        packet_type = *packet_type_p;

        if (packet_type == PACKET_TYPE_ANNOUNCE) {
                rcv_announce(skb);
                return;
        }

        if (unlikely(packet_type != PACKET_TYPE_DATA))
                goto drop;

        rcv_data(skb);

        if (0) {
drop:
                kfree_skb(skb);
        }
}
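
/*
 * packet_type receive hook, called in softirq context. Processing is
 * deferred to the packet_wq work queue; packets_in_workqueue bounds
 * the backlog so that a packet flood cannot queue unbounded work.
 */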
static int queue_rcv_processing(struct sk_buff *skb, struct net_device *dev,
                struct packet_type *pt, struct net_device *orig_dev)
{
        struct skb_procstate *ps = skb_pstate(skb);
        long queuelen;

        if (skb->pkt_type == PACKET_OTHERHOST)
                goto drop;

        BUG_ON(skb->next != 0);

        queuelen = atomic_inc_return(&packets_in_workqueue);

        BUG_ON(queuelen <= 0);

        if (queuelen > MAX_PACKETS_IN_RCVQUEUE) {
                atomic_dec(&packets_in_workqueue);
                goto drop;
        }

        INIT_WORK(&(ps->funcstate.rcv.work), rcv);
        queue_work(packet_wq, &(ps->funcstate.rcv.work));
        return NET_RX_SUCCESS;

drop:
        kfree_skb(skb);
        return NET_RX_DROP;
}
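
/* receive hook for the COR ethertype; .dev == 0 means all devices */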
static struct packet_type ptype_cor = {
        .type = htons(ETH_P_COR),
        .dev = 0,
        .func = queue_rcv_processing
};
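
/*
 * struct skb_procstate is presumably kept in the skb's 48 byte control
 * buffer (skb->cb); the BUG_ON below guards that it fits.
 */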
int __init cor_rcv_init(void)
{
        BUG_ON(sizeof(struct skb_procstate) > 48);

        packet_wq = create_workqueue("cor_packet");
        if (unlikely(packet_wq == 0))
                return -ENOMEM;
        dev_add_pack(&ptype_cor);
        return 0;
}

MODULE_LICENSE("GPL");