net/cor/snd.c

/**
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/gfp.h>
#include <linux/jiffies.h>
#include <linux/slab.h>

#include "cor.h"

struct kmem_cache *connretrans_slab;

struct conn_retrans {
	/* timeout_list and conn_list share a single ref */
	struct kref ref;
	struct list_head timeout_list;
	struct list_head conn_list;
	struct htab_entry htab_entry;
	struct conn *trgt_out_o;
	__u32 seqno;
	__u32 length;
	__u8 ackrcvd;
	unsigned long timeout;
};

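/* kref release function: free the retransmit entry and drop the reference it
 * held on its connection */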
static void free_connretrans(struct kref *ref)
{
	struct conn_retrans *cr = container_of(ref, struct conn_retrans, ref);
	struct conn *trgt_out_o = cr->trgt_out_o;

	kmem_cache_free(connretrans_slab, cr);
	kref_put(&(trgt_out_o->ref), free_conn);
}

DEFINE_MUTEX(queues_lock);
LIST_HEAD(queues);
struct delayed_work qos_resume_work;
int qos_resume_scheduled;

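/* Per-device send queue: senders that could not transmit because the device
 * was congested wait on one of these lists, grouped by sender type, until
 * qos_resume() retries them. */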
struct qos_queue {
	struct list_head queue_list;

	struct net_device *dev;

	struct list_head kpackets_waiting;
	struct list_head conn_retrans_waiting;
	struct list_head announce_waiting;
	struct list_head conns_waiting;
};

/* Highest bidder "pays" the credits the second has bid */
static int _resume_conns(struct qos_queue *q)
{
	struct conn *best = 0;
	__u64 bestcredit = 0;
	__u64 secondcredit = 0;

	int rc;

	struct list_head *lh = q->conns_waiting.next;

	while (lh != &(q->conns_waiting)) {
		struct conn *trgt_out_o = container_of(lh, struct conn,
				target.out.rb.lh);
		__u64 credits;

		lh = lh->next;

		refresh_conn_credits(trgt_out_o, 0, 0);

		mutex_lock(&(trgt_out_o->rcv_lock));

		BUG_ON(trgt_out_o->targettype != TARGET_OUT);

		if (unlikely(trgt_out_o->isreset != 0)) {
			trgt_out_o->target.out.rb.in_queue = 0;
			list_del(&(trgt_out_o->target.out.rb.lh));
			mutex_unlock(&(trgt_out_o->rcv_lock));
			kref_put(&(trgt_out_o->ref), free_conn);

			continue;
		}

		BUG_ON(trgt_out_o->data_buf.read_remaining == 0);

		if (may_alloc_control_msg(trgt_out_o->target.out.nb,
				ACM_PRIORITY_LOW) == 0) {
			mutex_unlock(&(trgt_out_o->rcv_lock));
			continue;
		}

		if (trgt_out_o->credits <= 0)
			credits = 0;
		else
			credits = multiply_div(trgt_out_o->credits, 1LL << 24,
					trgt_out_o->data_buf.read_remaining);
		mutex_unlock(&(trgt_out_o->rcv_lock));

		if (best == 0 || bestcredit < credits) {
			secondcredit = bestcredit;
			best = trgt_out_o;
			bestcredit = credits;
		} else if (secondcredit < credits) {
			secondcredit = credits;
		}
	}

	if (best == 0)
		return RC_FLUSH_CONN_OUT_OK;

	mutex_lock(&(best->rcv_lock));
	rc = flush_out(best, 1, (__u32) (secondcredit >> 32));

	if (rc == RC_FLUSH_CONN_OUT_OK || rc == RC_FLUSH_CONN_OUT_OK_SENT) {
		best->target.out.rb.in_queue = 0;
		list_del(&(best->target.out.rb.lh));
	}

	mutex_unlock(&(best->rcv_lock));

	refresh_conn_credits(best, 0, 0);

	if (rc == RC_FLUSH_CONN_OUT_OK_SENT)
		wake_sender(best);

	if (rc == RC_FLUSH_CONN_OUT_OK || rc == RC_FLUSH_CONN_OUT_OK_SENT)
		kref_put(&(best->ref), free_conn);

	return rc;
}

static int resume_conns(struct qos_queue *q)
{
	while (list_empty(&(q->conns_waiting)) == 0) {
		int rc = _resume_conns(q);
		if (rc != RC_FLUSH_CONN_OUT_OK &&
				rc != RC_FLUSH_CONN_OUT_OK_SENT)
			return 1;
	}
	return 0;
}

static int send_retrans(struct neighbor *nb, int fromqos);

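/* Retry all waiting entries of one sender type ("caller") on this queue;
 * stops and requeues the current entry as soon as a send reports congestion
 * again. */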
static int _qos_resume(struct qos_queue *q, int caller)
{
	int rc = 0;

	struct list_head *lh;

	if (caller == QOS_CALLER_KPACKET)
		lh = &(q->conn_retrans_waiting);
	else if (caller == QOS_CALLER_CONN_RETRANS)
		lh = &(q->kpackets_waiting);
	else if (caller == QOS_CALLER_ANNOUNCE)
		lh = &(q->announce_waiting);
	else
		BUG();

	while (list_empty(lh) == 0) {
		struct list_head *curr = lh->next;
		struct resume_block *rb = container_of(curr,
				struct resume_block, lh);
		rb->in_queue = 0;
		list_del(curr);

		if (caller == QOS_CALLER_KPACKET) {
			struct neighbor *nb = container_of(rb, struct neighbor,
					rb_kp);
			rc = send_messages(nb, 1);
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			struct neighbor *nb = container_of(rb, struct neighbor,
					rb_cr);
#warning todo do not send if neighbor is stalled
			rc = send_retrans(nb, 1);
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			struct announce_data *ann = container_of(rb,
					struct announce_data, rb);
			rc = send_announce_qos(ann);
		} else {
			BUG();
		}

		if (rc != 0 && rb->in_queue == 0) {
			rb->in_queue = 1;
			list_add(curr, lh);
		} else {
			if (caller == QOS_CALLER_KPACKET) {
				kref_put(&(container_of(rb, struct neighbor,
						rb_kp)->ref), neighbor_free);
			} else if (caller == QOS_CALLER_CONN_RETRANS) {
				kref_put(&(container_of(rb, struct neighbor,
						rb_cr)->ref), neighbor_free);
			} else if (caller == QOS_CALLER_ANNOUNCE) {
				kref_put(&(container_of(rb,
						struct announce_data, rb)->ref),
						announce_data_free);
			} else {
				BUG();
			}
		}

		if (rc != 0)
			break;
	}

	return rc;
}

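/* Delayed-work handler: walk all device queues and retry kernel packets,
 * conn retransmits, announces and connection data; reschedule if any queue
 * is still congested. */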
static void qos_resume(struct work_struct *work)
{
	struct list_head *curr;

	mutex_lock(&(queues_lock));

	curr = queues.next;
	while (curr != (&queues)) {
		struct qos_queue *q = container_of(curr,
				struct qos_queue, queue_list);
		int i;

		for (i=0;i<4;i++) {
			int rc;
			if (i == 3)
				rc = resume_conns(q);
			else
				rc = _qos_resume(q, i);

			if (rc != 0)
				goto congested;
		}

		curr = curr->next;

		if (i == 4 && unlikely(q->dev == 0)) {
			list_del(&(q->queue_list));
			kfree(q);
		}
	}

	qos_resume_scheduled = 0;

	if (0) {
congested:
		schedule_delayed_work(&(qos_resume_work), 1);
	}

	mutex_unlock(&(queues_lock));
}

static struct qos_queue *get_queue(struct net_device *dev)
{
	struct list_head *curr = queues.next;
	while (curr != (&queues)) {
		struct qos_queue *q = container_of(curr,
				struct qos_queue, queue_list);
		if (q->dev == dev)
			return q;
		curr = curr->next;
	}
	return 0;
}

#warning todo content of the queue?
int destroy_queue(struct net_device *dev)
{
	struct qos_queue *q;

	mutex_lock(&(queues_lock));

	q = get_queue(dev);

	if (q == 0) {
		mutex_unlock(&(queues_lock));
		return 1;
	}

	q->dev = 0;

	dev_put(dev);

	mutex_unlock(&(queues_lock));

	return 0;
}

int create_queue(struct net_device *dev)
{
	struct qos_queue *q = kmalloc(sizeof(struct qos_queue), GFP_KERNEL);

	if (q == 0) {
		printk(KERN_ERR "cor: unable to allocate memory for device "
				"queue, not enabling device");
		return 1;
	}

	q->dev = dev;
	dev_hold(dev);

	INIT_LIST_HEAD(&(q->kpackets_waiting));
	INIT_LIST_HEAD(&(q->conn_retrans_waiting));
	INIT_LIST_HEAD(&(q->announce_waiting));
	INIT_LIST_HEAD(&(q->conns_waiting));

	mutex_lock(&(queues_lock));
	list_add(&(q->queue_list), &queues);
	mutex_unlock(&(queues_lock));

	return 0;
}

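/* Put a sender on the device queue and schedule qos_resume(); a reference on
 * the neighbor, announce or conn behind rb is held while it is queued. */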
void qos_enqueue(struct net_device *dev, struct resume_block *rb, int caller)
{
	struct qos_queue *q;

	mutex_lock(&(queues_lock));

	if (rb->in_queue)
		goto out;

	q = get_queue(dev);
	if (unlikely(q == 0))
		goto out;

	rb->in_queue = 1;

	if (caller == QOS_CALLER_KPACKET) {
		list_add(&(rb->lh), &(q->conn_retrans_waiting));
		kref_get(&(container_of(rb, struct neighbor, rb_kp)->ref));
	} else if (caller == QOS_CALLER_CONN_RETRANS) {
		list_add(&(rb->lh), &(q->kpackets_waiting));
		kref_get(&(container_of(rb, struct neighbor, rb_cr)->ref));
	} else if (caller == QOS_CALLER_ANNOUNCE) {
		list_add(&(rb->lh), &(q->announce_waiting));
		kref_get(&(container_of(rb, struct announce_data, rb)->ref));
	} else if (caller == QOS_CALLER_CONN) {
		struct conn *trgt_out = container_of(rb, struct conn,
				target.out.rb);
		mutex_lock(&(trgt_out->rcv_lock));
#warning todo targettype might have changed
		BUG_ON(trgt_out->targettype != TARGET_OUT);
		list_add(&(rb->lh), &(q->conns_waiting));
		kref_get(&(trgt_out->ref));
		mutex_unlock(&(trgt_out->rcv_lock));
	} else {
		BUG();
	}

	if (qos_resume_scheduled == 0) {
		schedule_delayed_work(&(qos_resume_work), 1);
		qos_resume_scheduled = 1;
	}

out:
	mutex_unlock(&(queues_lock));
}

void qos_remove_conn(struct conn *cn)
{
	int kref = 0;
	mutex_lock(&(queues_lock));
	mutex_lock(&(cn->rcv_lock));
	if (cn->targettype != TARGET_OUT)
		goto out;
	if (cn->target.out.rb.in_queue == 0)
		goto out;

	cn->target.out.rb.in_queue = 0;
	list_del(&(cn->target.out.rb.lh));
	kref = 1;

out:
	mutex_unlock(&(cn->rcv_lock));
	mutex_unlock(&(queues_lock));

	if (kref)
		kref_put(&(cn->ref), free_conn);
}

static int may_send_conn_retrans(struct neighbor *nb)
{
	struct qos_queue *q;
	int rc = 0;

	mutex_lock(&(queues_lock));

	q = get_queue(nb->dev);
	if (unlikely(q == 0))
		goto out;

	rc = (list_empty(&(q->kpackets_waiting)));

out:
	mutex_unlock(&(queues_lock));

	return rc;
}

static int may_send_conn(struct conn *trgt_out_l)
{
	struct qos_queue *q;
	int rc = 0;

#warning todo this may deadlock, use atomic_ops instead, modify get_queue (move pointer to neighbor?)
	mutex_lock(&(queues_lock));

	q = get_queue(trgt_out_l->target.out.nb->dev);
	if (unlikely(q == 0))
		goto out;

	rc = (list_empty(&(q->kpackets_waiting)) &&
			list_empty(&(q->conn_retrans_waiting)) &&
			list_empty(&(q->announce_waiting)) &&
			list_empty(&(q->conns_waiting)));

out:
	mutex_unlock(&(queues_lock));

	return rc;
}

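/* Allocate an skb for a data packet to this neighbor and write the 9 byte
 * header: 1 byte packet type, 4 bytes conn_id, 4 bytes seqno. The caller
 * appends "size" bytes of payload with skb_put(). */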
struct sk_buff *create_packet(struct neighbor *nb, int size,
		gfp_t alloc_flags, __u32 conn_id, __u32 seqno)
{
	struct sk_buff *ret;
	char *dest;

	ret = alloc_skb(size + 9 + LL_ALLOCATED_SPACE(nb->dev), alloc_flags);
	if (unlikely(0 == ret))
		return 0;

	ret->protocol = htons(ETH_P_COR);
	ret->dev = nb->dev;

	skb_reserve(ret, LL_RESERVED_SPACE(nb->dev));
	if (unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac,
			nb->dev->dev_addr, ret->len) < 0)) {
		kfree_skb(ret);
		return 0;
	}
	skb_reset_network_header(ret);

	dest = skb_put(ret, 9);
	BUG_ON(0 == dest);

	dest[0] = PACKET_TYPE_DATA;
	dest += 1;

	put_u32(dest, conn_id, 1);
	dest += 4;
	put_u32(dest, seqno, 1);
	dest += 4;

	return ret;
}

static void set_conn_retrans_timeout(struct conn_retrans *cr)
{
	struct neighbor *nb = cr->trgt_out_o->target.out.nb;
	cr->timeout = jiffies + usecs_to_jiffies(100000 +
			((__u32) atomic_read(&(nb->latency))) +
			((__u32) atomic_read(&(nb->max_remote_cmsg_delay))));
}

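/* Re-arm a retransmit entry that is about to be resent. If only "length"
 * bytes of it are sent, the remainder is split off into a new conn_retrans,
 * which is returned. *dontsend is set if the data was acked in the
 * meantime. */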
static struct conn_retrans *readd_conn_retrans(struct conn_retrans *cr,
		struct neighbor *nb, __u32 length, int *dontsend)
{
	unsigned long iflags;

	struct conn_retrans *ret = 0;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	if (unlikely(cr->ackrcvd)) {
		*dontsend = 1;
		goto out;
	} else
		*dontsend = 0;

	if (unlikely(cr->length > length)) {
		ret = kmem_cache_alloc(connretrans_slab, GFP_ATOMIC);
		if (unlikely(ret == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		memset(ret, 0, sizeof (struct conn_retrans));
		ret->trgt_out_o = cr->trgt_out_o;
		kref_get(&(cr->trgt_out_o->ref));
		ret->seqno = cr->seqno + length;
		ret->length = cr->length - length;
		kref_init(&(ret->ref));

		list_add(&(ret->timeout_list), &(nb->retrans_list_conn));
		list_add(&(ret->conn_list), &(cr->conn_list));

		cr->length = length;
	} else {
		list_del(&(cr->timeout_list));
		list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn));
		set_conn_retrans_timeout(cr);

		BUG_ON(cr->length != length);
	}

out:
	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);

	return ret;
}

void cancel_retrans(struct conn *trgt_out_l)
{
	unsigned long iflags;
	struct neighbor *nb = trgt_out_l->target.out.nb;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	while (list_empty(&(trgt_out_l->target.out.retrans_list)) == 0) {
		struct conn_retrans *cr = container_of(
				trgt_out_l->target.out.retrans_list.next,
				struct conn_retrans, conn_list);
		BUG_ON(cr->trgt_out_o != trgt_out_l);

		list_del(&(cr->timeout_list));
		list_del(&(cr->conn_list));
		cr->ackrcvd = 1;
		kref_put(&(cr->ref), free_connretrans);
	}
#warning reschedule timer

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
}

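/* Retransmit one conn_retrans entry: full-mss chunks are sent as data
 * packets, a smaller remainder as connection data attached to a control
 * message. Returns nonzero if the device queue was full. */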
static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr)
{
	/* cr may be replaced by the remainder of a split retransmit (or end
	 * up 0) below; remember the conn for the final unlock and kref_put */
	struct conn *trgt_out_o = cr->trgt_out_o;
	int targetmss = mss(nb);
	int dontsend;
	int queuefull = 0;

	mutex_lock(&(trgt_out_o->rcv_lock));

	BUG_ON(trgt_out_o->targettype != TARGET_OUT);
	BUG_ON(trgt_out_o->target.out.nb != nb);

	kref_get(&(trgt_out_o->ref));

	if (unlikely(trgt_out_o->isreset != 0)) {
		cancel_retrans(trgt_out_o);
		goto out;
	}

	while (cr->length >= targetmss) {
		struct sk_buff *skb;
		char *dst;
		struct conn_retrans *cr2;
		int rc;

		if (may_send_conn_retrans(nb) == 0)
			goto qos_enqueue;

		skb = create_packet(nb, targetmss, GFP_KERNEL,
				trgt_out_o->target.out.conn_id, cr->seqno);
		if (unlikely(skb == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		cr2 = readd_conn_retrans(cr, nb, targetmss, &dontsend);
		if (unlikely(unlikely(dontsend) || unlikely(cr2 == 0 &&
				unlikely(cr->length > targetmss)))) {
			kfree_skb(skb);
			goto out;
		}

		dst = skb_put(skb, targetmss);

		databuf_pullold(trgt_out_o, cr->seqno, dst, targetmss);
		rc = dev_queue_xmit(skb);

		if (rc != 0) {
			unsigned long iflags;

			spin_lock_irqsave(&(nb->retrans_lock), iflags);
			if (unlikely(cr->ackrcvd)) {
				dontsend = 1;
			} else {
				list_del(&(cr->timeout_list));
				list_add(&(cr->timeout_list),
						&(nb->retrans_list_conn));
			}
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
			if (dontsend == 0)
				goto qos_enqueue;
		}

		cr = cr2;

		if (likely(cr == 0))
			goto out;
	}

	if (unlikely(cr->length <= 0)) {
		BUG();
	} else {
		struct control_msg_out *cm;
		char *buf = kmalloc(cr->length, GFP_KERNEL);

		if (unlikely(buf == 0)) {
			cr->timeout = jiffies + 1;
			goto out;
		}

		cm = alloc_control_msg(nb, ACM_PRIORITY_LOW);
		if (unlikely(cm == 0)) {
			cr->timeout = jiffies + 1;
			kfree(buf);
			goto out;
		}

		databuf_pullold(trgt_out_o, cr->seqno, buf, cr->length);

		if (unlikely(readd_conn_retrans(cr, nb, cr->length, &dontsend)
				!= 0))
			BUG();

		if (likely(dontsend == 0)) {
			send_conndata(cm, trgt_out_o->target.out.conn_id,
					cr->seqno, buf, buf, cr->length);
		}
	}

	if (0) {
qos_enqueue:
		queuefull = 1;
	}
out:
	mutex_unlock(&(trgt_out_o->rcv_lock));

	kref_put(&(trgt_out_o->ref), free_conn);

	return queuefull;
}

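/* Work through the per-neighbor conn retransmit list: drop entries of killed
 * neighbors, resend entries whose timeout has expired and re-arm the timer
 * for the rest. */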
static int send_retrans(struct neighbor *nb, int fromqos)
{
	unsigned long iflags;

	struct conn_retrans *cr = 0;

	int nbstate;
	int rescheduled = 0;
	int queuefull = 0;

	spin_lock_irqsave(&(nb->state_lock), iflags);
	nbstate = nb->state;
	spin_unlock_irqrestore(&(nb->state_lock), iflags);

	while (1) {
		spin_lock_irqsave(&(nb->retrans_lock), iflags);

		if (list_empty(&(nb->retrans_list_conn))) {
			nb->retrans_timer_conn_running = 0;
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
			break;
		}

		cr = container_of(nb->retrans_list_conn.next,
				struct conn_retrans, timeout_list);

#warning todo lock
		BUG_ON(cr->trgt_out_o->targettype != TARGET_OUT);

		if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);

			kref_put(&(cr->ref), free_connretrans);
			continue;
		}

		BUG_ON(nb != cr->trgt_out_o->target.out.nb);

#warning todo check window limit

		if (time_after(cr->timeout, jiffies)) {
			schedule_delayed_work(&(nb->retrans_timer_conn),
					cr->timeout - jiffies);
			rescheduled = 1;
			spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
			break;
		}

		kref_get(&(cr->ref));
		spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
		queuefull = _send_retrans(nb, cr);
		kref_put(&(cr->ref), free_connretrans);
		if (queuefull) {
			rescheduled = 1;
			if (fromqos == 0)
				qos_enqueue(nb->dev, &(nb->rb_cr),
						QOS_CALLER_CONN_RETRANS);
			break;
		}
	}

	if (rescheduled == 0)
		kref_put(&(nb->ref), neighbor_free);

	return queuefull;
}

void retransmit_conn_timerfunc(struct work_struct *work)
{
	struct neighbor *nb = container_of(to_delayed_work(work),
			struct neighbor, retrans_timer_conn);

	send_retrans(nb, 0);
}

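/* An out-of-order ack for [seqno_ooo, seqno_ooo+length) was received: trim or
 * remove the retransmit entries covered by it. */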
void conn_ack_ooo_rcvd(struct neighbor *nb, __u32 conn_id,
		struct conn *trgt_out, __u32 seqno_ooo, __u32 length)
{
	unsigned long iflags;
	struct list_head *curr;

	if (unlikely(length == 0))
		return;

	mutex_lock(&(trgt_out->rcv_lock));

	if (unlikely(trgt_out->targettype != TARGET_OUT))
		goto out;
	if (unlikely(trgt_out->target.out.nb != nb))
		goto out;
	if (unlikely(trgt_out->target.out.conn_id != conn_id))
		goto out;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	curr = trgt_out->target.out.retrans_list.next;

	while (curr != &(trgt_out->target.out.retrans_list)) {
		struct conn_retrans *cr = container_of(curr,
				struct conn_retrans, conn_list);

		if (((__s32)(cr->seqno + cr->length - seqno_ooo)) > 0)
			goto cont;

		if (((__s32)(cr->seqno + cr->length - seqno_ooo - length)) > 0) {
			if (((__s32)(cr->seqno - seqno_ooo - length)) < 0) {
				__u32 newseqno = seqno_ooo + length;
				cr->length -= (newseqno - cr->seqno);
				cr->seqno = newseqno;
			}

			break;
		}

		if (((__s32)(cr->seqno - seqno_ooo)) < 0 &&
				((__s32)(cr->seqno + cr->length - seqno_ooo -
				length)) <= 0) {
			__u32 diff = seqno_ooo + length - cr->seqno -
					cr->length;
			cr->seqno += diff;
			cr->length -= diff;
		} else {
			list_del(&(cr->timeout_list));
			list_del(&(cr->conn_list));
			cr->ackrcvd = 1;
			kref_put(&(cr->ref), free_connretrans);
		}

cont:
		curr = curr->next;
	}

	if (unlikely(list_empty(&(trgt_out->target.out.retrans_list))) == 0) {
		struct conn_retrans *cr = container_of(
				trgt_out->target.out.retrans_list.next,
				struct conn_retrans, conn_list);
		if (unlikely(((__s32) (cr->seqno -
				trgt_out->target.out.seqno_acked)) > 0))
			trgt_out->target.out.seqno_acked = cr->seqno;
	}

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);

out:
	mutex_unlock(&(trgt_out->rcv_lock));
}

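/* A cumulative ack up to "seqno" was received: update the send window (if
 * given), drop acked retransmit entries and ack the data buffer. */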
void conn_ack_rcvd(struct neighbor *nb, __u32 conn_id, struct conn *trgt_out,
		__u32 seqno, int setwindow, __u8 window)
{
	int flush = 0;
	unsigned long iflags;

	mutex_lock(&(trgt_out->rcv_lock));

#warning todo reset check?
	if (unlikely(trgt_out->targettype != TARGET_OUT))
		goto out;
	if (unlikely(trgt_out->target.out.nb != nb))
		goto out;
	if (unlikely(trgt_out->reversedir->source.in.conn_id != conn_id))
		goto out;

	if (unlikely(((__s32)(seqno - trgt_out->target.out.seqno_nextsend)) > 0) ||
			((__s32)(seqno - trgt_out->target.out.seqno_acked)) < 0)
		goto out;

	if (setwindow) {
		__u32 windowdec = dec_log_64_11(window);
		if (unlikely(seqno == trgt_out->target.out.seqno_acked &&
				((__s32) (seqno + windowdec -
				trgt_out->target.out.seqno_windowlimit)) <= 0))
			goto skipwindow;

		trgt_out->target.out.seqno_windowlimit = seqno + windowdec;
		flush = 1;
	}

skipwindow:
	if (seqno == trgt_out->target.out.seqno_acked)
		goto out;

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	trgt_out->target.out.seqno_acked = seqno;

	while (list_empty(&(trgt_out->target.out.retrans_list)) == 0) {
		struct conn_retrans *cr = container_of(
				trgt_out->target.out.retrans_list.next,
				struct conn_retrans, conn_list);

		if (((__s32)(cr->seqno + cr->length - seqno)) > 0) {
			if (((__s32)(cr->seqno - seqno)) < 0) {
				cr->length -= (seqno - cr->seqno);
				cr->seqno = seqno;
			}
			break;
		}

		list_del(&(cr->timeout_list));
		list_del(&(cr->conn_list));
		cr->ackrcvd = 1;
		kref_put(&(cr->ref), free_connretrans);
	}

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
	databuf_ack(trgt_out, trgt_out->target.out.seqno_acked);

out:
	mutex_unlock(&(trgt_out->rcv_lock));

	if (flush)
		flush_buf(trgt_out);
}

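/* Initialize a conn_retrans for freshly sent data and add it to the conn and
 * neighbor retransmit lists; start the retransmit timer if it is not already
 * running. */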
static void schedule_retransmit_conn(struct conn_retrans *cr,
		struct conn *trgt_out_l, __u32 seqno, __u32 len)
{
	unsigned long iflags;

	struct neighbor *nb = trgt_out_l->target.out.nb;

	int first;

	BUG_ON(trgt_out_l->targettype != TARGET_OUT);

	memset(cr, 0, sizeof (struct conn_retrans));
	cr->trgt_out_o = trgt_out_l;
	kref_get(&(trgt_out_l->ref));
	cr->seqno = seqno;
	cr->length = len;
	kref_init(&(cr->ref));
	set_conn_retrans_timeout(cr);

	spin_lock_irqsave(&(nb->retrans_lock), iflags);

	first = unlikely(list_empty(&(nb->retrans_list_conn)));
	list_add_tail(&(cr->timeout_list), &(nb->retrans_list_conn));

	list_add_tail(&(cr->conn_list), &(trgt_out_l->target.out.retrans_list));

	if (unlikely(unlikely(first) &&
			unlikely(nb->retrans_timer_conn_running == 0))) {
		schedule_delayed_work(&(nb->retrans_timer_conn),
				cr->timeout - jiffies);
		nb->retrans_timer_conn_running = 1;
		kref_get(&(nb->ref));
	}

	spin_unlock_irqrestore(&(nb->retrans_lock), iflags);
}

static __u32 get_windowlimit(struct conn *trgt_out_l)
{
	__s32 windowlimit = (__s32)(trgt_out_l->target.out.seqno_windowlimit -
			trgt_out_l->target.out.seqno_nextsend);
	if (unlikely(windowlimit < 0))
		return 0;
	return windowlimit;
}

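/* Send as much buffered data of this connection as the send window, the
 * device queue and the available credits allow; full-mss packets are sent
 * directly, a smaller tail may go out via a control message. */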
#warning todo reset connections which are SOURCE_NONE and are stuck for too long
int flush_out(struct conn *trgt_out_l, int fromqos, __u32 creditsperbyte)
{
	int targetmss;
	__u32 seqno;
	int sent = 0;

	BUG_ON(trgt_out_l->targettype != TARGET_OUT);

	targetmss = mss(trgt_out_l->target.out.nb);

	if (unlikely(trgt_out_l->target.out.conn_id == 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(trgt_out_l->isreset != 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(trgt_out_l->sourcetype == SOURCE_SOCK &&
			trgt_out_l->source.sock.delay_flush != 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (fromqos == 0 && may_send_conn(trgt_out_l) == 0)
		return RC_FLUSH_CONN_OUT_CONG;

	while (trgt_out_l->data_buf.read_remaining >= targetmss &&
			get_windowlimit(trgt_out_l) >= targetmss) {
		struct conn_retrans *cr;
		struct sk_buff *skb;
		char *dst;
		int rc;

		if (unlikely(creditsperbyte * targetmss >
				trgt_out_l->credits))
			return RC_FLUSH_CONN_OUT_CREDITS;

		seqno = trgt_out_l->target.out.seqno_nextsend;
		skb = create_packet(trgt_out_l->target.out.nb, targetmss,
				GFP_ATOMIC, trgt_out_l->target.out.conn_id,
				seqno);
		if (unlikely(skb == 0))
			return RC_FLUSH_CONN_OUT_OOM;

		cr = kmem_cache_alloc(connretrans_slab, GFP_KERNEL);
		if (unlikely(cr == 0)) {
			kfree_skb(skb);
			return RC_FLUSH_CONN_OUT_OOM;
		}

		dst = skb_put(skb, targetmss);

		databuf_pull(trgt_out_l, dst, targetmss);

		rc = dev_queue_xmit(skb);
		if (rc != 0) {
			databuf_unpull(trgt_out_l, targetmss);
			kmem_cache_free(connretrans_slab, cr);
			return RC_FLUSH_CONN_OUT_CONG;
		}

		trgt_out_l->credits -= creditsperbyte * targetmss;
		trgt_out_l->target.out.seqno_nextsend += targetmss;
		schedule_retransmit_conn(cr, trgt_out_l, seqno, targetmss);
		sent = 1;
	}

	if (trgt_out_l->data_buf.read_remaining > 0 && (trgt_out_l->tos ==
			TOS_LATENCY || trgt_out_l->target.out.seqno_nextsend ==
			trgt_out_l->target.out.seqno_acked)) {
		struct control_msg_out *cm;
		struct conn_retrans *cr;
		__u32 len = trgt_out_l->data_buf.read_remaining;
		__s32 windowlimit = get_windowlimit(trgt_out_l);
		char *buf;

		if (windowlimit == 0)
			goto out;

		if (windowlimit < len/2 &&
				trgt_out_l->target.out.seqno_nextsend !=
				trgt_out_l->target.out.seqno_acked)
			goto out;

		if (len > windowlimit)
			len = windowlimit;

		if (unlikely(creditsperbyte * len > trgt_out_l->credits))
			return RC_FLUSH_CONN_OUT_CREDITS;

		buf = kmalloc(len, GFP_KERNEL);
		if (unlikely(buf == 0))
			return RC_FLUSH_CONN_OUT_OOM;

		cm = alloc_control_msg(trgt_out_l->target.out.nb,
				ACM_PRIORITY_LOW);
		if (unlikely(cm == 0)) {
			kfree(buf);
			return RC_FLUSH_CONN_OUT_OOM;
		}

		cr = kmem_cache_alloc(connretrans_slab, GFP_KERNEL);
		if (unlikely(cr == 0)) {
			kfree(buf);
			free_control_msg(cm);
			return RC_FLUSH_CONN_OUT_CONG;
		}

		databuf_pull(trgt_out_l, buf, len);

		seqno = trgt_out_l->target.out.seqno_nextsend;
		trgt_out_l->credits -= creditsperbyte * len;
		trgt_out_l->target.out.seqno_nextsend += len;

		schedule_retransmit_conn(cr, trgt_out_l, seqno, len);

		send_conndata(cm, trgt_out_l->target.out.conn_id, seqno, buf,
				buf, len);
		sent = 1;
	}

out:
	if (sent)
		return RC_FLUSH_CONN_OUT_OK_SENT;

	return RC_FLUSH_CONN_OUT_OK;
}

int __init cor_snd_init(void)
{
	connretrans_slab = kmem_cache_create("cor_connretrans",
			sizeof(struct conn_retrans), 8, 0, 0);

	if (unlikely(connretrans_slab == 0))
		return 1;

	INIT_DELAYED_WORK(&(qos_resume_work), qos_resume);
	qos_resume_scheduled = 0;

	return 0;
}

MODULE_LICENSE("GPL");