net/cor/snd.c

/**
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/gfp.h>
#include <linux/jiffies.h>
#include <linux/slab.h>

#include "cor.h"

struct kmem_cache *connretrans_slab;
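
/* One pending retransmit of a range [seqno, seqno+length) of conn data.
 * Entries are linked both into the per-neighbor timeout list
 * (retrans_list_conn) and into the per-connection retrans_list. */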
struct conn_retrans {
	/* timeout_list and conn_list share a single ref */
	struct kref ref;
	struct list_head timeout_list;
	struct list_head conn_list;
	struct htab_entry htab_entry;
	struct conn *trgt_out_o;
	__u32 seqno;
	__u32 length;
	__u8 ackrcvd;
	unsigned long timeout;
};

static void free_connretrans(struct kref *ref)
{
	struct conn_retrans *cr = container_of(ref, struct conn_retrans, ref);
	kref_put(&(cr->trgt_out_o->ref), free_conn);
	kmem_cache_free(connretrans_slab, cr);
}

DEFINE_SPINLOCK(queues_lock);
LIST_HEAD(queues);
struct timer_list qos_resume_timer;
struct tasklet_struct qos_resume_task;
int qos_resume_scheduled;
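
/* Per-device QoS queues. Senders that cannot transmit right away park a
 * resume_block on one of the per-queue waiting lists; qos_resume_timer
 * schedules qos_resume_task, which walks all queues and retries the
 * deferred work until the device stops reporting congestion. */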
void free_qos(struct kref *ref)
{
	struct qos_queue *q = container_of(ref, struct qos_queue, ref);
	kfree(q);
}
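
/* Pick the waiting connection with the highest credit density (credits
 * scaled by 2^24 and divided by the amount of buffered data) and flush it;
 * the runner-up's scaled bid (shifted down by 32) is passed to flush_out()
 * as the per-byte credit price. */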
/* Highest bidder "pays" the credits the second has bid */
static int _resume_conns(struct qos_queue *q)
{
	struct conn *best = 0;
	__u64 bestcredit = 0;
	__u64 secondcredit = 0;

	int rc;

	struct list_head *lh = q->conns_waiting.next;

	while (lh != &(q->conns_waiting)) {
		struct conn *trgt_out_o = container_of(lh, struct conn,
				target.out.rb.lh);
		__u64 credits;

		lh = lh->next;

		refresh_conn_credits(trgt_out_o, 0, 0);

		spin_lock_bh(&(trgt_out_o->rcv_lock));

		BUG_ON(trgt_out_o->targettype != TARGET_OUT);

		if (unlikely(trgt_out_o->isreset != 0)) {
			trgt_out_o->target.out.rb.in_queue = 0;
			list_del(&(trgt_out_o->target.out.rb.lh));
			spin_unlock_bh(&(trgt_out_o->rcv_lock));
			kref_put(&(trgt_out_o->ref), free_conn);

			continue;
		}

		BUG_ON(trgt_out_o->data_buf.read_remaining == 0);

		if (may_alloc_control_msg(trgt_out_o->target.out.nb,
				ACM_PRIORITY_LOW) == 0) {
			spin_unlock_bh(&(trgt_out_o->rcv_lock));
			continue;
		}

		if (trgt_out_o->credits <= 0)
			credits = 0;
		else
			credits = multiply_div(trgt_out_o->credits, 1LL << 24,
					trgt_out_o->data_buf.read_remaining);
		spin_unlock_bh(&(trgt_out_o->rcv_lock));

		if (best == 0 || bestcredit < credits) {
			secondcredit = bestcredit;
			best = trgt_out_o;
			bestcredit = credits;
		} else if (secondcredit < credits) {
			secondcredit = credits;
		}
	}

	if (best == 0)
		return RC_FLUSH_CONN_OUT_OK;

	spin_lock_bh(&(best->rcv_lock));
	rc = flush_out(best, 1, (__u32) (secondcredit >> 32));

	if (rc == RC_FLUSH_CONN_OUT_OK || rc == RC_FLUSH_CONN_OUT_OK_SENT) {
		best->target.out.rb.in_queue = 0;
		list_del(&(best->target.out.rb.lh));
	}
	spin_unlock_bh(&(best->rcv_lock));

	refresh_conn_credits(best, 0, 0);

	if (rc == RC_FLUSH_CONN_OUT_OK_SENT)
		wake_sender(best);

	if (rc == RC_FLUSH_CONN_OUT_OK || rc == RC_FLUSH_CONN_OUT_OK_SENT)
		kref_put(&(best->ref), free_conn);

	return rc;
}

static int resume_conns(struct qos_queue *q)
{
	while (list_empty(&(q->conns_waiting)) == 0) {
		int rc = _resume_conns(q);
		if (rc != RC_FLUSH_CONN_OUT_OK &&
				rc != RC_FLUSH_CONN_OUT_OK_SENT)
			return 1;
	}
	return 0;
}

static int send_retrans(struct neighbor *nb, int fromqos);
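
/* Retry the deferred work of one caller class (kernel packets, conn
 * retransmits or announces). Each queued resume_block holds a reference on
 * its owner; on failure the block is put back on the list and the loop
 * stops so the remaining entries are retried later. */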
static int __qos_resume(struct qos_queue *q, int caller)
{
	unsigned long iflags;
	int rc = 0;
	struct list_head *lh;

	spin_lock_irqsave(&(q->qlock), iflags);

	if (caller == QOS_CALLER_KPACKET)
		lh = &(q->conn_retrans_waiting);
	else if (caller == QOS_CALLER_CONN_RETRANS)
		lh = &(q->kpackets_waiting);
	else if (caller == QOS_CALLER_ANNOUNCE)
		lh = &(q->announce_waiting);
	else
		BUG();

	while (list_empty(lh) == 0) {
		struct list_head *curr = lh->next;
		struct resume_block *rb = container_of(curr,
				struct resume_block, lh);
		rb->in_queue = 0;
		list_del(curr);

		if (caller == QOS_CALLER_KPACKET) {
			struct neighbor *nb = container_of(rb, struct neighbor,
					rb_kp);
			kref_get(&(nb->ref));
			spin_unlock_irqrestore(&(q->qlock), iflags);
			rc = send_messages(nb, 1);
			kref_put(&(nb->ref), neighbor_free);
			spin_lock_irqsave(&(q->qlock), iflags);
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			struct neighbor *nb = container_of(rb, struct neighbor,
					rb_cr);
#warning todo do not send if neighbor is stalled
			kref_get(&(nb->ref));
			spin_unlock_irqrestore(&(q->qlock), iflags);
			rc = send_retrans(nb, 1);
			kref_put(&(nb->ref), neighbor_free);
			spin_lock_irqsave(&(q->qlock), iflags);
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			struct announce_data *ann = container_of(rb,
					struct announce_data, rb);
			kref_get(&(ann->ref));
			spin_unlock_irqrestore(&(q->qlock), iflags);
			rc = send_announce_qos(ann);
			kref_put(&(ann->ref), announce_data_free);
			spin_lock_irqsave(&(q->qlock), iflags);
		} else {
			BUG();
		}

		if (rc != 0) {
			if (rb->in_queue == 0) {
				rb->in_queue = 1;
				list_add(curr, lh);
			}
			break;
		}

		if (caller == QOS_CALLER_KPACKET) {
			kref_put(&(container_of(rb, struct neighbor,
					rb_kp)->ref), neighbor_free);
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			kref_put(&(container_of(rb, struct neighbor,
					rb_cr)->ref), neighbor_free);
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			kref_put(&(container_of(rb,
					struct announce_data, rb)->ref),
					announce_data_free);
		} else {
			BUG();
		}
	}

	spin_unlock_irqrestore(&(q->qlock), iflags);

	return rc;
}

static int _qos_resume(struct qos_queue *q)
{
	int rc = 0;
	unsigned long iflags;
	int i;

	spin_lock_irqsave(&(q->qlock), iflags);

	for (i = 0; i < 4 && rc == 0; i++) {
		struct list_head *lh;

		if (i == QOS_CALLER_KPACKET)
			lh = &(q->conn_retrans_waiting);
		else if (i == QOS_CALLER_CONN_RETRANS)
			lh = &(q->kpackets_waiting);
		else if (i == QOS_CALLER_ANNOUNCE)
			lh = &(q->announce_waiting);
		else if (i == QOS_CALLER_CONN)
			lh = &(q->conns_waiting);
		else
			BUG();

		if (list_empty(lh))
			continue;

		spin_unlock_irqrestore(&(q->qlock), iflags);
		if (i == QOS_CALLER_CONN)
			rc = resume_conns(q);
		else
			rc = __qos_resume(q, i);
		spin_lock_irqsave(&(q->qlock), iflags);

		i = 0;
	}

	spin_unlock_irqrestore(&(q->qlock), iflags);

	return rc;
}
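
/* Tasklet body: resume every device queue; while any of them is still
 * congested, re-arm qos_resume_timer to retry on the next jiffy. */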
void qos_resume_taskfunc(unsigned long arg)
{
	struct list_head *curr;

	int congested = 0;

	spin_lock_bh(&(queues_lock));

	curr = queues.next;
	while (curr != (&queues)) {
		struct qos_queue *q = container_of(curr,
				struct qos_queue, queue_list);

		if (_qos_resume(q))
			congested = 1;

		curr = curr->next;
	}

	if (congested) {
		mod_timer(&(qos_resume_timer), jiffies + 1);
	} else {
		qos_resume_scheduled = 0;
	}

	spin_unlock_bh(&(queues_lock));
}

void qos_resume_timerfunc(unsigned long arg)
{
	tasklet_schedule(&qos_resume_task);
}

struct qos_queue *get_queue(struct net_device *dev)
{
	struct qos_queue *ret = 0;
	struct list_head *curr;

	spin_lock_bh(&(queues_lock));
	curr = queues.next;
	while (curr != (&queues)) {
		struct qos_queue *q = container_of(curr,
				struct qos_queue, queue_list);
		if (q->dev == dev) {
			ret = q;
			break;
		}
		curr = curr->next;
	}
	spin_unlock_bh(&(queues_lock));
	return ret;
}

static void _destroy_queue(struct qos_queue *q, int caller)
{
	struct list_head *lh;

	if (caller == QOS_CALLER_KPACKET)
		lh = &(q->conn_retrans_waiting);
	else if (caller == QOS_CALLER_CONN_RETRANS)
		lh = &(q->kpackets_waiting);
	else if (caller == QOS_CALLER_ANNOUNCE)
		lh = &(q->announce_waiting);
	else
		BUG();

	while (list_empty(lh) == 0) {
		struct list_head *curr = lh->next;
		struct resume_block *rb = container_of(curr,
				struct resume_block, lh);
		rb->in_queue = 0;
		list_del(curr);

		if (caller == QOS_CALLER_KPACKET) {
			kref_put(&(container_of(rb, struct neighbor,
					rb_kp)->ref), neighbor_free);
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			kref_put(&(container_of(rb, struct neighbor,
					rb_cr)->ref), neighbor_free);
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			kref_put(&(container_of(rb,
					struct announce_data, rb)->ref),
					announce_data_free);
		} else {
			BUG();
		}
	}
}

int destroy_queue(struct net_device *dev)
{
	int unlink;
	unsigned long iflags;
	struct qos_queue *q = get_queue(dev);
	if (q == 0)
		return 1;

	spin_lock_irqsave(&(q->qlock), iflags);
	unlink = (q->dev != 0);
	q->dev = 0;
	_destroy_queue(q, QOS_CALLER_KPACKET);
	_destroy_queue(q, QOS_CALLER_CONN_RETRANS);
	_destroy_queue(q, QOS_CALLER_ANNOUNCE);
	spin_unlock_irqrestore(&(q->qlock), iflags);

	if (unlink) {
		dev_put(dev);
		spin_lock_bh(&(queues_lock));
		list_del(&(q->queue_list));
		spin_unlock_bh(&(queues_lock));
		kref_put(&(q->ref), free_qos);
	}
	kref_put(&(q->ref), free_qos);

	return 0;
}

int create_queue(struct net_device *dev)
{
	struct qos_queue *q = kmalloc(sizeof(struct qos_queue), GFP_KERNEL);

	if (q == 0) {
		printk(KERN_ERR "cor: unable to allocate memory for device "
				"queue, not enabling device");
		return 1;
	}

	kref_init(&(q->ref));
	spin_lock_init(&(q->qlock));

	q->dev = dev;
	dev_hold(dev);

	INIT_LIST_HEAD(&(q->kpackets_waiting));
	INIT_LIST_HEAD(&(q->conn_retrans_waiting));
	INIT_LIST_HEAD(&(q->announce_waiting));
	INIT_LIST_HEAD(&(q->conns_waiting));

	spin_lock_bh(&(queues_lock));
	list_add(&(q->queue_list), &queues);
	spin_unlock_bh(&(queues_lock));

	return 0;
}
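
/* Park a resume_block on the waiting list that matches the caller class,
 * taking a reference on the owning object, and make sure the resume timer
 * is scheduled. */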
void qos_enqueue(struct qos_queue *q, struct resume_block *rb, int caller)
{
	unsigned long iflags;

	spin_lock_irqsave(&(q->qlock), iflags);

	if (rb->in_queue)
		goto out;

	rb->in_queue = 1;

	if (caller == QOS_CALLER_KPACKET) {
		list_add(&(rb->lh), &(q->conn_retrans_waiting));
		kref_get(&(container_of(rb, struct neighbor, rb_kp)->ref));
	} else if (caller == QOS_CALLER_CONN_RETRANS) {
		list_add(&(rb->lh), &(q->kpackets_waiting));
		kref_get(&(container_of(rb, struct neighbor, rb_cr)->ref));
	} else if (caller == QOS_CALLER_ANNOUNCE) {
		list_add(&(rb->lh), &(q->announce_waiting));
		kref_get(&(container_of(rb, struct announce_data, rb)->ref));
	} else if (caller == QOS_CALLER_CONN) {
		list_add(&(rb->lh), &(q->conns_waiting));
		kref_get(&(container_of(rb, struct conn, target.out.rb)->ref));
	} else {
		BUG();
	}

	if (qos_resume_scheduled == 0) {
		mod_timer(&(qos_resume_timer), jiffies + 1);
		qos_resume_scheduled = 1;
	}

out:
	spin_unlock_irqrestore(&(q->qlock), iflags);
}

void qos_remove_conn(struct conn *trgt_out_l)
{
	unsigned long iflags;
	struct qos_queue *q;

	BUG_ON(trgt_out_l->targettype != TARGET_OUT);

	if (trgt_out_l->target.out.rb.in_queue == 0)
		return;

	q = trgt_out_l->target.out.nb->queue;
	BUG_ON(q == 0);

	trgt_out_l->target.out.rb.in_queue = 0;
	spin_lock_irqsave(&(q->qlock), iflags);
	list_del(&(trgt_out_l->target.out.rb.lh));
	spin_unlock_irqrestore(&(q->qlock), iflags);

	kref_put(&(trgt_out_l->ref), free_conn);
}

static void qos_enqueue_conn(struct conn *trgt_out_l)
{
	qos_enqueue(trgt_out_l->target.out.nb->queue,
			&(trgt_out_l->target.out.rb), QOS_CALLER_CONN);
}

static int may_send_conn_retrans(struct neighbor *nb)
{
	unsigned long iflags;
	int rc;

	BUG_ON(nb->queue == 0);

	spin_lock_irqsave(&(nb->queue->qlock), iflags);
	rc = (list_empty(&(nb->queue->kpackets_waiting)));
	spin_unlock_irqrestore(&(nb->queue->qlock), iflags);

	return rc;
}

static int may_send_conn(struct conn *trgt_out_l)
{
	unsigned long iflags;
	struct qos_queue *q = trgt_out_l->target.out.nb->queue;
	int rc;

	BUG_ON(q == 0);

	spin_lock_irqsave(&(q->qlock), iflags);
	rc = (list_empty(&(q->kpackets_waiting)) &&
			list_empty(&(q->conn_retrans_waiting)) &&
			list_empty(&(q->announce_waiting)) &&
			list_empty(&(q->conns_waiting)));
	spin_unlock_irqrestore(&(q->qlock), iflags);

	return rc;
}
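
/* Allocate an skb for a data packet with a 9 byte cor header:
 * 1 byte packet type, 4 byte connection id, 4 byte sequence number. */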
struct sk_buff *create_packet(struct neighbor *nb, int size,
		gfp_t alloc_flags, __u32 conn_id, __u32 seqno)
{
	struct sk_buff *ret;
	char *dest;

	ret = alloc_skb(size + 9 + LL_ALLOCATED_SPACE(nb->dev), alloc_flags);
	if (unlikely(0 == ret))
		return 0;

	ret->protocol = htons(ETH_P_COR);
	ret->dev = nb->dev;

	skb_reserve(ret, LL_RESERVED_SPACE(nb->dev));
	if (unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac,
			nb->dev->dev_addr, ret->len) < 0)) {
		kfree_skb(ret);
		return 0;
	}
	skb_reset_network_header(ret);

	dest = skb_put(ret, 9);
	BUG_ON(0 == dest);

	dest[0] = PACKET_TYPE_DATA;
	dest += 1;

	put_u32(dest, conn_id, 1);
	dest += 4;
	put_u32(dest, seqno, 1);
	dest += 4;

	return ret;
}

static void set_conn_retrans_timeout(struct conn_retrans *cr)
{
	struct neighbor *nb = cr->trgt_out_o->target.out.nb;
	cr->timeout = jiffies + usecs_to_jiffies(100000 +
			((__u32) atomic_read(&(nb->latency))) +
			((__u32) atomic_read(&(nb->max_remote_cmsg_delay))));
}
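
/* Re-arm a retransmit entry before it is (re)sent. If only a prefix of the
 * entry is being sent, the entry is split and the remainder is returned as
 * a new conn_retrans; otherwise the entry is moved to the end of the
 * neighbor's timeout list with a fresh timeout. */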
static struct conn_retrans *readd_conn_retrans(struct conn_retrans *cr,
		struct neighbor *nb, __u32 length, int *dontsend)
{
	struct conn_retrans *ret = 0;

	spin_lock_bh(&(nb->retrans_lock));

	if (unlikely(cr->ackrcvd)) {
		*dontsend = 1;
		goto out;
	} else
		*dontsend = 0;

	if (unlikely(cr->length > length)) {
		ret = kmem_cache_alloc(connretrans_slab, GFP_ATOMIC);
		if (unlikely(ret == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		memset(ret, 0, sizeof (struct conn_retrans));
		ret->trgt_out_o = cr->trgt_out_o;
		kref_get(&(cr->trgt_out_o->ref));
		ret->seqno = cr->seqno + length;
		ret->length = cr->length - length;
		kref_init(&(ret->ref));

		list_add(&(ret->timeout_list), &(nb->retrans_list_conn));
		list_add(&(ret->conn_list), &(cr->conn_list));

		cr->length = length;
	} else {
		list_del(&(cr->timeout_list));
		list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn));
		set_conn_retrans_timeout(cr);

		BUG_ON(cr->length != length);
	}

out:
	spin_unlock_bh(&(nb->retrans_lock));

	return ret;
}

void cancel_retrans(struct conn *trgt_out_l)
{
	struct neighbor *nb = trgt_out_l->target.out.nb;

	spin_lock_bh(&(nb->retrans_lock));

	while (list_empty(&(trgt_out_l->target.out.retrans_list)) == 0) {
		struct conn_retrans *cr = container_of(
				trgt_out_l->target.out.retrans_list.next,
				struct conn_retrans, conn_list);
		BUG_ON(cr->trgt_out_o != trgt_out_l);

		list_del(&(cr->timeout_list));
		list_del(&(cr->conn_list));
		cr->ackrcvd = 1;
		kref_put(&(cr->ref), free_connretrans);
	}
#warning reschedule timer

	spin_unlock_bh(&(nb->retrans_lock));
}
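
/* Retransmit one conn_retrans entry: full-mss chunks are sent as raw data
 * packets, a remaining tail is handed to the control message code via
 * send_conndata(). Returns nonzero if the qos queue is full and the
 * retransmit has to be retried later. */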
static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr)
{
	struct conn *trgt_out_o = cr->trgt_out_o;
	int targetmss = mss(nb);
	int dontsend;
	int queuefull = 0;

	spin_lock_bh(&(trgt_out_o->rcv_lock));

	BUG_ON(trgt_out_o->targettype != TARGET_OUT);
	BUG_ON(trgt_out_o->target.out.nb != nb);

	kref_get(&(trgt_out_o->ref));

	if (unlikely(trgt_out_o->isreset != 0)) {
		cancel_retrans(trgt_out_o);
		goto out;
	}

	while (cr->length >= targetmss) {
		struct sk_buff *skb;
		char *dst;
		struct conn_retrans *cr2;
		int rc;

		if (may_send_conn_retrans(nb) == 0)
			goto qos_enqueue;

		skb = create_packet(nb, targetmss, GFP_ATOMIC,
				trgt_out_o->target.out.conn_id, cr->seqno);
		if (unlikely(skb == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		cr2 = readd_conn_retrans(cr, nb, targetmss, &dontsend);
		if (unlikely(unlikely(dontsend) || unlikely(cr2 == 0 &&
				unlikely(cr->length > targetmss)))) {
			kfree_skb(skb);
			goto out;
		}

		dst = skb_put(skb, targetmss);

		databuf_pullold(trgt_out_o, cr->seqno, dst, targetmss);
		rc = dev_queue_xmit(skb);

		if (rc != 0) {
			spin_lock_bh(&(nb->retrans_lock));
			if (unlikely(cr->ackrcvd)) {
				dontsend = 1;
			} else {
				list_del(&(cr->timeout_list));
				list_add(&(cr->timeout_list),
						&(nb->retrans_list_conn));
			}
			spin_unlock_bh(&(nb->retrans_lock));
			if (dontsend == 0)
				goto qos_enqueue;
		}

		cr = cr2;

		if (likely(cr == 0))
			goto out;
	}

	if (unlikely(cr->length <= 0)) {
		BUG();
	} else {
		struct control_msg_out *cm;
		char *buf = kmalloc(cr->length, GFP_ATOMIC);

		if (unlikely(buf == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		cm = alloc_control_msg(nb, ACM_PRIORITY_LOW);
		if (unlikely(cm == 0)) {
			cr->timeout = jiffies + 1;
			kfree(buf);
			goto out;
		}

		databuf_pullold(trgt_out_o, cr->seqno, buf, cr->length);

		if (unlikely(readd_conn_retrans(cr, nb, cr->length, &dontsend)
				!= 0))
			BUG();

		if (likely(dontsend == 0)) {
			send_conndata(cm, trgt_out_o->target.out.conn_id,
					cr->seqno, buf, buf, cr->length);
		}
	}

	if (0) {
qos_enqueue:
		queuefull = 1;
	}
out:
	spin_unlock_bh(&(trgt_out_o->rcv_lock));

	kref_put(&(trgt_out_o->ref), free_conn);

	return queuefull;
}

static int send_retrans(struct neighbor *nb, int fromqos)
{
	unsigned long iflags;

	struct conn_retrans *cr = 0;

	int nbstate;
	int rescheduled = 0;
	int queuefull = 0;

	spin_lock_irqsave(&(nb->state_lock), iflags);
	nbstate = nb->state;
	spin_unlock_irqrestore(&(nb->state_lock), iflags);

	while (1) {
		spin_lock_bh(&(nb->retrans_lock));

		if (list_empty(&(nb->retrans_list_conn))) {
			nb->retrans_timer_conn_running = 0;
			spin_unlock_bh(&(nb->retrans_lock));
			break;
		}

		cr = container_of(nb->retrans_list_conn.next,
				struct conn_retrans, timeout_list);

		if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			spin_unlock_bh(&(nb->retrans_lock));

			kref_put(&(cr->ref), free_connretrans);
			continue;
		}

#warning todo check window limit

		if (time_after(cr->timeout, jiffies)) {
			mod_timer(&(nb->retrans_timer_conn), cr->timeout);
			if (fromqos)
				kref_get(&(nb->ref));
			rescheduled = 1;
			spin_unlock_bh(&(nb->retrans_lock));
			break;
		}

		kref_get(&(cr->ref));
		spin_unlock_bh(&(nb->retrans_lock));
		queuefull = _send_retrans(nb, cr);
		kref_put(&(cr->ref), free_connretrans);
		if (queuefull) {
			if (fromqos == 0)
				qos_enqueue(nb->queue, &(nb->rb_cr),
						QOS_CALLER_CONN_RETRANS);
			break;
		}
	}

	if (rescheduled == 0 && fromqos == 0)
		kref_put(&(nb->ref), neighbor_free);

	return queuefull;
}

void retransmit_conn_taskfunc(unsigned long arg)
{
	struct neighbor *nb = (struct neighbor *) arg;
	send_retrans(nb, 0);
}

void retransmit_conn_timerfunc(unsigned long arg)
{
	struct neighbor *nb = (struct neighbor *) arg;
	tasklet_schedule(&(nb->retrans_task_conn));
}
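
/* Ack processing. Sequence numbers may wrap, so all comparisons are done on
 * __s32 differences. An out-of-order ack trims or removes the retransmit
 * entries it covers; a cumulative ack additionally advances seqno_acked and
 * releases the acked data from the data buffer. */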
void conn_ack_ooo_rcvd(struct neighbor *nb, __u32 conn_id,
		struct conn *trgt_out, __u32 seqno_ooo, __u32 length)
{
	struct list_head *curr;

	if (unlikely(length == 0))
		return;

	spin_lock_bh(&(trgt_out->rcv_lock));

	if (unlikely(trgt_out->targettype != TARGET_OUT))
		goto out;
	if (unlikely(trgt_out->target.out.nb != nb))
		goto out;
	if (unlikely(trgt_out->target.out.conn_id != conn_id))
		goto out;

	spin_lock_bh(&(nb->retrans_lock));

	curr = trgt_out->target.out.retrans_list.next;

	while (curr != &(trgt_out->target.out.retrans_list)) {
		struct conn_retrans *cr = container_of(curr,
				struct conn_retrans, conn_list);

		if (((__s32)(cr->seqno + cr->length - seqno_ooo)) > 0)
			goto cont;

		if (((__s32)(cr->seqno + cr->length - seqno_ooo - length)) > 0) {
			if (((__s32)(cr->seqno - seqno_ooo - length)) < 0) {
				__u32 newseqno = seqno_ooo + length;
				cr->length -= (newseqno - cr->seqno);
				cr->seqno = newseqno;
			}

			break;
		}

		if (((__s32)(cr->seqno - seqno_ooo)) < 0 &&
				((__s32)(cr->seqno + cr->length - seqno_ooo -
				length)) <= 0) {
			__u32 diff = seqno_ooo + length - cr->seqno -
					cr->length;
			cr->seqno += diff;
			cr->length -= diff;
		} else {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			cr->ackrcvd = 1;
			kref_put(&(cr->ref), free_connretrans);
		}

cont:
		curr = curr->next;
	}

	if (unlikely(list_empty(&(trgt_out->target.out.retrans_list))) == 0) {
		struct conn_retrans *cr = container_of(
				trgt_out->target.out.retrans_list.next,
				struct conn_retrans, conn_list);
		if (unlikely(((__s32) (cr->seqno -
				trgt_out->target.out.seqno_acked)) > 0))
			trgt_out->target.out.seqno_acked = cr->seqno;
	}

	spin_unlock_bh(&(nb->retrans_lock));

out:
	spin_unlock_bh(&(trgt_out->rcv_lock));
}

void conn_ack_rcvd(struct neighbor *nb, __u32 conn_id, struct conn *trgt_out,
		__u32 seqno, int setwindow, __u8 window)
{
	int flush = 0;

	spin_lock_bh(&(trgt_out->rcv_lock));

	if (unlikely(trgt_out->isreset != 0))
		goto out;
	if (unlikely(trgt_out->targettype != TARGET_OUT))
		goto out;
	if (unlikely(trgt_out->target.out.nb != nb))
		goto out;
	if (unlikely(trgt_out->reversedir->source.in.conn_id != conn_id))
		goto out;

	if (unlikely(((__s32)(seqno - trgt_out->target.out.seqno_nextsend)) > 0) ||
			((__s32)(seqno - trgt_out->target.out.seqno_acked)) < 0)
		goto out;

	if (setwindow) {
		__u32 windowdec = dec_log_64_11(window);
		if (unlikely(seqno == trgt_out->target.out.seqno_acked &&
				((__s32) (seqno + windowdec -
				trgt_out->target.out.seqno_windowlimit)) <= 0))
			goto skipwindow;

		trgt_out->target.out.seqno_windowlimit = seqno + windowdec;
		flush = 1;
	}

skipwindow:
	if (seqno == trgt_out->target.out.seqno_acked)
		goto out;

	spin_lock_bh(&(nb->retrans_lock));

	trgt_out->target.out.seqno_acked = seqno;

	while (list_empty(&(trgt_out->target.out.retrans_list)) == 0) {
		struct conn_retrans *cr = container_of(
				trgt_out->target.out.retrans_list.next,
				struct conn_retrans, conn_list);

		if (((__s32)(cr->seqno + cr->length - seqno)) > 0) {
			if (((__s32)(cr->seqno - seqno)) < 0) {
				cr->length -= (seqno - cr->seqno);
				cr->seqno = seqno;
			}
			break;
		}

		list_del(&(cr->timeout_list));
		list_del(&(cr->conn_list));
		cr->ackrcvd = 1;
		kref_put(&(cr->ref), free_connretrans);
	}

	spin_unlock_bh(&(nb->retrans_lock));
	databuf_ack(trgt_out, trgt_out->target.out.seqno_acked);

out:
	spin_unlock_bh(&(trgt_out->rcv_lock));

	if (flush)
		flush_buf(trgt_out);
}
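
/* Queue a retransmit entry for freshly sent data and start the per-neighbor
 * retransmit timer if it is not already running. */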
static void schedule_retransmit_conn(struct conn_retrans *cr,
		struct conn *trgt_out_l, __u32 seqno, __u32 len)
{
	struct neighbor *nb = trgt_out_l->target.out.nb;

	int first;

	BUG_ON(trgt_out_l->targettype != TARGET_OUT);

	memset(cr, 0, sizeof (struct conn_retrans));
	cr->trgt_out_o = trgt_out_l;
	kref_get(&(trgt_out_l->ref));
	cr->seqno = seqno;
	cr->length = len;
	kref_init(&(cr->ref));
	set_conn_retrans_timeout(cr);

	spin_lock_bh(&(nb->retrans_lock));

	first = unlikely(list_empty(&(nb->retrans_list_conn)));
	list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn));

	list_add_tail(&(cr->conn_list), &(trgt_out_l->target.out.retrans_list));

	if (unlikely(unlikely(first) &&
			unlikely(nb->retrans_timer_conn_running == 0))) {
		mod_timer(&(nb->retrans_timer_conn), cr->timeout);
		nb->retrans_timer_conn_running = 1;
		kref_get(&(nb->ref));
	}

	spin_unlock_bh(&(nb->retrans_lock));
}

static __u32 get_windowlimit(struct conn *trgt_out_l)
{
	__s32 windowlimit = (__s32)(trgt_out_l->target.out.seqno_windowlimit -
			trgt_out_l->target.out.seqno_nextsend);
	if (unlikely(windowlimit < 0))
		return 0;
	return windowlimit;
}
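
/* Send as much buffered data as window and credits allow: full-mss packets
 * are sent directly, and a remaining tail is flushed through a control
 * message if the connection is latency-sensitive or all previously sent
 * data has been acked. */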
#warning todo reset connections which are SOURCE_NONE and are stuck for too long
int flush_out(struct conn *trgt_out_l, int fromqos, __u32 creditsperbyte)
{
	int targetmss;
	__u32 seqno;
	int sent = 0;

	BUG_ON(trgt_out_l->targettype != TARGET_OUT);

	targetmss = mss(trgt_out_l->target.out.nb);

	if (unlikely(trgt_out_l->target.out.conn_id == 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(trgt_out_l->isreset != 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(trgt_out_l->sourcetype == SOURCE_SOCK &&
			trgt_out_l->source.sock.delay_flush != 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (fromqos == 0 && may_send_conn(trgt_out_l) == 0) {
		qos_enqueue_conn(trgt_out_l);
		return RC_FLUSH_CONN_OUT_CONG;
	}

	while (trgt_out_l->data_buf.read_remaining >= targetmss &&
			get_windowlimit(trgt_out_l) >= targetmss) {
		struct conn_retrans *cr;
		struct sk_buff *skb;
		char *dst;
		int rc;

		if (unlikely(creditsperbyte * targetmss >
				trgt_out_l->credits))
			return RC_FLUSH_CONN_OUT_CREDITS;

		seqno = trgt_out_l->target.out.seqno_nextsend;
		skb = create_packet(trgt_out_l->target.out.nb, targetmss,
				GFP_ATOMIC, trgt_out_l->target.out.conn_id,
				seqno);
		if (unlikely(skb == 0)) {
			qos_enqueue_conn(trgt_out_l);
			return RC_FLUSH_CONN_OUT_OOM;
		}

		cr = kmem_cache_alloc(connretrans_slab, GFP_ATOMIC);
		if (unlikely(cr == 0)) {
			kfree_skb(skb);
			qos_enqueue_conn(trgt_out_l);
			return RC_FLUSH_CONN_OUT_OOM;
		}

		dst = skb_put(skb, targetmss);

		databuf_pull(trgt_out_l, dst, targetmss);

		rc = dev_queue_xmit(skb);
		if (rc != 0) {
			databuf_unpull(trgt_out_l, targetmss);
			kmem_cache_free(connretrans_slab, cr);
			qos_enqueue_conn(trgt_out_l);
			return RC_FLUSH_CONN_OUT_CONG;
		}

		trgt_out_l->credits -= creditsperbyte * targetmss;
		trgt_out_l->target.out.seqno_nextsend += targetmss;
		schedule_retransmit_conn(cr, trgt_out_l, seqno, targetmss);
		sent = 1;
	}

	if (trgt_out_l->data_buf.read_remaining > 0 && (trgt_out_l->tos ==
			TOS_LATENCY || trgt_out_l->target.out.seqno_nextsend ==
			trgt_out_l->target.out.seqno_acked)) {
		struct control_msg_out *cm;
		struct conn_retrans *cr;
		__u32 len = trgt_out_l->data_buf.read_remaining;
		__s32 windowlimit = get_windowlimit(trgt_out_l);
		char *buf;

		if (windowlimit == 0)
			goto out;

		if (windowlimit < len/2 &&
				trgt_out_l->target.out.seqno_nextsend !=
				trgt_out_l->target.out.seqno_acked)
			goto out;

		if (len > windowlimit)
			len = windowlimit;

		if (unlikely(creditsperbyte * len > trgt_out_l->credits))
			return RC_FLUSH_CONN_OUT_CREDITS;

		buf = kmalloc(len, GFP_ATOMIC);
		if (unlikely(buf == 0)) {
			qos_enqueue_conn(trgt_out_l);
			return RC_FLUSH_CONN_OUT_OOM;
		}

		cm = alloc_control_msg(trgt_out_l->target.out.nb,
				ACM_PRIORITY_LOW);
		if (unlikely(cm == 0)) {
			kfree(buf);
			qos_enqueue_conn(trgt_out_l);
			return RC_FLUSH_CONN_OUT_OOM;
		}

		cr = kmem_cache_alloc(connretrans_slab, GFP_ATOMIC);
		if (unlikely(cr == 0)) {
			kfree(buf);
			free_control_msg(cm);
			qos_enqueue_conn(trgt_out_l);
			return RC_FLUSH_CONN_OUT_CONG;
		}

		databuf_pull(trgt_out_l, buf, len);

		seqno = trgt_out_l->target.out.seqno_nextsend;
		trgt_out_l->credits -= creditsperbyte * len;
		trgt_out_l->target.out.seqno_nextsend += len;

		schedule_retransmit_conn(cr, trgt_out_l, seqno, len);

		send_conndata(cm, trgt_out_l->target.out.conn_id, seqno, buf,
				buf, len);
		sent = 1;
	}

out:
	if (sent)
		return RC_FLUSH_CONN_OUT_OK_SENT;

	return RC_FLUSH_CONN_OUT_OK;
}

int __init cor_snd_init(void)
{
	connretrans_slab = kmem_cache_create("cor_connretrans",
			sizeof(struct conn_retrans), 8, 0, 0);

	if (unlikely(connretrans_slab == 0))
		return 1;

	init_timer(&qos_resume_timer);
	qos_resume_timer.function = qos_resume_timerfunc;
	qos_resume_timer.data = 0;
	tasklet_init(&qos_resume_task, qos_resume_taskfunc, 0);

	qos_resume_scheduled = 0;

	return 0;
}

MODULE_LICENSE("GPL");