/**
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/mutex.h>

#include "cor.h"

DEFINE_SPINLOCK(cor_bindnodes);

static LIST_HEAD(cor_openports);

static struct kmem_cache *cor_conn_slab;
struct kmem_cache *cor_connid_reuse_slab;

atomic_t cor_num_conns;

int cor_new_incoming_conn_allowed(struct cor_neighbor *nb)
{
	/**
	 * MAX_CONNS is only loosely enforced for now
	 * (cor_num_conns is not checked and incremented at the same time)
	 */
	if (atomic_read(&cor_num_conns) >= MAX_CONNS)
		return 0;
	else
		return 1;
}
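
/*
 * Scale a received priority by PRIORITY_IN_MULITPLIER_PERCENT without
 * overflowing 64 bits: if priority * 100 would not fit, divide before
 * multiplying, at the cost of a little precision.
 */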

static __u64 cor_get_prio_in(__u64 priority)
{
	if (PRIORITY_IN_MULITPLIER_PERCENT >= 100)
		return priority;

	if (unlikely(priority > U64_MAX / 100)) {
		return (priority / 100) * PRIORITY_IN_MULITPLIER_PERCENT;
	} else {
		return (priority * PRIORITY_IN_MULITPLIER_PERCENT) / 100;
	}
}
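
/*
 * div_u64() takes a 32 bit divisor, so PRIORITY_SUM_IN_MAX and the divisor
 * are shifted right by the same amount until they fit; shifting both by the
 * same amount keeps the quotient approximately unchanged.
 */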

static __u32 cor_conn_prio_sum_limit(__u32 priority, __u64 priority_sum)
{
	__u32 shiftcnt = 0;

	while ((PRIORITY_SUM_IN_MAX >> shiftcnt) > U32_MAX) {
		shiftcnt++;
	}

	return div_u64(((__u64) priority) * (PRIORITY_SUM_IN_MAX >> shiftcnt),
			(priority_sum >> shiftcnt));
}

__u32 _cor_conn_refresh_priority(struct cor_conn *cn_lx)
{
	BUG_ON(cn_lx->is_client == 0);

	if (cn_lx->sourcetype == SOURCE_IN) {
		__u32 priority = (__u32)
				cor_get_prio_in(cn_lx->source.in.priority);
		__u64 priority_sum = cor_get_prio_in(atomic64_read(
				&(cn_lx->source.in.nb->priority_sum)));

		if (PRIORITY_SUM_IN_MAX != U64_MAX &&
				priority_sum > PRIORITY_SUM_IN_MAX) {
			return cor_conn_prio_sum_limit(priority, priority_sum);
		} else {
			return priority;
		}
	} else if (cn_lx->sourcetype == SOURCE_SOCK) {
		return cn_lx->source.sock.priority;
	} else {
		BUG();
		return 0;
	}
}
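
/*
 * The rcv_locks of both directions are taken with the client side first;
 * callers which already hold them pass locked != 0.
 */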

__u32 cor_conn_refresh_priority(struct cor_conn *cn, int locked)
{
	struct cor_conn *cn_reversedir = cor_get_conn_reversedir(cn);
	__u32 priority = 0;

	if (likely(locked == 0)) {
		if (cn->is_client == 0)
			return cor_conn_refresh_priority(cn_reversedir, 0);

		spin_lock_bh(&(cn->rcv_lock));
		spin_lock_bh(&(cor_get_conn_reversedir(cn)->rcv_lock));
	}

	BUG_ON(cn->is_client == 0);

	if (unlikely(cn->isreset != 0) || cn->targettype != TARGET_OUT)
		goto out;

	priority = _cor_conn_refresh_priority(cn);

	if (cn->target.out.priority_send_allowed != 0) {
		__u16 priority_enc = cor_enc_priority(priority);

		if (priority_enc != cn->target.out.priority_last)
			cor_send_priority(cn, priority_enc);
	}

out:
	if (likely(locked == 0)) {
		spin_unlock_bh(&(cor_get_conn_reversedir(cn)->rcv_lock));
		spin_unlock_bh(&(cn->rcv_lock));
	}

	return priority;
}

static void _cor_set_conn_in_priority(struct cor_conn *src_in_lx,
		__u32 newpriority)
{
	struct cor_neighbor *nb = src_in_lx->source.in.nb;

	__u32 oldpriority = src_in_lx->source.in.priority;

	cor_update_atomic_sum(&(nb->priority_sum),
			oldpriority, newpriority);

	src_in_lx->source.in.priority = newpriority;
}
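
/*
 * Apply a priority update received from the remote side. The sequence
 * number is a 4 bit counter (hence the "& 15" wraparound) and the decoded
 * priority is damped to 4/5 (80%) of its value before it is stored.
 */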

void cor_set_conn_in_priority(struct cor_neighbor *nb, __u32 conn_id,
		struct cor_conn *src_in, __u8 priority_seqno, __u16 priority)
{
	__u32 newpriority;

	if (unlikely(src_in->is_client == 0))
		return;

	spin_lock_bh(&(src_in->rcv_lock));
	spin_lock_bh(&(cor_get_conn_reversedir(src_in)->rcv_lock));

	if (unlikely(cor_is_conn_in(src_in, nb, conn_id) == 0))
		goto out;

	if (src_in->source.in.priority_seqno != priority_seqno)
		goto out;

	src_in->source.in.priority_seqno =
			(src_in->source.in.priority_seqno + 1) & 15;

	newpriority = (cor_dec_priority(priority) * 4) / 5;
	_cor_set_conn_in_priority(src_in, newpriority);
	cor_conn_refresh_priority(src_in, 1);

out:
	spin_unlock_bh(&(cor_get_conn_reversedir(src_in)->rcv_lock));
	spin_unlock_bh(&(src_in->rcv_lock));
}

static void cor_connreset_priority(struct cor_conn *cn_lx)
{
	if (cn_lx->is_client == 0)
		return;

	if (cn_lx->sourcetype == SOURCE_IN)
		_cor_set_conn_in_priority(cn_lx, 0);
}
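
/*
 * Each neighbor keeps its outgoing conns on two lists: snd_conn_idle_list
 * for conns with no buffered data and snd_conn_busy_list for conns which
 * still have data to send. The helpers below move a conn between them.
 */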

static void cor_conn_move_to_idle_list(struct cor_conn *trgt_out_lx)
{
	unsigned long iflags;
	struct cor_neighbor *nb = trgt_out_lx->target.out.nb;

	BUG_ON(trgt_out_lx->data_buf.datasize != 0);

	trgt_out_lx->target.out.nblist_busy_remaining = 0;
	trgt_out_lx->target.out.jiffies_last_act = jiffies;

	spin_lock_irqsave(&(nb->conn_list_lock), iflags);

	trgt_out_lx->target.out.jiffies_last_act = jiffies;

	list_del(&(trgt_out_lx->target.out.nb_list));
	list_add_tail(&(trgt_out_lx->target.out.nb_list),
			&(nb->snd_conn_idle_list));

	spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);

	trgt_out_lx->target.out.in_nb_busy_list = 0;
}

static void cor_conn_move_to_busy_list(struct cor_conn *trgt_out_lx)
{
	unsigned long iflags;
	struct cor_neighbor *nb = trgt_out_lx->target.out.nb;

	BUG_ON(trgt_out_lx->data_buf.datasize == 0);

	trgt_out_lx->target.out.nblist_busy_remaining =
			trgt_out_lx->data_buf.datasize;

	spin_lock_irqsave(&(nb->conn_list_lock), iflags);

	trgt_out_lx->target.out.jiffies_last_act = jiffies;

	list_del(&(trgt_out_lx->target.out.nb_list));
	list_add_tail(&(trgt_out_lx->target.out.nb_list),
			&(nb->snd_conn_busy_list));

	spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);

	trgt_out_lx->target.out.in_nb_busy_list = 1;
}
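
/*
 * Called whenever there is activity on an outgoing conn: keeps the
 * busy/idle list membership and jiffies_last_act up to date, and starts
 * shrinking the buffer of a conn which has been busy without progress for
 * a quarter of CONN_BUSY_INACTIVITY_TIMEOUT_SEC.
 */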

void cor_conn_set_last_act(struct cor_conn *trgt_out_lx)
{
	if (unlikely(trgt_out_lx->isreset != 0))
		return;

	BUG_ON(trgt_out_lx->targettype != TARGET_OUT);

	if (unlikely(trgt_out_lx->target.out.in_nb_busy_list == 0)) {
		if (unlikely(trgt_out_lx->data_buf.datasize != 0)) {
			cor_conn_move_to_busy_list(trgt_out_lx);
		} else if (unlikely(time_after(jiffies,
				trgt_out_lx->target.out.jiffies_last_act +
				HZ * CONN_ACTIVITY_UPDATEINTERVAL_SEC))) {
			cor_conn_move_to_idle_list(trgt_out_lx);
		}
	} else {
		if (unlikely(trgt_out_lx->data_buf.datasize == 0)) {
			cor_conn_move_to_idle_list(trgt_out_lx);
		} else if (unlikely(
				trgt_out_lx->target.out.nblist_busy_remaining
				== 0)) {
			cor_conn_move_to_busy_list(trgt_out_lx);
		}
	}

	if (unlikely(time_after(jiffies,
			trgt_out_lx->target.out.jiffies_last_act +
			(CONN_BUSY_INACTIVITY_TIMEOUT_SEC * HZ) / 4))) {
		trgt_out_lx->bufsize.ignore_rcv_lowbuf = max(
				trgt_out_lx->bufsize.ignore_rcv_lowbuf,
				trgt_out_lx->bufsize.bufsize >> BUFSIZE_SHIFT);

		if (trgt_out_lx->bufsize.state != BUFSIZE_DECR &&
				trgt_out_lx->bufsize.state !=
				BUFSIZE_DECR_FAST) {
			trgt_out_lx->bufsize.state = BUFSIZE_DECR;
			trgt_out_lx->bufsize.act.decr.size_start =
					trgt_out_lx->bufsize.bufsize;
		}
	}
}

static void _cor_free_conn(struct cor_conn *cn)
{
	BUG_ON(cn->isreset == 0);

	if (cn->sourcetype == SOURCE_IN) {
		WARN_ONCE(list_empty(&(cn->source.in.reorder_queue)) == 0,
				"cor_free_conn(): cn->source.in.reorder_queue is not empty");
		WARN_ONCE(list_empty(&(cn->source.in.acks_pending)) == 0,
				"cor_free_conn(): cn->source.in.acks_pending is not empty");

		WARN_ONCE(cn->source.in.conn_id != 0,
				"cor_free_conn(): cn->source.in.conn_id is not 0");
		cor_nb_kref_put(cn->source.in.nb, "conn");
		cn->source.in.nb = 0;
	}

	if (cn->targettype == TARGET_OUT) {
		WARN_ONCE(list_empty(&(cn->target.out.retrans_list)) == 0,
				"cor_free_conn(): cn->target.out.retrans_list is not empty");
		WARN_ONCE(cn->target.out.rb.in_queue != RB_INQUEUE_FALSE,
				"cor_free_conn(): cn->target.out.rb.in_queue is not RB_INQUEUE_FALSE");
		WARN_ONCE(cn->target.out.conn_id != 0,
				"cor_free_conn(): cn->target.out.conn_id is not 0");
		cor_nb_kref_put(cn->target.out.nb, "conn");
		cn->target.out.nb = 0;
	}

	WARN_ONCE(cn->data_buf.datasize != 0,
			"cor_free_conn(): cn->data_buf.datasize is not 0");
	WARN_ONCE(cn->data_buf.overhead != 0,
			"cor_free_conn(): cn->data_buf.overhead is not 0");
	WARN_ONCE(list_empty(&(cn->data_buf.items)) == 0,
			"cor_free_conn(): cn->data_buf.items is not empty");
	WARN_ONCE(cn->data_buf.nextread != 0,
			"cor_free_conn(): cn->data_buf.nextread is not 0");
}

void cor_free_conn(struct kref *ref)
{
	struct cor_conn_bidir *cnb = container_of(ref, struct cor_conn_bidir,
			ref);

	_cor_free_conn(&(cnb->cli));
	_cor_free_conn(&(cnb->srv));
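
	/* poison the freed conn with 0x9a bytes to catch use after free */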
	memset(cnb, 9 * 16 + 10, sizeof(struct cor_conn_bidir));
	kmem_cache_free(cor_conn_slab, cnb);
}

/**
 * rc == 1 ==> connid_reuse or connid allocation failed
 */
int cor_conn_init_out(struct cor_conn *trgt_unconn_ll, struct cor_neighbor *nb,
		__u32 rcvd_connid, int use_rcvd_connid)
{
	unsigned long iflags;
	struct cor_conn *src_unconn_ll =
			cor_get_conn_reversedir(trgt_unconn_ll);
	__u8 tmp;

	BUG_ON(trgt_unconn_ll->targettype != TARGET_UNCONNECTED);
	BUG_ON(src_unconn_ll == 0);
	BUG_ON(src_unconn_ll->sourcetype != SOURCE_UNCONNECTED);

	memset(&(trgt_unconn_ll->target.out), 0,
			sizeof(trgt_unconn_ll->target.out));
	memset(&(src_unconn_ll->source.in), 0,
			sizeof(src_unconn_ll->source.in));

	trgt_unconn_ll->targettype = TARGET_OUT;
	src_unconn_ll->sourcetype = SOURCE_IN;

	spin_lock_irqsave(&(nb->conn_list_lock), iflags);
	if (unlikely(cor_get_neigh_state(nb) == NEIGHBOR_STATE_KILLED))
		goto out_err;

	if (use_rcvd_connid) {
		BUG_ON((rcvd_connid & (1 << 31)) == 0);

		src_unconn_ll->source.in.conn_id = rcvd_connid;
		if (unlikely(cor_insert_connid(nb, src_unconn_ll) != 0)) {
			src_unconn_ll->source.in.conn_id = 0;
			goto out_err;
		}
	} else {
		src_unconn_ll->source.in.cir = kmem_cache_alloc(
				cor_connid_reuse_slab, GFP_ATOMIC);
		if (unlikely(src_unconn_ll->source.in.cir == 0))
			goto out_err;

		memset(src_unconn_ll->source.in.cir, 0,
				sizeof(struct cor_connid_reuse_item));

		if (unlikely(cor_connid_alloc(nb, src_unconn_ll))) {
			kmem_cache_free(cor_connid_reuse_slab,
					src_unconn_ll->source.in.cir);
			src_unconn_ll->source.in.cir = 0;
			goto out_err;
		}
	}

	list_add_tail(&(trgt_unconn_ll->target.out.nb_list),
			&(nb->snd_conn_idle_list));
	cor_conn_kref_get(trgt_unconn_ll, "neighbor_list");

	spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);

	if (0) {
out_err:
		spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
		trgt_unconn_ll->targettype = TARGET_UNCONNECTED;
		src_unconn_ll->sourcetype = SOURCE_UNCONNECTED;
		return 1;
	}

	trgt_unconn_ll->target.out.nb = nb;
	src_unconn_ll->source.in.nb = nb;
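	/* each direction holds its own reference on the neighbor */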
	cor_nb_kref_get(nb, "conn");
	cor_nb_kref_get(nb, "conn");

	cor_conn_set_last_act(trgt_unconn_ll);

	INIT_LIST_HEAD(&(src_unconn_ll->source.in.reorder_queue));
	INIT_LIST_HEAD(&(src_unconn_ll->source.in.acks_pending));
	INIT_LIST_HEAD(&(trgt_unconn_ll->target.out.retrans_list));

	cor_reset_seqno(trgt_unconn_ll, 0);
	if (use_rcvd_connid == 0) {
		get_random_bytes((char *)
				&(trgt_unconn_ll->target.out.seqno_nextsend),
				sizeof(
				trgt_unconn_ll->target.out.seqno_nextsend));
		trgt_unconn_ll->target.out.seqno_acked =
				trgt_unconn_ll->target.out.seqno_nextsend;
		trgt_unconn_ll->target.out.seqno_windowlimit =
				trgt_unconn_ll->target.out.seqno_nextsend;
		cor_reset_seqno(trgt_unconn_ll,
				trgt_unconn_ll->target.out.seqno_nextsend);

		get_random_bytes((char *)
				&(src_unconn_ll->source.in.next_seqno),
				sizeof(src_unconn_ll->source.in.next_seqno));
		src_unconn_ll->source.in.window_seqnolimit =
				src_unconn_ll->source.in.next_seqno;
		src_unconn_ll->source.in.window_seqnolimit_remote =
				src_unconn_ll->source.in.next_seqno;
	}

	get_random_bytes((char *) &tmp, 1);
	trgt_unconn_ll->target.out.priority_seqno = (tmp & 15);

	src_unconn_ll->source.in.priority_seqno = 0;

	trgt_unconn_ll->target.out.jiffies_last_act = jiffies;

	if (src_unconn_ll->is_highlatency)
		trgt_unconn_ll->target.out.jiffies_idle_since =
				(jiffies -
				BURSTPRIO_MAXIDLETIME_HIGHLATENCY_SECS * HZ) <<
				JIFFIES_LAST_IDLE_SHIFT;
	else
		trgt_unconn_ll->target.out.jiffies_idle_since =
				jiffies << JIFFIES_LAST_IDLE_SHIFT;

	trgt_unconn_ll->target.out.remote_bufsize_changerate = 64;

	if (src_unconn_ll->is_client)
		atomic_inc(&cor_num_conns);

	if (use_rcvd_connid == 0)
		cor_update_windowlimit(src_unconn_ll);

	return 0;
}

void cor_conn_init_sock_source(struct cor_conn *cn)
{
	BUG_ON(cn == 0);
	cn->sourcetype = SOURCE_SOCK;
	memset(&(cn->source.sock), 0, sizeof(cn->source.sock));
	cn->source.sock.priority = cor_priority_max();
	cn->source.sock.snd_speed.jiffies_last_refresh = jiffies;
	cn->source.sock.snd_speed.flushed = 1;
	timer_setup(&(cn->source.sock.keepalive_timer),
			cor_keepalive_req_timerfunc, 0);
}

void cor_conn_init_sock_target(struct cor_conn *cn)
{
	BUG_ON(cn == 0);
	cn->targettype = TARGET_SOCK;
	memset(&(cn->target.sock), 0, sizeof(cn->target.sock));
	cor_reset_seqno(cn, 0);
}

static void _cor_alloc_conn(struct cor_conn *cn, __u8 is_highlatency)
{
	cn->sourcetype = SOURCE_UNCONNECTED;
	cn->targettype = TARGET_UNCONNECTED;

	cn->isreset = 0;

	spin_lock_init(&(cn->rcv_lock));

	cor_databuf_init(cn);

	cor_bufsize_init(cn, 0);

	if (is_highlatency == 0) {
		cn->is_highlatency = 0;
		cn->bufsize.bufsize =
				(BUFSIZE_INITIAL_LOWLAT << BUFSIZE_SHIFT);
	} else {
		cn->is_highlatency = 1;
		cn->bufsize.bufsize =
				(BUFSIZE_INITIAL_HIGHLAT << BUFSIZE_SHIFT);
	}
}

struct cor_conn_bidir *cor_alloc_conn(gfp_t allocflags, __u8 is_highlatency)
{
	struct cor_conn_bidir *cnb;

	cnb = kmem_cache_alloc(cor_conn_slab, allocflags);
	if (unlikely(cnb == 0))
		return 0;

	memset(cnb, 0, sizeof(struct cor_conn_bidir));

	cnb->cli.is_client = 1;
	kref_init(&(cnb->ref));

	_cor_alloc_conn(&(cnb->cli), is_highlatency);
	_cor_alloc_conn(&(cnb->srv), is_highlatency);

	return cnb;
}
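
/* caller must hold the cor_bindnodes lock */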

static struct cor_sock *cor_get_corsock_by_port(__be32 port)
{
	struct list_head *curr = cor_openports.next;

	while (curr != &cor_openports) {
		struct cor_sock *cs = container_of(curr, struct cor_sock,
				data.listener.lh);

		BUG_ON(cs->type != CS_TYPE_LISTENER);
		if (cs->data.listener.port == port)
			return cs;

		curr = curr->next;
	}

	return 0;
}
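
/*
 * Write the list of published ports into buf: a variable length row count
 * (up to 4 bytes) followed by one __be32 port per listener. The ports are
 * copied byte by byte, so they stay in network byte order.
 */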

__u32 cor_list_services(char *buf, __u32 buflen)
{
	__u32 cnt = 0;

	__u32 buf_offset = 4;

	struct list_head *curr;
	int rc;

	/*
	 * The variable length header rowcount needs to be generated after
	 * the data. This is done by reserving the maximum space it could
	 * take. If it ends up being smaller, the data is moved so that
	 * there is no gap.
	 */

	BUG_ON(buflen < buf_offset);

	spin_lock_bh(&cor_bindnodes);

	curr = cor_openports.next;
	while (curr != &cor_openports) {
		struct cor_sock *cs = container_of(curr, struct cor_sock,
				data.listener.lh);

		BUG_ON(cs->type != CS_TYPE_LISTENER);

		if (cs->data.listener.publish_service == 0)
			goto cont;

		if (unlikely(buf_offset + 4 < buf_offset) ||
				buf_offset + 4 > buflen)
			break;

		buf[buf_offset] = ((char *) &(cs->data.listener.port))[0];
		buf[buf_offset + 1] = ((char *) &(cs->data.listener.port))[1];
		buf[buf_offset + 2] = ((char *) &(cs->data.listener.port))[2];
		buf[buf_offset + 3] = ((char *) &(cs->data.listener.port))[3];
		buf_offset += 4;
		cnt++;

cont:
		curr = curr->next;
	}

	spin_unlock_bh(&cor_bindnodes);

	rc = cor_encode_len(buf, 4, cnt);
	BUG_ON(rc <= 0);
	BUG_ON(rc > 4);

	memmove(buf + ((__u32) rc), buf + 4, buf_offset);

	return buf_offset - 4 + ((__u32) rc);
}

void cor_set_publish_service(struct cor_sock *cs, __u8 value)
{
	BUG_ON(value != 0 && value != 1);

	mutex_lock(&(cs->lock));

	cs->publish_service = value;

	if (cs->type == CS_TYPE_LISTENER) {
		spin_lock_bh(&cor_bindnodes);
		cs->data.listener.publish_service = value;
		spin_unlock_bh(&cor_bindnodes);
	}

	mutex_unlock(&(cs->lock));
}
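
/*
 * Unbind a listener. The conn queue is drained with cor_bindnodes dropped
 * around each cor_reset_conn() call, since cor_reset_conn() takes the conn
 * rcv_locks.
 */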

void cor_close_port(struct cor_sock *cs)
{
	mutex_lock(&(cs->lock));
	if (unlikely(cs->type != CS_TYPE_LISTENER))
		goto out;

	spin_lock_bh(&cor_bindnodes);

	list_del(&(cs->data.listener.lh));

	while (list_empty(&(cs->data.listener.conn_queue)) == 0) {
		struct cor_conn *src_sock_o = container_of(
				cs->data.listener.conn_queue.next,
				struct cor_conn, source.sock.cl_list);

		BUG_ON(src_sock_o->source.sock.in_cl_list == 0);
		list_del(&(src_sock_o->source.sock.cl_list));
		src_sock_o->source.sock.in_cl_list = 0;
		spin_unlock_bh(&cor_bindnodes);

		cor_reset_conn(src_sock_o);

		spin_lock_bh(&cor_bindnodes);
		cor_conn_kref_put(src_sock_o, "conn_queue");
	}

	spin_unlock_bh(&cor_bindnodes);

out:
	mutex_unlock(&(cs->lock));
}

int cor_open_port(struct cor_sock *cs_l, __be32 port)
{
	int rc = 0;

	spin_lock_bh(&cor_bindnodes);
	if (cor_get_corsock_by_port(port) != 0) {
		rc = -EADDRINUSE;
		goto out;
	}

	BUG_ON(cs_l->type != CS_TYPE_UNCONNECTED);

	cs_l->type = CS_TYPE_LISTENER;
	cs_l->data.listener.port = port;
	cs_l->data.listener.publish_service = cs_l->publish_service;

	/* kref is not used here */
	INIT_LIST_HEAD(&(cs_l->data.listener.conn_queue));

	list_add_tail((struct list_head *) &(cs_l->data.listener.lh),
			&cor_openports);

out:
	spin_unlock_bh(&cor_bindnodes);

	return rc;
}

/**
 * rc == 2 port not open
 * rc == 3 listener queue full
 */
int cor_connect_port(struct cor_conn *trgt_unconn_ll, __be32 port)
{
	struct cor_conn *src_unconn_ll =
			cor_get_conn_reversedir(trgt_unconn_ll);
	struct cor_sock *cs;
	int rc = 0;

	spin_lock_bh(&cor_bindnodes);

	cs = cor_get_corsock_by_port(port);
	if (cs == 0) {
		rc = 2;
		goto out;
	}

	if (unlikely(cs->data.listener.queue_len >=
			cs->data.listener.queue_maxlen)) {
		if (cs->data.listener.queue_maxlen <= 0)
			rc = 2;
		else
			rc = 3;

		goto out;
	}

	BUG_ON(trgt_unconn_ll->is_client != 1);
	cor_conn_init_sock_target(trgt_unconn_ll);
	cor_conn_init_sock_source(src_unconn_ll);

	list_add_tail(&(src_unconn_ll->source.sock.cl_list),
			&(cs->data.listener.conn_queue));
	src_unconn_ll->source.sock.in_cl_list = 1;
	cor_conn_kref_get(src_unconn_ll, "conn_queue");

	cs->data.listener.queue_len++;
	atomic_set(&(cs->ready_to_accept), 1);
	cs->sk.sk_state_change(&(cs->sk));

out:
	spin_unlock_bh(&cor_bindnodes);

	return rc;
}

/**
 * rc == 3 ==> addr not found
 * rc == 4 ==> connid allocation failed
 * rc == 4 ==> control msg alloc failed
 */
int cor_connect_neigh(struct cor_conn *trgt_unconn_ll, char *addr,
		__u16 addrlen)
{
	struct cor_control_msg_out *cm;
	struct cor_neighbor *nb = 0;

	nb = cor_find_neigh(addr, addrlen);
	if (unlikely(nb == 0))
		return 3;

	cm = cor_alloc_control_msg(nb, ACM_PRIORITY_MED);
	if (unlikely(cm == 0)) {
		cor_nb_kref_put(nb, "stack");
		return 4;
	}

	if (unlikely(cor_conn_init_out(trgt_unconn_ll, nb, 0, 0))) {
		cor_free_control_msg(cm);
		cor_nb_kref_put(nb, "stack");
		return 4;
	}

	trgt_unconn_ll->target.out.priority_last =
			_cor_conn_refresh_priority(trgt_unconn_ll);

	cor_send_connect_nb(cm, trgt_unconn_ll->target.out.conn_id,
			trgt_unconn_ll->target.out.seqno_nextsend,
			cor_get_conn_reversedir(trgt_unconn_ll)->
					source.in.next_seqno,
			cor_get_conn_reversedir(trgt_unconn_ll));

	cor_nb_kref_put(nb, "stack");

	return 0;
}

static void _cor_reset_conn(struct cor_conn *cn_ll, int trgt_out_resetneeded)
{
	unsigned long iflags;

	if (cn_ll->sourcetype == SOURCE_IN) {
		struct cor_neighbor *nb = cn_ll->source.in.nb;

		if (cn_ll->source.in.conn_id != 0 &&
				(cn_ll->source.in.conn_id & (1 << 31)) != 0) {
			BUG_ON(cn_ll->source.in.cir != 0);
		} else if (cn_ll->source.in.conn_id != 0 &&
				(cn_ll->source.in.conn_id & (1 << 31)) == 0) {
			BUG_ON(cn_ll->source.in.cir == 0);

			kref_init(&(cn_ll->source.in.cir->ref));
			cn_ll->source.in.cir->conn_id =
					cn_ll->source.in.conn_id;
			cn_ll->source.in.cir->pingcnt =
					nb->connid_reuse_pingcnt;

			spin_lock_irqsave(&(nb->connid_reuse_lock), iflags);
			cor_insert_connid_reuse(nb, cn_ll->source.in.cir);
			list_add_tail(&(cn_ll->source.in.cir->lh),
					&(nb->connid_reuse_list));
			spin_unlock_irqrestore(&(nb->connid_reuse_lock),
					iflags);

			cn_ll->source.in.cir = 0;
		}

		if (cn_ll->source.in.conn_id != 0) {
			spin_lock_irqsave(&(nb->connid_lock), iflags);
			rb_erase(&(cn_ll->source.in.rbn), &(nb->connid_rb));
			spin_unlock_irqrestore(&(nb->connid_lock), iflags);
			cor_conn_kref_put_bug(cn_ll, "connid_table");
		}
		cn_ll->source.in.conn_id = 0;

		cor_free_ack_conns(cn_ll);

		if (cn_ll->is_client)
			atomic_dec(&cor_num_conns);

		cor_reset_ooo_queue(cn_ll);
	} else if (cn_ll->sourcetype == SOURCE_SOCK) {
		if (likely(cn_ll->source.sock.cs != 0)) {
			cor_sk_write_space(cn_ll->source.sock.cs);
			kref_put(&(cn_ll->source.sock.cs->ref), cor_free_sock);
			cn_ll->source.sock.cs = 0;
		}
		if (unlikely(cn_ll->source.sock.in_cl_list != 0)) {
			list_del(&(cn_ll->source.sock.cl_list));
			cn_ll->source.sock.in_cl_list = 0;
			cor_conn_kref_put_bug(cn_ll, "conn_queue");
		}

		if (cn_ll->source.sock.socktype == SOCKTYPE_MANAGED) {
			if (del_timer(&(cn_ll->source.sock.keepalive_timer)) !=
					0)
				cor_conn_kref_put(cn_ll, "keepalive_snd_timer");
		}
	}

	if (cn_ll->targettype == TARGET_UNCONNECTED) {
		if (cn_ll->target.unconnected.cmdparams != 0) {
			kfree(cn_ll->target.unconnected.cmdparams);
			cn_ll->target.unconnected.cmdparams = 0;
		}
	} else if (cn_ll->targettype == TARGET_OUT) {
		struct cor_neighbor *nb = cn_ll->target.out.nb;

		spin_lock_irqsave(&(nb->conn_list_lock), iflags);
		list_del(&(cn_ll->target.out.nb_list));
		spin_unlock_irqrestore(&(nb->conn_list_lock), iflags);
		cor_conn_kref_put_bug(cn_ll, "neighbor_list");

		if (trgt_out_resetneeded && cn_ll->target.out.conn_id != 0) {
			cor_send_reset_conn(cn_ll->target.out.nb,
					cn_ll->target.out.conn_id, 0);
		}

		cn_ll->target.out.conn_id = 0;

		cor_cancel_all_conn_retrans(cn_ll);

		cor_qos_remove_conn(cn_ll);
	} else if (cn_ll->targettype == TARGET_SOCK) {
		if (likely(cn_ll->target.sock.cs != 0)) {
			if (cn_ll->target.sock.socktype == SOCKTYPE_RAW) {
				cor_sk_data_ready(cn_ll->target.sock.cs);
			} else {
				cor_mngdsocket_readfromconn_fromatomic(
						cn_ll->target.sock.cs);
			}
			kref_put(&(cn_ll->target.sock.cs->ref), cor_free_sock);
			cn_ll->target.sock.cs = 0;
			cn_ll->target.sock.rcv_buf = 0;
		}
	}

	cor_databuf_ackdiscard(cn_ll);

	cor_account_bufspace(cn_ll);

	cor_connreset_priority(cn_ll);
}
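
/*
 * isreset values as used here: 0 ==> active, 1 ==> already reset, but no
 * reset message needs to be sent, 2 ==> fully reset. Both directions are
 * always reset together.
 */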

void cor_reset_conn_locked(struct cor_conn_bidir *cnb_ll)
{
	BUG_ON(cnb_ll->cli.isreset <= 1 && cnb_ll->srv.isreset == 2);
	BUG_ON(cnb_ll->cli.isreset == 2 && cnb_ll->srv.isreset <= 1);

	if (cnb_ll->cli.isreset <= 1) {
		__u8 old_isreset_cli = cnb_ll->cli.isreset;
		__u8 old_isreset_srv = cnb_ll->srv.isreset;

		cnb_ll->cli.isreset = 2;
		cnb_ll->srv.isreset = 2;

		_cor_reset_conn(&(cnb_ll->cli), old_isreset_cli == 0);
		_cor_reset_conn(&(cnb_ll->srv), old_isreset_srv == 0);
	}
}

void cor_reset_conn(struct cor_conn *cn)
{
	struct cor_conn_bidir *cnb = cor_get_conn_bidir(cn);

	cor_conn_kref_get(&(cnb->cli), "stack");
	cor_conn_kref_get(&(cnb->srv), "stack");

	spin_lock_bh(&(cnb->cli.rcv_lock));
	spin_lock_bh(&(cnb->srv.rcv_lock));

	cor_reset_conn_locked(cnb);

	spin_unlock_bh(&(cnb->srv.rcv_lock));
	spin_unlock_bh(&(cnb->cli.rcv_lock));

	cor_conn_kref_put_bug(&(cnb->cli), "stack");
	cor_conn_kref_put(&(cnb->srv), "stack");
}

static int __init cor_init(void)
{
	int rc;

	struct cor_conn_bidir cb;
	struct cor_conn c;

	printk(KERN_ERR "sizeof cor_conn_bidir: %u\n", (__u32) sizeof(cb));
	printk(KERN_ERR "sizeof conn: %u\n", (__u32) sizeof(c));
	printk(KERN_ERR " conn.source: %u\n", (__u32) sizeof(c.source));
	printk(KERN_ERR " conn.source.in: %u\n", (__u32) sizeof(c.source.in));
	printk(KERN_ERR " conn.source.sock: %u\n",
			(__u32) sizeof(c.source.sock));
	printk(KERN_ERR " conn.target: %u\n", (__u32) sizeof(c.target));
	printk(KERN_ERR " conn.target.out: %u\n",
			(__u32) sizeof(c.target.out));
	printk(KERN_ERR " conn.target.sock: %u\n",
			(__u32) sizeof(c.target.sock));
	printk(KERN_ERR " conn.data_buf: %u\n", (__u32) sizeof(c.data_buf));
	printk(KERN_ERR " conn.bufsize: %u\n", (__u32) sizeof(c.bufsize));

	printk(KERN_ERR "sizeof cor_neighbor: %u\n",
			(__u32) sizeof(struct cor_neighbor));

	printk(KERN_ERR "sizeof mutex: %u\n", (__u32) sizeof(struct mutex));
	printk(KERN_ERR "sizeof spinlock: %u\n", (__u32) sizeof(spinlock_t));
	printk(KERN_ERR "sizeof kref: %u\n", (__u32) sizeof(struct kref));
	printk(KERN_ERR "sizeof list_head: %u\n",
			(__u32) sizeof(struct list_head));
	printk(KERN_ERR "sizeof rb_root: %u\n", (__u32) sizeof(struct rb_root));
	printk(KERN_ERR "sizeof rb_node: %u\n", (__u32) sizeof(struct rb_node));

	rc = cor_util_init();
	if (unlikely(rc != 0))
		return rc;

	cor_conn_slab = kmem_cache_create("cor_conn",
			sizeof(struct cor_conn_bidir), 8, 0, 0);
	if (unlikely(cor_conn_slab == 0))
		return -ENOMEM;

	cor_connid_reuse_slab = kmem_cache_create("cor_connid_reuse",
			sizeof(struct cor_connid_reuse_item), 8, 0, 0);
	if (unlikely(cor_connid_reuse_slab == 0))
		return -ENOMEM;

	atomic_set(&cor_num_conns, 0);

	rc = cor_forward_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_kgen_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_neighbor_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_neigh_ann_rcv_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_sock_managed_init1();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_conn_src_sock_init1();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_sock_init2();
	if (unlikely(rc != 0))
		return rc;

	return 0;
}

static void __exit cor_exit(void)
{
	cor_conn_src_sock_exit1();

	flush_scheduled_work();

	cor_neighbor_exit2();
	cor_neigh_ann_rcv_exit2();

	cor_forward_exit2();

	BUG_ON(atomic_read(&cor_num_conns) != 0);

	kmem_cache_destroy(cor_conn_slab);
	cor_conn_slab = 0;

	kmem_cache_destroy(cor_connid_reuse_slab);
	cor_connid_reuse_slab = 0;
}

module_init(cor_init);
module_exit(cor_exit);
MODULE_LICENSE("GPL");