add keepalive
[cor.git] / net / cor / conn.c
blob008727b5846042246431ab6148f0575a0cc4ad46
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2021 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/mutex.h>
23 #include "cor.h"
/* Lock taken around every walk or modification of cor_openports. */
25 DEFINE_SPINLOCK(cor_bindnodes);
/* Listening cor sockets, linked through cor_sock.data.listener.lh. */
27 static LIST_HEAD(cor_openports);
/* Slab cache for struct cor_conn_bidir (created in cor_init()). */
29 static struct kmem_cache *cor_conn_slab;
/* Slab cache for struct cor_connid_reuse_item (created in cor_init()). */
30 struct kmem_cache *cor_connid_reuse_slab;
/* Connection counter that is compared against MAX_CONNS. */
32 atomic_t cor_num_conns;
34 int cor_new_incoming_conn_allowed(struct cor_neighbor *nb)
36 /**
37 * MAX_CONNS is only loosly enforced for now
38 * (cor_num_conns is not checked and incremented at the same time)
40 if (atomic_read(&cor_num_conns) >= MAX_CONNS)
41 return 0;
42 else
43 return 1;
46 static __u64 cor_get_prio_in(__u64 priority)
48 if (PRIORITY_IN_MULITPLIER_PERCENT >= 100)
49 return priority;
51 if (unlikely(priority > U64_MAX/100)) {
52 return (priority / 100) * PRIORITY_IN_MULITPLIER_PERCENT;
53 } else {
54 return (priority * PRIORITY_IN_MULITPLIER_PERCENT) / 100;
58 static __u32 cor_conn_prio_sum_limit(__u32 priority, __u64 priority_sum)
60 __u32 shiftcnt = 0;
62 while ((PRIORITY_SUM_IN_MAX >> shiftcnt) > U32_MAX) {
63 shiftcnt++;
66 return div_u64(((__u64) priority) * (PRIORITY_SUM_IN_MAX >> shiftcnt),
67 (priority_sum >> shiftcnt));
70 __u32 _cor_conn_refresh_priority(struct cor_conn *cn_lx)
72 BUG_ON(cn_lx->is_client == 0);
74 if (cn_lx->sourcetype == SOURCE_IN) {
75 __u32 priority = (__u32)
76 cor_get_prio_in(cn_lx->source.in.priority);
77 __u64 priority_sum = cor_get_prio_in(atomic64_read(
78 &(cn_lx->source.in.nb->priority_sum)));
79 if (PRIORITY_SUM_IN_MAX != U64_MAX &&
80 priority_sum > PRIORITY_SUM_IN_MAX) {
81 return cor_conn_prio_sum_limit(priority, priority_sum);
82 } else {
83 return priority;
85 } else if (cn_lx->sourcetype == SOURCE_SOCK) {
86 return cn_lx->source.sock.priority;
87 } else {
88 BUG();
89 return 0;
93 __u32 cor_conn_refresh_priority(struct cor_conn *cn, int locked)
95 struct cor_conn *cn_reversedir = cor_get_conn_reversedir(cn);
96 __u32 priority = 0;
98 if (likely(locked == 0)) {
99 if (cn->is_client == 0)
100 return cor_conn_refresh_priority(cn_reversedir, 0);
102 spin_lock_bh(&(cn->rcv_lock));
103 spin_lock_bh(&(cor_get_conn_reversedir(cn)->rcv_lock));
104 } else {
105 BUG_ON(cn->is_client == 0);
108 if (unlikely(cn->isreset != 0) || cn->targettype != TARGET_OUT)
109 goto out;
111 priority = _cor_conn_refresh_priority(cn);
113 if (cn->target.out.priority_send_allowed != 0) {
114 __u16 priority_enc = cor_enc_priority(priority);
115 if (priority_enc != cn->target.out.priority_last)
116 cor_send_priority(cn, priority_enc);
119 out:
120 if (likely(locked == 0)) {
121 spin_unlock_bh(&(cor_get_conn_reversedir(cn)->rcv_lock));
122 spin_unlock_bh(&(cn->rcv_lock));
125 return priority;
128 static void _cor_set_conn_in_priority(struct cor_conn *src_in_lx,
129 __u32 newpriority)
131 struct cor_neighbor *nb = src_in_lx->source.in.nb;
133 __u32 oldpriority = src_in_lx->source.in.priority;
135 cor_update_atomic_sum(&(nb->priority_sum),
136 oldpriority, newpriority);
138 src_in_lx->source.in.priority = newpriority;
141 void cor_set_conn_in_priority(struct cor_neighbor *nb, __u32 conn_id,
142 struct cor_conn *src_in, __u8 priority_seqno, __u16 priority)
144 __u32 newpriority;
146 if (unlikely(src_in->is_client == 0))
147 return;
149 spin_lock_bh(&(src_in->rcv_lock));
150 spin_lock_bh(&(cor_get_conn_reversedir(src_in)->rcv_lock));
152 if (unlikely(cor_is_conn_in(src_in, nb, conn_id) == 0))
153 goto out;
155 if (src_in->source.in.priority_seqno != priority_seqno)
156 goto out;
157 src_in->source.in.priority_seqno =
158 (src_in->source.in.priority_seqno + 1) & 15;
160 newpriority = (cor_dec_priority(priority)*4)/5;
161 _cor_set_conn_in_priority(src_in, newpriority);
162 cor_conn_refresh_priority(src_in, 1);
164 out:
165 spin_unlock_bh(&(cor_get_conn_reversedir(src_in)->rcv_lock));
166 spin_unlock_bh(&(src_in->rcv_lock));
169 static void cor_connreset_priority(struct cor_conn *cn_lx)
171 if (cn_lx->is_client == 0)
172 return;
174 if (cn_lx->sourcetype == SOURCE_IN)
175 _cor_set_conn_in_priority(cn_lx, 0);
179 static void cor_conn_move_to_idle_list(struct cor_conn *trgt_out_lx)
181 unsigned long iflags;
182 struct cor_neighbor *nb = trgt_out_lx->target.out.nb;
184 BUG_ON(trgt_out_lx->data_buf.datasize != 0);
186 trgt_out_lx->target.out.nblist_busy_remaining = 0;
187 trgt_out_lx->target.out.jiffies_last_act = jiffies;
189 spin_lock_irqsave(&(nb->conn_list_lock), iflags);
191 trgt_out_lx->target.out.jiffies_last_act = jiffies;
193 list_del(&(trgt_out_lx->target.out.nb_list));
194 list_add_tail(&(trgt_out_lx->target.out.nb_list),
195 &(nb->snd_conn_idle_list));
197 spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
199 trgt_out_lx->target.out.in_nb_busy_list = 0;
202 static void cor_conn_move_to_busy_list(struct cor_conn *trgt_out_lx)
204 unsigned long iflags;
205 struct cor_neighbor *nb = trgt_out_lx->target.out.nb;
207 BUG_ON(trgt_out_lx->data_buf.datasize == 0);
209 trgt_out_lx->target.out.nblist_busy_remaining =
210 trgt_out_lx->data_buf.datasize;
212 spin_lock_irqsave(&(nb->conn_list_lock), iflags);
214 trgt_out_lx->target.out.jiffies_last_act = jiffies;
216 list_del(&(trgt_out_lx->target.out.nb_list));
217 list_add_tail(&(trgt_out_lx->target.out.nb_list),
218 &(nb->snd_conn_busy_list));
220 spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
222 trgt_out_lx->target.out.in_nb_busy_list = 1;
/*
 * cor_conn_set_last_act() - keep a TARGET_OUT conn on the matching neighbor
 * list and react to inactivity.
 *
 * Does nothing for a reset conn; BUG()s if the target is not TARGET_OUT.
 * Depending on in_nb_busy_list, data_buf.datasize, jiffies_last_act and
 * nblist_busy_remaining the conn is moved to the neighbor's busy or idle
 * list. If the last recorded activity is older than a quarter of
 * CONN_BUSY_INACTIVITY_TIMEOUT_SEC, ignore_rcv_lowbuf is raised to at least
 * (bufsize >> BUFSIZE_SHIFT) and the bufsize state is switched to
 * BUFSIZE_DECR unless it is already BUFSIZE_DECR or BUFSIZE_DECR_FAST.
 *
 * NOTE(review): the lines holding only braces are not visible in this copy,
 * so it cannot be confirmed whether the inactivity check runs in every case
 * or only inside the "already on the busy list" branch - verify against the
 * original file.
 */
225 void cor_conn_set_last_act(struct cor_conn *trgt_out_lx)
227 if (unlikely(trgt_out_lx->isreset != 0))
228 return;
230 BUG_ON(trgt_out_lx->targettype != TARGET_OUT);
/* conn is currently not on the busy list */
232 if (unlikely(trgt_out_lx->target.out.in_nb_busy_list == 0)) {
233 if (unlikely(trgt_out_lx->data_buf.datasize != 0)) {
234 cor_conn_move_to_busy_list(trgt_out_lx);
235 } else if (unlikely(time_after(jiffies,
236 trgt_out_lx->target.out.jiffies_last_act +
237 HZ * CONN_ACTIVITY_UPDATEINTERVAL_SEC))) {
/* re-queue at the idle list tail, which also refreshes jiffies_last_act */
238 cor_conn_move_to_idle_list(trgt_out_lx);
/* conn is currently on the busy list */
240 } else {
241 if (unlikely(trgt_out_lx->data_buf.datasize == 0)) {
242 cor_conn_move_to_idle_list(trgt_out_lx);
243 } else if (unlikely(
244 trgt_out_lx->target.out.nblist_busy_remaining ==
245 0)) {
246 cor_conn_move_to_busy_list(trgt_out_lx);
/* no activity recorded for a quarter of the busy inactivity timeout */
250 if (unlikely(time_after(jiffies,
251 trgt_out_lx->target.out.jiffies_last_act +
252 (CONN_BUSY_INACTIVITY_TIMEOUT_SEC * HZ) / 4))) {
253 trgt_out_lx->bufsize.ignore_rcv_lowbuf = max(
254 trgt_out_lx->bufsize.ignore_rcv_lowbuf,
255 trgt_out_lx->bufsize.bufsize >>
256 BUFSIZE_SHIFT);
258 if (trgt_out_lx->bufsize.state != BUFSIZE_DECR &&
259 trgt_out_lx->bufsize.state !=
260 BUFSIZE_DECR_FAST) {
261 trgt_out_lx->bufsize.state = BUFSIZE_DECR;
262 trgt_out_lx->bufsize.act.decr.size_start =
263 trgt_out_lx->bufsize.bufsize;
269 static void _cor_free_conn(struct cor_conn *cn)
271 BUG_ON(cn->isreset == 0);
273 if (cn->sourcetype == SOURCE_IN) {
274 WARN_ONCE(list_empty(&(cn->source.in.reorder_queue)) == 0,
275 "cor_free_conn(): cn->source.in.reorder_queue is not empty");
276 WARN_ONCE(list_empty(&(cn->source.in.acks_pending)) == 0,
277 "cor_free_conn():cn->source.in.acks_pending is not empty");
279 WARN_ONCE(cn->source.in.conn_id != 0,
280 "cor_free_conn(): cn->source.in.conn_id is not 0");
281 cor_nb_kref_put(cn->source.in.nb, "conn");
282 cn->source.in.nb = 0;
285 if (cn->targettype == TARGET_OUT) {
286 WARN_ONCE(list_empty(&(cn->target.out.retrans_list)) == 0,
287 "cor_free_conn(): cn->target.out.retrans_list is not empty");
288 WARN_ONCE(cn->target.out.rb.in_queue != RB_INQUEUE_FALSE,
289 "cor_free_conn(): cn->target.out.rb.in_queue is not RB_INQUEUE_FALSE");
290 WARN_ONCE(cn->target.out.conn_id != 0,
291 "cor_free_conn(): cn->target.out.conn_id is not 0");
292 cor_nb_kref_put(cn->target.out.nb, "conn");
293 cn->target.out.nb = 0;
296 WARN_ONCE(cn->data_buf.datasize != 0,
297 "cor_free_conn(): cn->data_buf.datasize is not 0");
298 WARN_ONCE(cn->data_buf.overhead != 0,
299 "cor_free_conn(): cn->data_buf.overhead is not 0");
300 WARN_ONCE(list_empty(&(cn->data_buf.items)) == 0,
301 "cor_free_conn(): cn->data_buf.items is not empty");
302 WARN_ONCE(cn->data_buf.nextread != 0,
303 "cor_free_conn(): cn->data_buf.nextread is not 0");
306 void cor_free_conn(struct kref *ref)
308 struct cor_conn_bidir *cnb = container_of(ref, struct cor_conn_bidir,
309 ref);
311 _cor_free_conn(&(cnb->cli));
312 _cor_free_conn(&(cnb->srv));
314 memset(cnb, 9*16 + 10, sizeof(struct cor_conn_bidir));
315 kmem_cache_free(cor_conn_slab, cnb);
319 * rc == 0 ==> ok
320 * rc == 1 ==> connid_reuse or connid allocation failed
322 int cor_conn_init_out(struct cor_conn *trgt_unconn_ll, struct cor_neighbor *nb,
323 __u32 rcvd_connid, int use_rcvd_connid)
325 unsigned long iflags;
326 struct cor_conn *src_unconn_ll =
327 cor_get_conn_reversedir(trgt_unconn_ll);
328 __u8 tmp;
330 BUG_ON(trgt_unconn_ll->targettype != TARGET_UNCONNECTED);
331 BUG_ON(src_unconn_ll == 0);
332 BUG_ON(src_unconn_ll->sourcetype != SOURCE_UNCONNECTED);
334 memset(&(trgt_unconn_ll->target.out), 0,
335 sizeof(trgt_unconn_ll->target.out));
336 memset(&(src_unconn_ll->source.in), 0,
337 sizeof(src_unconn_ll->source.in));
339 trgt_unconn_ll->targettype = TARGET_OUT;
340 src_unconn_ll->sourcetype = SOURCE_IN;
342 spin_lock_irqsave(&(nb->conn_list_lock), iflags);
343 if (unlikely(cor_get_neigh_state(nb) == NEIGHBOR_STATE_KILLED))
344 goto out_err;
346 if (use_rcvd_connid) {
347 BUG_ON((rcvd_connid & (1 << 31)) == 0);
349 src_unconn_ll->source.in.conn_id = rcvd_connid;
350 if (unlikely(cor_insert_connid(nb, src_unconn_ll) != 0)) {
351 src_unconn_ll->source.in.conn_id = 0;
352 goto out_err;
354 } else {
355 src_unconn_ll->source.in.cir = kmem_cache_alloc(
356 cor_connid_reuse_slab, GFP_ATOMIC);
357 if (unlikely(src_unconn_ll->source.in.cir == 0))
358 goto out_err;
360 memset(src_unconn_ll->source.in.cir, 0,
361 sizeof(struct cor_connid_reuse_item));
363 if (unlikely(cor_connid_alloc(nb, src_unconn_ll))) {
364 kmem_cache_free(cor_connid_reuse_slab,
365 src_unconn_ll->source.in.cir);
366 src_unconn_ll->source.in.cir = 0;
367 goto out_err;
371 list_add_tail(&(trgt_unconn_ll->target.out.nb_list),
372 &(nb->snd_conn_idle_list));
373 cor_conn_kref_get(trgt_unconn_ll, "neighbor_list");
375 spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
377 if (0) {
378 out_err:
379 spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
380 trgt_unconn_ll->targettype = TARGET_UNCONNECTED;
381 src_unconn_ll->sourcetype = SOURCE_UNCONNECTED;
382 return 1;
385 trgt_unconn_ll->target.out.nb = nb;
386 src_unconn_ll->source.in.nb = nb;
387 cor_nb_kref_get(nb, "conn");
388 cor_nb_kref_get(nb, "conn");
390 cor_conn_set_last_act(trgt_unconn_ll);
392 INIT_LIST_HEAD(&(src_unconn_ll->source.in.reorder_queue));
393 INIT_LIST_HEAD(&(src_unconn_ll->source.in.acks_pending));
394 INIT_LIST_HEAD(&(trgt_unconn_ll->target.out.retrans_list));
396 cor_reset_seqno(trgt_unconn_ll, 0);
397 if (use_rcvd_connid == 0) {
398 get_random_bytes((char *)
399 &(trgt_unconn_ll->target.out.seqno_nextsend),
400 sizeof(
401 trgt_unconn_ll->target.out.seqno_nextsend));
402 trgt_unconn_ll->target.out.seqno_acked =
403 trgt_unconn_ll->target.out.seqno_nextsend;
404 trgt_unconn_ll->target.out.seqno_windowlimit =
405 trgt_unconn_ll->target.out.seqno_nextsend;
406 cor_reset_seqno(trgt_unconn_ll,
407 trgt_unconn_ll->target.out.seqno_nextsend);
409 get_random_bytes((char *)
410 &(src_unconn_ll->source.in.next_seqno),
411 sizeof(src_unconn_ll->source.in.next_seqno));
412 src_unconn_ll->source.in.window_seqnolimit =
413 src_unconn_ll->source.in.next_seqno;
414 src_unconn_ll->source.in.window_seqnolimit_remote =
415 src_unconn_ll->source.in.next_seqno;
418 get_random_bytes((char *) &tmp, 1);
419 trgt_unconn_ll->target.out.priority_seqno = (tmp & 15);
421 src_unconn_ll->source.in.priority_seqno = 0;
423 trgt_unconn_ll->target.out.jiffies_last_act = jiffies;
425 if (src_unconn_ll->is_highlatency)
426 trgt_unconn_ll->target.out.jiffies_idle_since =
427 (jiffies -
428 BURSTPRIO_MAXIDLETIME_HIGHLATENCY_SECS * HZ) <<
429 JIFFIES_LAST_IDLE_SHIFT;
430 else
431 trgt_unconn_ll->target.out.jiffies_idle_since =
432 jiffies << JIFFIES_LAST_IDLE_SHIFT;
434 trgt_unconn_ll->target.out.remote_bufsize_changerate = 64;
436 if (src_unconn_ll->is_client)
437 atomic_inc(&cor_num_conns);
439 if (use_rcvd_connid == 0)
440 cor_update_windowlimit(src_unconn_ll);
442 return 0;
445 void cor_conn_init_sock_source(struct cor_conn *cn)
447 BUG_ON(cn == 0);
448 cn->sourcetype = SOURCE_SOCK;
449 memset(&(cn->source.sock), 0, sizeof(cn->source.sock));
450 cn->source.sock.priority = cor_priority_max();
451 cn->source.sock.snd_speed.jiffies_last_refresh = jiffies;
452 cn->source.sock.snd_speed.flushed = 1;
453 timer_setup(&(cn->source.sock.keepalive_timer),
454 cor_keepalive_req_timerfunc, 0);
457 void cor_conn_init_sock_target(struct cor_conn *cn)
459 BUG_ON(cn == 0);
460 cn->targettype = TARGET_SOCK;
461 memset(&(cn->target.sock), 0, sizeof(cn->target.sock));
462 cor_reset_seqno(cn, 0);
465 static void _cor_alloc_conn(struct cor_conn *cn, __u8 is_highlatency)
467 cn->sourcetype = SOURCE_UNCONNECTED;
468 cn->targettype = TARGET_UNCONNECTED;
470 cn->isreset = 0;
472 spin_lock_init(&(cn->rcv_lock));
474 cor_databuf_init(cn);
476 cor_bufsize_init(cn, 0);
478 if (is_highlatency == 0) {
479 cn->is_highlatency = 0;
480 cn->bufsize.bufsize =
481 (BUFSIZE_INITIAL_LOWLAT << BUFSIZE_SHIFT);
482 } else {
483 cn->is_highlatency = 1;
484 cn->bufsize.bufsize =
485 (BUFSIZE_INITIAL_HIGHLAT << BUFSIZE_SHIFT);
489 struct cor_conn_bidir* cor_alloc_conn(gfp_t allocflags, __u8 is_highlatency)
491 struct cor_conn_bidir *cnb;
493 cnb = kmem_cache_alloc(cor_conn_slab, allocflags);
494 if (unlikely(cnb == 0))
495 return 0;
497 memset(cnb, 0, sizeof(struct cor_conn_bidir));
499 cnb->cli.is_client = 1;
500 kref_init(&(cnb->ref));
502 _cor_alloc_conn(&(cnb->cli), is_highlatency);
503 _cor_alloc_conn(&(cnb->srv), is_highlatency);
505 return cnb;
508 static struct cor_sock *cor_get_corsock_by_port(__be32 port)
510 struct list_head *curr = cor_openports.next;
512 while (curr != &cor_openports) {
513 struct cor_sock *cs = container_of(curr, struct cor_sock,
514 data.listener.lh);
515 BUG_ON(cs->type != CS_TYPE_LISTENER);
516 if (cs->data.listener.port == port)
517 return cs;
519 curr = curr->next;
522 return 0;
525 __u32 cor_list_services(char *buf, __u32 buflen)
527 __u32 cnt = 0;
529 __u32 buf_offset = 4;
531 struct list_head *curr;
532 int rc;
535 * The variable length header rowcount need to be generated after the
536 * data. This is done by reserving the maximum space they could take. If
537 * they end up being smaller, the data is moved so that there is no gap.
540 BUG_ON(buf == 0);
541 BUG_ON(buflen < buf_offset);
543 spin_lock_bh(&cor_bindnodes);
545 curr = cor_openports.next;
546 while (curr != &cor_openports) {
547 struct cor_sock *cs = container_of(curr, struct cor_sock,
548 data.listener.lh);
549 BUG_ON(cs->type != CS_TYPE_LISTENER);
551 if (cs->data.listener.publish_service == 0)
552 goto cont;
554 if (unlikely(buf_offset + 4 < buf_offset) ||
555 buf_offset + 4 > buflen)
556 break;
558 buf[buf_offset] = ((char *) &(cs->data.listener.port))[0];
559 buf[buf_offset+1] = ((char *) &(cs->data.listener.port))[1];
560 buf[buf_offset+2] = ((char *) &(cs->data.listener.port))[2];
561 buf[buf_offset+3] = ((char *) &(cs->data.listener.port))[3];
562 buf_offset += 4;
563 cnt++;
565 cont:
566 curr = curr->next;
569 spin_unlock_bh(&cor_bindnodes);
571 rc = cor_encode_len(buf, 4, cnt);
572 BUG_ON(rc <= 0);
573 BUG_ON(rc > 4);
575 if (likely(rc < 4))
576 memmove(buf + ((__u32) rc), buf+4, buf_offset);
578 return buf_offset - 4 + ((__u32) rc);
582 void cor_set_publish_service(struct cor_sock *cs, __u8 value)
584 BUG_ON (value != 0 && value != 1);
586 mutex_lock(&(cs->lock));
588 cs->publish_service = value;
590 if (cs->type == CS_TYPE_LISTENER) {
591 spin_lock_bh(&cor_bindnodes);
592 cs->data.listener.publish_service = value;
593 spin_unlock_bh(&cor_bindnodes);
596 mutex_unlock(&(cs->lock));
599 void cor_close_port(struct cor_sock *cs)
601 mutex_lock(&(cs->lock));
602 if (unlikely(cs->type != CS_TYPE_LISTENER))
603 goto out;
605 spin_lock_bh(&cor_bindnodes);
607 list_del(&(cs->data.listener.lh));
609 while (list_empty(&(cs->data.listener.conn_queue)) == 0) {
610 struct cor_conn *src_sock_o = container_of(
611 cs->data.listener.conn_queue.next,
612 struct cor_conn, source.sock.cl_list);
613 BUG_ON(src_sock_o->source.sock.in_cl_list == 0);
614 list_del(&(src_sock_o->source.sock.cl_list));
615 src_sock_o->source.sock.in_cl_list = 0;
616 spin_unlock_bh(&cor_bindnodes);
618 cor_reset_conn(src_sock_o);
620 spin_lock_bh(&cor_bindnodes);
621 cor_conn_kref_put(src_sock_o, "conn_queue");
624 spin_unlock_bh(&cor_bindnodes);
625 out:
626 mutex_unlock(&(cs->lock));
629 int cor_open_port(struct cor_sock *cs_l, __be32 port)
631 int rc = 0;
633 spin_lock_bh(&cor_bindnodes);
634 if (cor_get_corsock_by_port(port) != 0) {
635 rc = -EADDRINUSE;
636 goto out;
639 BUG_ON(cs_l->type != CS_TYPE_UNCONNECTED);
641 cs_l->type = CS_TYPE_LISTENER;
642 cs_l->data.listener.port = port;
643 cs_l->data.listener.publish_service = cs_l->publish_service;
645 /* kref is not used here */
646 INIT_LIST_HEAD(&(cs_l->data.listener.conn_queue));
648 list_add_tail((struct list_head *) &(cs_l->data.listener.lh),
649 &cor_openports);
651 out:
652 spin_unlock_bh(&cor_bindnodes);
654 return rc;
658 * rc == 0 connected
659 * rc == 2 port not open
660 * rc == 3 listener queue full
662 int cor_connect_port(struct cor_conn *trgt_unconn_ll, __be32 port)
664 struct cor_conn *src_unconn_ll =
665 cor_get_conn_reversedir(trgt_unconn_ll);
666 struct cor_sock *cs;
667 int rc = 0;
669 spin_lock_bh(&cor_bindnodes);
671 cs = cor_get_corsock_by_port(port);
672 if (cs == 0) {
673 rc = 2;
674 goto out;
677 if (unlikely(cs->data.listener.queue_len >=
678 cs->data.listener.queue_maxlen)) {
679 if (cs->data.listener.queue_maxlen <= 0)
680 rc = 2;
681 else
682 rc = 3;
684 goto out;
687 BUG_ON(trgt_unconn_ll->is_client != 1);
688 cor_conn_init_sock_target(trgt_unconn_ll);
689 cor_conn_init_sock_source(src_unconn_ll);
691 list_add_tail(&(src_unconn_ll->source.sock.cl_list),
692 &(cs->data.listener.conn_queue));
693 src_unconn_ll->source.sock.in_cl_list = 1;
694 cor_conn_kref_get(src_unconn_ll, "conn_queue");
696 cs->data.listener.queue_len++;
697 atomic_set(&(cs->ready_to_accept), 1);
698 barrier();
699 cs->sk.sk_state_change(&(cs->sk));
701 out:
702 spin_unlock_bh(&cor_bindnodes);
703 return rc;
707 * rc == 0 connected
708 * rc == 3 addr not found
709 * rc == 4 ==> connid allocation failed
710 * rc == 4 ==> control msg alloc failed
712 int cor_connect_neigh(struct cor_conn *trgt_unconn_ll, char *addr,
713 __u16 addrlen)
715 struct cor_control_msg_out *cm;
716 struct cor_neighbor *nb = 0;
718 nb = cor_find_neigh(addr, addrlen);
719 if (nb == 0)
720 return 3;
722 cm = cor_alloc_control_msg(nb, ACM_PRIORITY_MED);
723 if (unlikely(cm == 0)) {
724 cor_nb_kref_put(nb, "stack");
725 return 4;
728 if (unlikely(cor_conn_init_out(trgt_unconn_ll, nb, 0, 0))) {
729 cor_free_control_msg(cm);
730 cor_nb_kref_put(nb, "stack");
731 return 4;
734 trgt_unconn_ll->target.out.priority_last =
735 _cor_conn_refresh_priority(trgt_unconn_ll);
737 cor_send_connect_nb(cm, trgt_unconn_ll->target.out.conn_id,
738 trgt_unconn_ll->target.out.seqno_nextsend,
739 cor_get_conn_reversedir(trgt_unconn_ll)->
740 source.in.next_seqno,
741 cor_get_conn_reversedir(trgt_unconn_ll));
743 cor_nb_kref_put(nb, "stack");
745 return 0;
/*
 * _cor_reset_conn() - tear down one direction of a conn pair.
 *
 * cn_ll: direction to tear down
 * trgt_out_resetneeded: if nonzero and the target is TARGET_OUT with a conn
 *	id still set, a reset for that conn id is sent to the neighbor
 *
 * First the source side is released (conn id registration and reuse item
 * for SOURCE_IN; socket reference, accept queue entry and keepalive timer
 * for SOURCE_SOCK), then the target side (command parameters, neighbor list
 * entry and retransmits, or the socket reference). Finally the data buffer
 * is discarded, buffer space is re-accounted and the priority contribution
 * is dropped.
 *
 * NOTE(review): lines holding only braces or punctuation are not visible in
 * this copy. It cannot be confirmed here whether the conn_id clearing and
 * cor_free_ack_conns() run only when conn_id != 0, and the second operand of
 * the del_timer() comparison is cut off - verify against the original file.
 */
748 static void _cor_reset_conn(struct cor_conn *cn_ll, int trgt_out_resetneeded)
750 unsigned long iflags;
752 if (cn_ll->sourcetype == SOURCE_IN) {
753 struct cor_neighbor *nb = cn_ll->source.in.nb;
/* a conn id with bit 31 set must not have a reuse item */
755 if (cn_ll->source.in.conn_id != 0 &&
756 (cn_ll->source.in.conn_id & (1 << 31)) != 0) {
757 BUG_ON(cn_ll->source.in.cir != 0);
/* a conn id with bit 31 clear: hand its reuse item over to the neighbor */
758 } else if (cn_ll->source.in.conn_id != 0 &&
759 (cn_ll->source.in.conn_id & (1 << 31)) == 0) {
760 BUG_ON(cn_ll->source.in.cir == 0);
762 kref_init(&(cn_ll->source.in.cir->ref));
763 cn_ll->source.in.cir->conn_id =
764 cn_ll->source.in.conn_id;
765 cn_ll->source.in.cir->pingcnt =
766 nb->connid_reuse_pingcnt;
768 spin_lock_irqsave(&(nb->connid_reuse_lock), iflags);
769 cor_insert_connid_reuse(nb, cn_ll->source.in.cir);
770 list_add_tail(&(cn_ll->source.in.cir->lh),
771 &(nb->connid_reuse_list));
772 spin_unlock_irqrestore(&(nb->connid_reuse_lock),
773 iflags);
775 cn_ll->source.in.cir = 0;
/* remove the conn from the neighbor's conn id tree and drop that reference */
778 if (cn_ll->source.in.conn_id != 0) {
779 spin_lock_irqsave(&(nb->connid_lock), iflags);
780 rb_erase(&(cn_ll->source.in.rbn), &(nb->connid_rb));
781 spin_unlock_irqrestore(&(nb->connid_lock), iflags);
782 cor_conn_kref_put_bug(cn_ll, "connid_table");
784 cn_ll->source.in.conn_id = 0;
786 cor_free_ack_conns(cn_ll);
/* counterpart of the atomic_inc() in cor_conn_init_out() */
789 if (cn_ll->is_client)
790 atomic_dec(&cor_num_conns);
792 cor_reset_ooo_queue(cn_ll);
793 } else if (cn_ll->sourcetype == SOURCE_SOCK) {
/* notify the socket via cor_sk_write_space() and drop our reference to it */
794 if (likely(cn_ll->source.sock.cs != 0)) {
795 cor_sk_write_space(cn_ll->source.sock.cs);
796 kref_put(&(cn_ll->source.sock.cs->ref), cor_free_sock);
797 cn_ll->source.sock.cs = 0;
/* still waiting in a listener's accept queue */
799 if (unlikely(cn_ll->source.sock.in_cl_list != 0)) {
800 list_del(&(cn_ll->source.sock.cl_list));
801 cn_ll->source.sock.in_cl_list = 0;
802 cor_conn_kref_put_bug(cn_ll, "conn_queue");
805 if (cn_ll->source.sock.socktype == SOCKTYPE_MANAGED) {
806 if (del_timer(&(cn_ll->source.sock.keepalive_timer)) !=
808 cor_conn_kref_put(cn_ll, "keepalive_snd_timer");
812 if (cn_ll->targettype == TARGET_UNCONNECTED) {
813 if (cn_ll->target.unconnected.cmdparams != 0) {
814 kfree(cn_ll->target.unconnected.cmdparams);
815 cn_ll->target.unconnected.cmdparams = 0;
817 } else if (cn_ll->targettype == TARGET_OUT) {
818 struct cor_neighbor *nb = cn_ll->target.out.nb;
/* take the conn off the neighbor's idle/busy list and drop that reference */
820 spin_lock_irqsave(&(nb->conn_list_lock), iflags);
821 list_del(&(cn_ll->target.out.nb_list));
822 spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
823 cor_conn_kref_put_bug(cn_ll, "neighbor_list");
825 if (trgt_out_resetneeded && cn_ll->target.out.conn_id != 0) {
826 cor_send_reset_conn(cn_ll->target.out.nb,
827 cn_ll->target.out.conn_id, 0);
830 cn_ll->target.out.conn_id = 0;
832 cor_cancel_all_conn_retrans(cn_ll);
834 cor_qos_remove_conn(cn_ll);
835 } else if (cn_ll->targettype == TARGET_SOCK) {
/* notify the receiving socket (call depends on socktype) and drop our reference */
836 if (likely(cn_ll->target.sock.cs != 0)) {
837 if (cn_ll->target.sock.socktype == SOCKTYPE_RAW) {
838 cor_sk_data_ready(cn_ll->target.sock.cs);
839 } else {
840 cor_mngdsocket_readfromconn_fromatomic(
841 cn_ll->target.sock.cs);
843 kref_put(&(cn_ll->target.sock.cs->ref), cor_free_sock);
844 cn_ll->target.sock.cs = 0;
845 cn_ll->target.sock.rcv_buf = 0;
849 cor_databuf_ackdiscard(cn_ll);
851 cor_account_bufspace(cn_ll);
853 cor_connreset_priority(cn_ll);
856 void cor_reset_conn_locked(struct cor_conn_bidir *cnb_ll)
858 BUG_ON(cnb_ll->cli.isreset <= 1 && cnb_ll->srv.isreset == 2);
859 BUG_ON(cnb_ll->cli.isreset == 2 && cnb_ll->srv.isreset <= 1);
861 if (cnb_ll->cli.isreset <= 1) {
862 __u8 old_isreset_cli = cnb_ll->cli.isreset;
863 __u8 old_isreset_srv = cnb_ll->srv.isreset;
865 cnb_ll->cli.isreset = 2;
866 cnb_ll->srv.isreset = 2;
868 _cor_reset_conn(&(cnb_ll->cli), old_isreset_cli == 0);
869 _cor_reset_conn(&(cnb_ll->srv), old_isreset_srv == 0);
873 void cor_reset_conn(struct cor_conn *cn)
875 struct cor_conn_bidir *cnb = cor_get_conn_bidir(cn);
877 cor_conn_kref_get(&(cnb->cli), "stack");
878 cor_conn_kref_get(&(cnb->srv), "stack");
880 spin_lock_bh(&(cnb->cli.rcv_lock));
881 spin_lock_bh(&(cnb->srv.rcv_lock));
883 cor_reset_conn_locked(cnb);
885 spin_unlock_bh(&(cnb->srv.rcv_lock));
886 spin_unlock_bh(&(cnb->cli.rcv_lock));
888 cor_conn_kref_put_bug(&(cnb->cli), "stack");
889 cor_conn_kref_put(&(cnb->srv), "stack");
/*
 * cor_init() - module entry point.
 *
 * Prints the sizes of the main data structures, creates the slab caches for
 * conns and conn id reuse items, resets the connection counter and then runs
 * the init functions of the other parts of the module in a fixed order.
 *
 * Returns 0 on success, -ENOMEM if a slab cache could not be created, or
 * the error code of the first init function that failed.
 *
 * NOTE(review): every failure path returns immediately; nothing visible here
 * destroys the slab caches or undoes the init steps that already succeeded -
 * confirm whether unwinding is needed.
 * NOTE(review): some lines around the printk block are not visible in this
 * copy; verify against the original file that the block is live code.
 */
892 static int __init cor_init(void)
894 int rc;
/* only used as sizeof() operands below */
896 struct cor_conn_bidir cb;
897 struct cor_conn c;
900 printk(KERN_ERR "sizeof cor_conn_bidir: %u\n", (__u32) sizeof(cb));
901 printk(KERN_ERR "sizeof conn: %u\n", (__u32) sizeof(c));
902 printk(KERN_ERR " conn.source: %u\n", (__u32) sizeof(c.source));
903 printk(KERN_ERR " conn.source.in: %u\n", (__u32) sizeof(c.source.in));
904 printk(KERN_ERR " conn.source.sock: %u\n", (__u32) sizeof(c.source.sock));
905 printk(KERN_ERR " conn.target: %u\n", (__u32) sizeof(c.target));
906 printk(KERN_ERR " conn.target.out: %u\n",
907 (__u32) sizeof(c.target.out));
908 printk(KERN_ERR " conn.target.sock: %u\n",
909 (__u32) sizeof(c.target.sock));
910 printk(KERN_ERR " conn.data_buf: %u\n", (__u32) sizeof(c.data_buf));
911 printk(KERN_ERR " conn.bufsize: %u\n", (__u32) sizeof(c.bufsize));
913 printk(KERN_ERR "sizeof cor_neighbor: %u\n",
914 (__u32) sizeof(struct cor_neighbor));
916 printk(KERN_ERR "sizeof mutex: %u\n", (__u32) sizeof(struct mutex));
917 printk(KERN_ERR "sizeof spinlock: %u\n", (__u32) sizeof(spinlock_t));
918 printk(KERN_ERR "sizeof kref: %u\n", (__u32) sizeof(struct kref));
919 printk(KERN_ERR "sizeof list_head: %u\n",
920 (__u32) sizeof(struct list_head));
921 printk(KERN_ERR "sizeof rb_root: %u\n", (__u32) sizeof(struct rb_root));
922 printk(KERN_ERR "sizeof rb_node: %u\n", (__u32) sizeof(struct rb_node));
925 rc = cor_util_init();
926 if (unlikely(rc != 0))
927 return rc;
/* slab caches used by cor_alloc_conn() and cor_conn_init_out() */
929 cor_conn_slab = kmem_cache_create("cor_conn",
930 sizeof(struct cor_conn_bidir), 8, 0, 0);
931 if (unlikely(cor_conn_slab == 0))
932 return -ENOMEM;
934 cor_connid_reuse_slab = kmem_cache_create("cor_connid_reuse",
935 sizeof(struct cor_connid_reuse_item), 8, 0, 0);
936 if (unlikely(cor_connid_reuse_slab == 0))
937 return -ENOMEM;
940 atomic_set(&cor_num_conns, 0);
941 barrier();
/* init functions of the other parts of the module; the first failure aborts */
943 rc = cor_forward_init();
944 if (unlikely(rc != 0))
945 return rc;
947 rc = cor_kgen_init();
948 if (unlikely(rc != 0))
949 return rc;
951 rc = cor_rd_init1();
952 if (unlikely(rc != 0))
953 return rc;
955 rc = cor_snd_init();
956 if (unlikely(rc != 0))
957 return rc;
959 rc = cor_neighbor_init();
960 if (unlikely(rc != 0))
961 return rc;
963 rc = cor_neigh_ann_rcv_init();
964 if (unlikely(rc != 0))
965 return rc;
967 rc = cor_dev_init();
968 if (unlikely(rc != 0))
969 return rc;
971 rc = cor_rcv_init();
972 if (unlikely(rc != 0))
973 return rc;
975 rc = cor_sock_managed_init1();
976 if (unlikely(rc != 0))
977 return rc;
979 rc = cor_conn_src_sock_init1();
980 if (unlikely(rc != 0))
981 return rc;
983 rc = cor_sock_init2();
984 if (unlikely(rc != 0))
985 return rc;
987 rc = cor_rd_init2();
988 if (unlikely(rc != 0))
989 return rc;
991 return 0;
994 static void __exit cor_exit(void)
996 cor_rd_exit1();
997 cor_sock_exit1();
998 cor_conn_src_sock_exit1();
999 cor_dev_exit1();
1001 flush_scheduled_work();
1003 cor_rcv_exit2();
1004 cor_neighbor_exit2();
1005 cor_neigh_ann_rcv_exit2();
1006 cor_snd_exit2();
1007 cor_rd_exit2();
1008 cor_kgen_exit2();
1009 cor_forward_exit2();
1011 BUG_ON(atomic_read(&cor_num_conns) != 0);
1013 kmem_cache_destroy(cor_conn_slab);
1014 cor_conn_slab = 0;
1016 kmem_cache_destroy(cor_connid_reuse_slab);
1017 cor_connid_reuse_slab = 0;
/* Register cor_init()/cor_exit() as module entry and exit points. */
1020 module_init(cor_init);
1021 module_exit(cor_exit);
1022 MODULE_LICENSE("GPL");