/**
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include <linux/mutex.h>

#include "cor.h"

DEFINE_SPINLOCK(cor_bindnodes);

static LIST_HEAD(cor_openports);

static struct kmem_cache *cor_conn_slab;
static struct kmem_cache *cor_conn_ed_slab;
struct kmem_cache *cor_connid_reuse_slab;

atomic_t cor_num_conns;

int cor_new_incoming_conn_allowed(struct cor_neighbor *nb)
{
	/**
	 * MAX_CONNS is only loosely enforced for now
	 * (cor_num_conns is not checked and incremented at the same time)
	 */
	if (atomic_read(&cor_num_conns) >= MAX_CONNS)
		return 0;
	else
		return 1;
}

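/**
 * Scale the decoded priority received from the peer by
 * PRIORITY_IN_MULITPLIER_PERCENT, dividing before multiplying when the
 * product could overflow 64 bits.
 */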
static __u32 cor_get_prio_in(struct cor_conn *cn_lx)
{
	__u64 priority = cor_dec_priority(cn_lx->src.in.priority);
	__u64 ret;

	if (PRIORITY_IN_MULITPLIER_PERCENT >= 100)
		ret = priority;
	else if (unlikely(priority > U64_MAX / 100))
		ret = (priority / 100) * PRIORITY_IN_MULITPLIER_PERCENT;
	else
		ret = (priority * PRIORITY_IN_MULITPLIER_PERCENT) / 100;

	BUG_ON(ret > U32_MAX);

	return (__u32) ret;
}

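/**
 * Proportionally scale down a connection priority so that the per-neighbor
 * priority sum stays within PRIORITY_SUM_IN_MAX: the result is
 * priority * PRIORITY_SUM_IN_MAX / priority_sum. Both operands are shifted
 * down until the multiplication fits into the 64 bit dividend of div_u64().
 */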
static __u32 cor_conn_prio_sum_limit(__u32 priority, __u64 priority_sum)
{
	__u32 shiftcnt = 0;

	while ((PRIORITY_SUM_IN_MAX >> shiftcnt) > U32_MAX) {
		shiftcnt++;
	}

	return div_u64(((__u64) priority) * (PRIORITY_SUM_IN_MAX >> shiftcnt),
			(priority_sum >> shiftcnt));
}

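/**
 * Recompute the priority of a client-side connection: for conns coming in
 * from a neighbor (SOURCE_IN) the received priority is scaled and, if the
 * neighbor's priority sum exceeds PRIORITY_SUM_IN_MAX, limited to the
 * connection's proportional share; for local sockets (SOURCE_SOCK) the
 * priority configured on the socket is used directly.
 */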
__u32 _cor_conn_refresh_priority(struct cor_conn *cn_lx)
{
	BUG_ON(cn_lx->is_client == 0);

	if (cn_lx->sourcetype == SOURCE_IN) {
		__u32 priority = cor_get_prio_in(cn_lx);
		__u64 priority_sum = atomic64_read(
				&cn_lx->src.in.nb->priority_sum);
		if (priority_sum > PRIORITY_SUM_IN_MAX) {
			return cor_conn_prio_sum_limit(priority, priority_sum);
		} else {
			return priority;
		}
	} else if (cn_lx->sourcetype == SOURCE_SOCK) {
		return cn_lx->src.sock.ed->priority;
	} else {
		BUG();
		return 0;
	}
}

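/**
 * Wrapper around _cor_conn_refresh_priority() which takes both rcv_locks
 * (client side first) unless the caller already holds them (locked != 0),
 * and sends the new priority to the next hop if it changed and
 * priority_send_allowed is set.
 */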
__u32 cor_conn_refresh_priority(struct cor_conn *cn, int locked)
{
	struct cor_conn *cn_reversedir = cor_get_conn_reversedir(cn);
	__u32 priority = 0;

	if (likely(locked == 0)) {
		if (cn->is_client == 0)
			return cor_conn_refresh_priority(cn_reversedir, 0);

		spin_lock_bh(&cn->rcv_lock);
		spin_lock_bh(&cor_get_conn_reversedir(cn)->rcv_lock);
	} else {
		BUG_ON(cn->is_client == 0);
	}

	if (unlikely(cn->isreset != 0) || cn->targettype != TARGET_OUT)
		goto out;

	priority = _cor_conn_refresh_priority(cn);

	if (cn->trgt.out.priority_send_allowed != 0) {
		__u16 priority_enc = cor_enc_priority(priority);

		if (priority_enc != cn->trgt.out.priority_last ||
				cn->is_highlatency_send_needed != 0)
			cor_send_priority(cn, priority_enc);
	}

out:
	if (likely(locked == 0)) {
		spin_unlock_bh(&cor_get_conn_reversedir(cn)->rcv_lock);
		spin_unlock_bh(&cn->rcv_lock);
	}

	return priority;
}

static void _cor_set_conn_in_priority(struct cor_conn *src_in_lx,
		__u16 newpriority)
{
	struct cor_neighbor *nb = src_in_lx->src.in.nb;
	__u16 oldpriority = src_in_lx->src.in.priority;

	cor_update_atomic_sum(&nb->priority_sum, cor_dec_priority(oldpriority),
			cor_dec_priority(newpriority));

	src_in_lx->src.in.priority = newpriority;
}

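/**
 * Switch both directions of a connection between lowlatency and
 * highlatency mode. The cached idletime (cor_get_conn_idletime()) and,
 * if requested by the caller, the priority are refreshed afterwards,
 * since both depend on the latency class.
 */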
void cor_set_conn_is_highlatency(struct cor_conn *cn, __u8 is_highlatency,
		int locked, int call_refresh_priority)
{
	BUG_ON(cn->is_client == 0);

	if (locked == 0) {
		spin_lock_bh(&cn->rcv_lock);
		spin_lock_bh(&cor_get_conn_reversedir(cn)->rcv_lock);
	}

	if (unlikely(cn->isreset != 0))
		goto out;

	if (cn->is_highlatency != is_highlatency) {
		cn->is_highlatency = is_highlatency;
		cor_get_conn_reversedir(cn)->is_highlatency = is_highlatency;
		cn->is_highlatency_send_needed = 1;

		if (cn->targettype == TARGET_OUT)
			cor_get_conn_idletime(cn);

		if (cor_get_conn_reversedir(cn)->targettype == TARGET_OUT)
			cor_get_conn_idletime(cor_get_conn_reversedir(cn));

		if (call_refresh_priority)
			cor_conn_refresh_priority(cn, 1);
	}

out:
	if (locked == 0) {
		spin_unlock_bh(&cor_get_conn_reversedir(cn)->rcv_lock);
		spin_unlock_bh(&cn->rcv_lock);
	}
}

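/**
 * Apply a priority update received from the neighbor. The message is only
 * accepted if it carries the expected priority_seqno; the seqno is then
 * advanced (mod 16), so retransmitted or reordered updates are ignored.
 */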
void cor_set_conn_in_priority(struct cor_neighbor *nb, __u32 conn_id,
		struct cor_conn *src_in, __u8 priority_seqno, __u16 priority,
		__u8 is_highlatency)
{
	struct cor_conn *trgt_out = cor_get_conn_reversedir(src_in);

	BUG_ON(priority > 4095);

	if (unlikely(src_in->is_client == 0))
		return;

	spin_lock_bh(&src_in->rcv_lock);
	spin_lock_bh(&trgt_out->rcv_lock);

	if (unlikely(cor_is_conn_in(src_in, nb, conn_id) == 0))
		goto out;

	if (src_in->src.in.priority_seqno != priority_seqno)
		goto out;
	src_in->src.in.priority_seqno =
			(src_in->src.in.priority_seqno + 1) & 15;

	cor_set_conn_is_highlatency(src_in, is_highlatency, 1, 0);

	_cor_set_conn_in_priority(src_in, priority);
	cor_conn_refresh_priority(src_in, 1);

out:
	spin_unlock_bh(&trgt_out->rcv_lock);
	spin_unlock_bh(&src_in->rcv_lock);
}

static void cor_connreset_priority(struct cor_conn *cn_lx)
{
	if (cn_lx->is_client == 0)
		return;

	if (cn_lx->sourcetype == SOURCE_IN)
		_cor_set_conn_in_priority(cn_lx, 0);
}

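/**
 * Each neighbor keeps two lists of outgoing connections,
 * snd_conn_idle_list and snd_conn_busy_list. The two helpers below move a
 * connection between them under conn_list_lock and update
 * jiffies_last_act.
 */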
static void cor_conn_move_to_idle_list(struct cor_conn *trgt_out_lx)
{
	unsigned long iflags;
	struct cor_neighbor *nb = trgt_out_lx->trgt.out.nb;

	BUG_ON(trgt_out_lx->data_buf.datasize != 0);

	trgt_out_lx->trgt.out.nblist_busy_remaining = 0;
	trgt_out_lx->trgt.out.jiffies_last_act = jiffies;

	spin_lock_irqsave(&nb->conn_list_lock, iflags);

	trgt_out_lx->trgt.out.jiffies_last_act = jiffies;

	list_del(&trgt_out_lx->trgt.out.nb_list);
	list_add_tail(&trgt_out_lx->trgt.out.nb_list, &nb->snd_conn_idle_list);

	spin_unlock_irqrestore(&nb->conn_list_lock, iflags);

	trgt_out_lx->trgt.out.in_nb_busy_list = 0;
}

static void cor_conn_move_to_busy_list(struct cor_conn *trgt_out_lx)
{
	unsigned long iflags;
	struct cor_neighbor *nb = trgt_out_lx->trgt.out.nb;

	BUG_ON(trgt_out_lx->data_buf.datasize == 0);

	trgt_out_lx->trgt.out.nblist_busy_remaining =
			trgt_out_lx->data_buf.datasize;

	spin_lock_irqsave(&nb->conn_list_lock, iflags);

	trgt_out_lx->trgt.out.jiffies_last_act = jiffies;

	list_del(&trgt_out_lx->trgt.out.nb_list);
	list_add_tail(&trgt_out_lx->trgt.out.nb_list,
			&nb->snd_conn_busy_list);

	spin_unlock_irqrestore(&nb->conn_list_lock, iflags);

	trgt_out_lx->trgt.out.in_nb_busy_list = 1;
}

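/**
 * Record activity on an outgoing connection: keep it on the correct
 * busy/idle list depending on whether data is buffered, and start
 * decreasing the buffer size of connections that have been inactive for
 * a quarter of CONN_BUSY_INACTIVITY_TIMEOUT_SEC.
 */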
void cor_conn_set_last_act(struct cor_conn *trgt_out_lx)
{
	if (unlikely(trgt_out_lx->isreset != 0))
		return;

	BUG_ON(trgt_out_lx->targettype != TARGET_OUT);

	if (unlikely(trgt_out_lx->trgt.out.in_nb_busy_list == 0)) {
		if (unlikely(trgt_out_lx->data_buf.datasize != 0)) {
			cor_conn_move_to_busy_list(trgt_out_lx);
		} else if (unlikely(time_after(jiffies,
				trgt_out_lx->trgt.out.jiffies_last_act +
				HZ * CONN_ACTIVITY_UPDATEINTERVAL_SEC))) {
			cor_conn_move_to_idle_list(trgt_out_lx);
		}
	} else {
		if (unlikely(trgt_out_lx->data_buf.datasize == 0)) {
			cor_conn_move_to_idle_list(trgt_out_lx);
		} else if (unlikely(
				trgt_out_lx->trgt.out.nblist_busy_remaining ==
				0)) {
			cor_conn_move_to_busy_list(trgt_out_lx);
		}
	}

	if (unlikely(time_after(jiffies,
			trgt_out_lx->trgt.out.jiffies_last_act +
			(CONN_BUSY_INACTIVITY_TIMEOUT_SEC * HZ) / 4))) {
		trgt_out_lx->bufsize.ignore_rcv_lowbuf = max(
				trgt_out_lx->bufsize.ignore_rcv_lowbuf,
				trgt_out_lx->bufsize.bufsize >>
				BUFSIZE_SHIFT);

		if (trgt_out_lx->bufsize.state != BUFSIZE_DECR &&
				trgt_out_lx->bufsize.state !=
				BUFSIZE_DECR_FAST) {
			trgt_out_lx->bufsize.state = BUFSIZE_DECR;
			trgt_out_lx->bufsize.act.decr.size_start =
					trgt_out_lx->bufsize.bufsize;
		}
	}
}

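/**
 * Final teardown of one direction of a connection, called from
 * cor_free_conn() once the last reference is gone. The WARN_ONCE() calls
 * check that cor_reset_conn() already detached the connection; the
 * SOURCE_SOCK extradata is overwritten with a 0x9a poison pattern before
 * it is returned to the slab.
 */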
static void _cor_free_conn(struct cor_conn *cn)
{
	BUG_ON(cn->isreset == 0);

	if (cn->sourcetype == SOURCE_IN) {
		WARN_ONCE(list_empty(&cn->src.in.reorder_queue) == 0,
				"cor_free_conn(): cn->src.in.reorder_queue is not empty");
		WARN_ONCE(list_empty(&cn->src.in.acks_pending) == 0,
				"cor_free_conn(): cn->src.in.acks_pending is not empty");

		WARN_ONCE(cn->src.in.conn_id != 0,
				"cor_free_conn(): cn->src.in.conn_id is not 0");
		cor_nb_kref_put(cn->src.in.nb, "conn");
		cn->src.in.nb = 0;
	} else if (cn->sourcetype == SOURCE_SOCK) {
		BUG_ON(cn->src.sock.ed == 0);
		BUG_ON(cn->src.sock.ed->src_sock != cn);
		memset(cn->src.sock.ed, 9 * 16 + 10,
				sizeof(struct cor_conn_src_sock_extradata));
		kmem_cache_free(cor_conn_ed_slab, cn->src.sock.ed);
		cn->src.sock.ed = 0;
	}

	if (cn->targettype == TARGET_OUT) {
		WARN_ONCE(list_empty(&cn->trgt.out.retrans_list) == 0,
				"cor_free_conn(): cn->trgt.out.retrans_list is not empty");
		WARN_ONCE(cn->trgt.out.rb.in_queue != RB_INQUEUE_FALSE,
				"cor_free_conn(): cn->trgt.out.rb.in_queue is not RB_INQUEUE_FALSE");
		WARN_ONCE(cn->trgt.out.conn_id != 0,
				"cor_free_conn(): cn->trgt.out.conn_id is not 0");
		cor_nb_kref_put(cn->trgt.out.nb, "conn");
		cn->trgt.out.nb = 0;
	}

	WARN_ONCE(cn->data_buf.datasize != 0,
			"cor_free_conn(): cn->data_buf.datasize is not 0");
	WARN_ONCE(cn->data_buf.overhead != 0,
			"cor_free_conn(): cn->data_buf.overhead is not 0");
	WARN_ONCE(list_empty(&cn->data_buf.items) == 0,
			"cor_free_conn(): cn->data_buf.items is not empty");
	WARN_ONCE(cn->data_buf.nextread != 0,
			"cor_free_conn(): cn->data_buf.nextread is not 0");
}

void cor_free_conn(struct kref *ref)
{
	struct cor_conn_bidir *cnb = container_of(ref, struct cor_conn_bidir,
			ref);

	_cor_free_conn(&cnb->cli);
	_cor_free_conn(&cnb->srv);

	memset(cnb, 9 * 16 + 10, sizeof(struct cor_conn_bidir));
	kmem_cache_free(cor_conn_slab, cnb);
}

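/**
 * Attach an unconnected conn pair to a neighbor: either with a connid
 * received from the peer (use_rcvd_connid, incoming connects) or with a
 * freshly allocated one (outgoing connects). On the outgoing path the
 * initial seqnos are randomized. The "if (0) { out_err: ... }" block is
 * only reachable via goto and undoes the type change on failure.
 */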
/**
 * rc == 0 ==> ok
 * rc == 1 ==> connid_reuse or connid allocation failed
 */
int cor_conn_init_out(struct cor_conn *trgt_unconn_ll, struct cor_neighbor *nb,
		__u32 rcvd_connid, int use_rcvd_connid)
{
	unsigned long iflags;
	struct cor_conn *src_unconn_ll =
			cor_get_conn_reversedir(trgt_unconn_ll);
	__u8 tmp;

	BUG_ON(trgt_unconn_ll->targettype != TARGET_UNCONNECTED);
	BUG_ON(src_unconn_ll == 0);
	BUG_ON(src_unconn_ll->sourcetype != SOURCE_UNCONNECTED);

	memset(&trgt_unconn_ll->trgt.out, 0,
			sizeof(trgt_unconn_ll->trgt.out));
	memset(&src_unconn_ll->src.in, 0, sizeof(src_unconn_ll->src.in));

	trgt_unconn_ll->targettype = TARGET_OUT;
	src_unconn_ll->sourcetype = SOURCE_IN;

	spin_lock_irqsave(&nb->conn_list_lock, iflags);
	if (unlikely(cor_get_neigh_state(nb) == NEIGHBOR_STATE_KILLED))
		goto out_err;

	if (use_rcvd_connid) {
		BUG_ON((rcvd_connid & (1 << 31)) == 0);

		src_unconn_ll->src.in.conn_id = rcvd_connid;
		if (unlikely(cor_insert_connid(nb, src_unconn_ll) != 0)) {
			src_unconn_ll->src.in.conn_id = 0;
			goto out_err;
		}
	} else {
		if (unlikely(cor_connid_alloc(nb, src_unconn_ll))) {
			goto out_err;
		}
	}

	list_add_tail(&trgt_unconn_ll->trgt.out.nb_list,
			&nb->snd_conn_idle_list);
	cor_conn_kref_get(trgt_unconn_ll, "neighbor_list");

	spin_unlock_irqrestore(&nb->conn_list_lock, iflags);

	if (0) {
out_err:
		spin_unlock_irqrestore(&nb->conn_list_lock, iflags);
		trgt_unconn_ll->targettype = TARGET_UNCONNECTED;
		src_unconn_ll->sourcetype = SOURCE_UNCONNECTED;
		return 1;
	}

	trgt_unconn_ll->trgt.out.nb = nb;
	src_unconn_ll->src.in.nb = nb;
	cor_nb_kref_get(nb, "conn");
	cor_nb_kref_get(nb, "conn");

	cor_conn_set_last_act(trgt_unconn_ll);

	INIT_LIST_HEAD(&src_unconn_ll->src.in.reorder_queue);
	INIT_LIST_HEAD(&src_unconn_ll->src.in.acks_pending);
	INIT_LIST_HEAD(&trgt_unconn_ll->trgt.out.retrans_list);

	cor_reset_seqno(trgt_unconn_ll, 0);
	if (use_rcvd_connid == 0) {
		get_random_bytes((char *)
				&trgt_unconn_ll->trgt.out.seqno_nextsend,
				sizeof(
				trgt_unconn_ll->trgt.out.seqno_nextsend));
		trgt_unconn_ll->trgt.out.seqno_acked =
				trgt_unconn_ll->trgt.out.seqno_nextsend;
		trgt_unconn_ll->trgt.out.seqno_windowlimit =
				trgt_unconn_ll->trgt.out.seqno_nextsend;
		cor_reset_seqno(trgt_unconn_ll,
				trgt_unconn_ll->trgt.out.seqno_nextsend);

		get_random_bytes((char *) &src_unconn_ll->src.in.next_seqno,
				sizeof(src_unconn_ll->src.in.next_seqno));
		src_unconn_ll->src.in.window_seqnolimit =
				src_unconn_ll->src.in.next_seqno;
		src_unconn_ll->src.in.window_seqnolimit_remote =
				src_unconn_ll->src.in.next_seqno;
	}

	get_random_bytes((char *) &tmp, 1);
	trgt_unconn_ll->trgt.out.priority_seqno = (tmp & 15);

	src_unconn_ll->src.in.priority_seqno = 0;

	trgt_unconn_ll->trgt.out.jiffies_last_act = jiffies;

	if (src_unconn_ll->is_highlatency)
		trgt_unconn_ll->trgt.out.jiffies_idle_since =
				(jiffies -
				BURSTPRIO_MAXIDLETIME_HIGHLATENCY_SECS * HZ) <<
				JIFFIES_LAST_IDLE_SHIFT;
	else
		trgt_unconn_ll->trgt.out.jiffies_idle_since =
				jiffies << JIFFIES_LAST_IDLE_SHIFT;

	trgt_unconn_ll->trgt.out.remote_bufsize_changerate = 64;

	if (src_unconn_ll->is_client)
		atomic_inc(&cor_num_conns);

	if (use_rcvd_connid == 0)
		cor_update_windowlimit(src_unconn_ll);

	return 0;
}

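/**
 * Turn the source side of an unconnected conn into a local socket source.
 * The extradata is allocated GFP_ATOMIC because this runs under the
 * cor_bindnodes spinlock (see cor_connect_port()).
 */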
int cor_conn_init_sock_source(struct cor_conn *cn)
{
	struct cor_conn_src_sock_extradata *ed;

	BUG_ON(cn == 0);
	BUG_ON(cn->sourcetype != SOURCE_UNCONNECTED);

	ed = kmem_cache_alloc(cor_conn_ed_slab, GFP_ATOMIC);
	if (unlikely(ed == 0))
		return 1;
	memset(ed, 0, sizeof(struct cor_conn_src_sock_extradata));

	cn->sourcetype = SOURCE_SOCK;
	memset(&cn->src.sock, 0, sizeof(cn->src.sock));
	cn->src.sock.ed = ed;
	cn->src.sock.ed->src_sock = cn;
	cn->src.sock.ed->priority = cor_priority_max();

	cn->src.sock.ed->snd_speed.state = SNDSPEED_ACTIVE;
	cn->src.sock.ed->snd_speed.flushed = 1;
	cn->src.sock.ed->snd_speed.jiffies_last_refresh = jiffies;
	cn->src.sock.ed->snd_speed.bytes_sent = 0;
	cn->src.sock.ed->snd_speed.speed = SND_SPEED_START;
	cn->src.sock.ed->snd_speed.speed_limited = SND_SPEED_START;

	timer_setup(&cn->src.sock.ed->keepalive_timer,
			cor_keepalive_req_timerfunc, 0);

	return 0;
}

void cor_conn_init_sock_target(struct cor_conn *cn)
{
	BUG_ON(cn == 0);
	cn->targettype = TARGET_SOCK;
	memset(&cn->trgt.sock, 0, sizeof(cn->trgt.sock));
	cor_reset_seqno(cn, 0);
}

static void _cor_alloc_conn(struct cor_conn *cn, __u8 is_highlatency)
{
	cn->sourcetype = SOURCE_UNCONNECTED;
	cn->targettype = TARGET_UNCONNECTED;

	cn->isreset = 0;

	spin_lock_init(&cn->rcv_lock);

	cor_databuf_init(cn);

	cor_bufsize_init(cn, 0);

	if (is_highlatency == 0) {
		cn->is_highlatency = 0;
		cn->bufsize.bufsize =
				(BUFSIZE_INITIAL_LOWLAT << BUFSIZE_SHIFT);
	} else {
		cn->is_highlatency = 1;
		cn->bufsize.bufsize =
				(BUFSIZE_INITIAL_HIGHLAT << BUFSIZE_SHIFT);
	}
}

struct cor_conn_bidir *cor_alloc_conn(gfp_t allocflags, __u8 is_highlatency)
{
	struct cor_conn_bidir *cnb;

	cnb = kmem_cache_alloc(cor_conn_slab, allocflags);
	if (unlikely(cnb == 0))
		return 0;

	memset(cnb, 0, sizeof(struct cor_conn_bidir));

	cnb->cli.is_client = 1;
	kref_init(&cnb->ref);

	_cor_alloc_conn(&cnb->cli, is_highlatency);
	_cor_alloc_conn(&cnb->srv, is_highlatency);

	return cnb;
}

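/* walk cor_openports; the caller must hold cor_bindnodes */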
static struct cor_sock *cor_get_corsock_by_port(__be32 port)
{
	struct list_head *curr = cor_openports.next;

	while (curr != &cor_openports) {
		struct cor_sock *cs = container_of(curr, struct cor_sock,
				data.listener.lh);
		BUG_ON(cs->type != CS_TYPE_LISTENER);
		if (cs->data.listener.port == port)
			return cs;

		curr = curr->next;
	}

	return 0;
}

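/**
 * Serialize the list of published ports into buf: a variable length
 * rowcount (cor_encode_len(), at most 4 bytes) followed by the 4 byte
 * port number of every listener with publish_service set. Returns the
 * number of bytes written.
 */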
__u32 cor_list_services(char *buf, __u32 buflen)
{
	__u32 cnt = 0;

	__u32 buf_offset = 4;

	struct list_head *curr;
	int rc;

	/**
	 * The variable length header (the rowcount) needs to be generated
	 * after the data. This is done by reserving the maximum space it
	 * could take. If it ends up being smaller, the data is moved so that
	 * there is no gap.
	 */

	BUG_ON(buf == 0);
	BUG_ON(buflen < buf_offset);

	spin_lock_bh(&cor_bindnodes);

	curr = cor_openports.next;
	while (curr != &cor_openports) {
		struct cor_sock *cs = container_of(curr, struct cor_sock,
				data.listener.lh);
		BUG_ON(cs->type != CS_TYPE_LISTENER);

		if (cs->data.listener.publish_service == 0)
			goto cont;

		if (unlikely(buf_offset + 4 < buf_offset) ||
				buf_offset + 4 > buflen)
			break;

		buf[buf_offset] = ((char *) &cs->data.listener.port)[0];
		buf[buf_offset + 1] = ((char *) &cs->data.listener.port)[1];
		buf[buf_offset + 2] = ((char *) &cs->data.listener.port)[2];
		buf[buf_offset + 3] = ((char *) &cs->data.listener.port)[3];
		buf_offset += 4;
		cnt++;

cont:
		curr = curr->next;
	}

	spin_unlock_bh(&cor_bindnodes);

	rc = cor_encode_len(buf, 4, cnt);
	BUG_ON(rc <= 0);
	BUG_ON(rc > 4);

	if (likely(rc < 4))
		memmove(buf + ((__u32) rc), buf + 4, buf_offset - 4);

	return buf_offset - 4 + ((__u32) rc);
}

void cor_set_publish_service(struct cor_sock *cs, __u8 value)
{
	BUG_ON(value != 0 && value != 1);

	mutex_lock(&cs->lock);

	cs->publish_service = value;

	if (cs->type == CS_TYPE_LISTENER) {
		spin_lock_bh(&cor_bindnodes);
		cs->data.listener.publish_service = value;
		spin_unlock_bh(&cor_bindnodes);
	}

	mutex_unlock(&cs->lock);
}

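/**
 * Unpublish the listener and reset all connections still waiting in its
 * accept queue. cor_bindnodes is dropped around cor_reset_conn(), which
 * takes the conn rcv_locks; the queue entry is removed first, so nobody
 * else can reach the conn through the listener in the meantime.
 */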
void cor_close_port(struct cor_sock *cs)
{
	mutex_lock(&cs->lock);
	if (unlikely(cs->type != CS_TYPE_LISTENER))
		goto out;

	spin_lock_bh(&cor_bindnodes);

	list_del(&cs->data.listener.lh);

	while (list_empty(&cs->data.listener.conn_queue) == 0) {
		struct cor_conn *src_sock_o = container_of(
				cs->data.listener.conn_queue.next,
				struct cor_conn, src.sock.cl_list);
		BUG_ON(src_sock_o->src.sock.in_cl_list == 0);
		list_del(&src_sock_o->src.sock.cl_list);
		src_sock_o->src.sock.in_cl_list = 0;
		spin_unlock_bh(&cor_bindnodes);

		cor_reset_conn(src_sock_o);

		spin_lock_bh(&cor_bindnodes);
		cor_conn_kref_put(src_sock_o, "conn_queue");
	}

	spin_unlock_bh(&cor_bindnodes);
out:
	mutex_unlock(&cs->lock);
}

int cor_open_port(struct cor_sock *cs_l, __be32 port)
{
	int rc = 0;

	spin_lock_bh(&cor_bindnodes);
	if (cor_get_corsock_by_port(port) != 0) {
		rc = -EADDRINUSE;
		goto out;
	}

	BUG_ON(cs_l->type != CS_TYPE_UNCONNECTED);

	cs_l->type = CS_TYPE_LISTENER;
	cs_l->data.listener.port = port;
	cs_l->data.listener.publish_service = cs_l->publish_service;

	/* kref is not used here */
	INIT_LIST_HEAD(&cs_l->data.listener.conn_queue);

	list_add_tail((struct list_head *) &cs_l->data.listener.lh,
			&cor_openports);

out:
	spin_unlock_bh(&cor_bindnodes);

	return rc;
}

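/**
 * Connect the unconnected conn pair to a local listener. The conn is
 * appended to the listener's accept queue (bounded by queue_maxlen) and
 * the listening socket is woken up via sk_state_change().
 */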
int cor_connect_port(struct cor_conn *trgt_unconn_ll, __be32 port)
{
	struct cor_conn *src_unconn_ll =
			cor_get_conn_reversedir(trgt_unconn_ll);
	struct cor_sock *cs;
	int rc = CONNECT_PORT_OK;

	spin_lock_bh(&cor_bindnodes);

	cs = cor_get_corsock_by_port(port);
	if (cs == 0) {
		rc = CONNECT_PORT_PORTCLOSED;
		goto out;
	}

	if (unlikely(cs->data.listener.queue_len >=
			cs->data.listener.queue_maxlen)) {
		if (cs->data.listener.queue_maxlen <= 0)
			rc = CONNECT_PORT_PORTCLOSED;
		else
			rc = CONNECT_PORT_TEMPORARILY_OUT_OF_RESOURCES;

		goto out;
	}

	BUG_ON(trgt_unconn_ll->is_client != 1);

	if (cor_conn_init_sock_source(src_unconn_ll) != 0) {
		rc = CONNECT_PORT_TEMPORARILY_OUT_OF_RESOURCES;
		goto out;
	}
	cor_conn_init_sock_target(trgt_unconn_ll);

	list_add_tail(&src_unconn_ll->src.sock.cl_list,
			&cs->data.listener.conn_queue);
	src_unconn_ll->src.sock.in_cl_list = 1;
	cor_conn_kref_get(src_unconn_ll, "conn_queue");

	cs->data.listener.queue_len++;
	atomic_set(&cs->ready_to_accept, 1);
	barrier();
	cs->sk.sk_state_change(&cs->sk);

out:
	spin_unlock_bh(&cor_bindnodes);
	return rc;
}

/**
 * rc == 0 ==> connected
 * rc == 3 ==> addr not found
 * rc == 4 ==> control msg alloc or conn_id allocation failed
 */
int cor_connect_neigh(struct cor_conn *trgt_unconn_ll, __be64 addr)
{
	struct cor_control_msg_out *cm;
	struct cor_neighbor *nb = 0;

	nb = cor_find_neigh(addr);
	if (nb == 0)
		return 3;

	cm = cor_alloc_control_msg(nb, ACM_PRIORITY_MED);
	if (unlikely(cm == 0)) {
		cor_nb_kref_put(nb, "stack");
		return 4;
	}

	if (unlikely(cor_conn_init_out(trgt_unconn_ll, nb, 0, 0))) {
		cor_free_control_msg(cm);
		cor_nb_kref_put(nb, "stack");
		return 4;
	}

	trgt_unconn_ll->trgt.out.priority_last = cor_enc_priority(
			_cor_conn_refresh_priority(trgt_unconn_ll));

	cor_send_connect_nb(cm, trgt_unconn_ll->trgt.out.conn_id,
			trgt_unconn_ll->trgt.out.seqno_nextsend,
			cor_get_conn_reversedir(trgt_unconn_ll)->
			src.in.next_seqno,
			cor_get_conn_reversedir(trgt_unconn_ll));

	cor_nb_kref_put(nb, "stack");

	return 0;
}

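/**
 * Detach one direction of a connection from everything it is linked to:
 * connid table and ack/ooo queues (SOURCE_IN), socket and accept queue
 * (SOURCE_SOCK), neighbor conn list and retransmits (TARGET_OUT), the
 * receiving socket (TARGET_SOCK). A reset_conn message is sent to the peer
 * only if trgt_out_resetneeded is set and a conn_id was assigned.
 */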
static void _cor_reset_conn(struct cor_conn *cn_ll, int trgt_out_resetneeded)
{
	unsigned long iflags;

	if (cn_ll->sourcetype == SOURCE_IN) {
		struct cor_neighbor *nb = cn_ll->src.in.nb;

		if (cn_ll->src.in.conn_id != 0 &&
				(cn_ll->src.in.conn_id & (1 << 31)) == 0) {
			cor_insert_connid_reuse(nb, cn_ll->src.in.conn_id);
		}

		if (cn_ll->src.in.conn_id != 0) {
			spin_lock_irqsave(&nb->connid_lock, iflags);
			rb_erase(&cn_ll->src.in.rbn, &nb->connid_rb);
			spin_unlock_irqrestore(&nb->connid_lock, iflags);
			cor_conn_kref_put_bug(cn_ll, "connid_table");
		}
		cn_ll->src.in.conn_id = 0;

		cor_free_ack_conns(cn_ll);

		if (cn_ll->is_client)
			atomic_dec(&cor_num_conns);

		cor_reset_ooo_queue(cn_ll);
	} else if (cn_ll->sourcetype == SOURCE_SOCK) {
		if (likely(cn_ll->src.sock.ed->cs != 0)) {
			cor_sk_write_space(cn_ll->src.sock.ed->cs);
			kref_put(&cn_ll->src.sock.ed->cs->ref, cor_free_sock);
			cn_ll->src.sock.ed->cs = 0;
		}
		if (unlikely(cn_ll->src.sock.in_cl_list != 0)) {
			list_del(&cn_ll->src.sock.cl_list);
			cn_ll->src.sock.in_cl_list = 0;
			cor_conn_kref_put_bug(cn_ll, "conn_queue");
		}

		if (cn_ll->src.sock.socktype == SOCKTYPE_MANAGED) {
			if (del_timer(&cn_ll->src.sock.ed->keepalive_timer) != 0)
				cor_conn_kref_put(cn_ll, "keepalive_snd_timer");
		}
	}

	if (cn_ll->targettype == TARGET_UNCONNECTED) {
		if (cn_ll->trgt.unconnected.cmdparams != 0) {
			kfree(cn_ll->trgt.unconnected.cmdparams);
			cn_ll->trgt.unconnected.cmdparams = 0;
		}
	} else if (cn_ll->targettype == TARGET_OUT) {
		struct cor_neighbor *nb = cn_ll->trgt.out.nb;

		spin_lock_irqsave(&nb->conn_list_lock, iflags);
		list_del(&cn_ll->trgt.out.nb_list);
		spin_unlock_irqrestore(&nb->conn_list_lock, iflags);
		cor_conn_kref_put_bug(cn_ll, "neighbor_list");

		if (trgt_out_resetneeded && cn_ll->trgt.out.conn_id != 0) {
			cor_send_reset_conn(cn_ll->trgt.out.nb,
					cn_ll->trgt.out.conn_id, 0);
		}

		cn_ll->trgt.out.conn_id = 0;

		cor_cancel_all_conn_retrans(cn_ll);

		cor_neigh_waitingconns_remove_conn(cn_ll);
	} else if (cn_ll->targettype == TARGET_SOCK) {
		if (likely(cn_ll->trgt.sock.cs != 0)) {
			if (cn_ll->trgt.sock.socktype == SOCKTYPE_RAW) {
				cor_sk_data_ready(cn_ll->trgt.sock.cs);
			} else {
				cor_mngdsocket_readfromconn_fromatomic(
						cn_ll->trgt.sock.cs);
			}
			kref_put(&cn_ll->trgt.sock.cs->ref, cor_free_sock);
			cn_ll->trgt.sock.cs = 0;
			cn_ll->trgt.sock.rcv_buf = 0;
		}
	}

	cor_databuf_ackdiscard(cn_ll);

	cor_account_bufspace(cn_ll);

	cor_connreset_priority(cn_ll);
}

void cor_reset_conn_locked(struct cor_conn_bidir *cnb_ll)
{
	BUG_ON(cnb_ll->cli.isreset <= 1 && cnb_ll->srv.isreset == 2);
	BUG_ON(cnb_ll->cli.isreset == 2 && cnb_ll->srv.isreset <= 1);

	if (cnb_ll->cli.isreset <= 1) {
		__u8 old_isreset_cli = cnb_ll->cli.isreset;
		__u8 old_isreset_srv = cnb_ll->srv.isreset;

		cnb_ll->cli.isreset = 2;
		cnb_ll->srv.isreset = 2;

		_cor_reset_conn(&cnb_ll->cli, old_isreset_cli == 0);
		_cor_reset_conn(&cnb_ll->srv, old_isreset_srv == 0);
	}
}

void cor_reset_conn(struct cor_conn *cn)
{
	struct cor_conn_bidir *cnb = cor_get_conn_bidir(cn);

	cor_conn_kref_get(&cnb->cli, "stack");
	cor_conn_kref_get(&cnb->srv, "stack");

	spin_lock_bh(&cnb->cli.rcv_lock);
	spin_lock_bh(&cnb->srv.rcv_lock);

	cor_reset_conn_locked(cnb);

	spin_unlock_bh(&cnb->srv.rcv_lock);
	spin_unlock_bh(&cnb->cli.rcv_lock);

	cor_conn_kref_put_bug(&cnb->cli, "stack");
	cor_conn_kref_put(&cnb->srv, "stack");
}

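/* The printk() block below dumps struct sizes, apparently as a debugging
 * aid for manual size tuning. */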
static int __init cor_init(void)
{
	int rc;

	struct cor_conn_bidir cb;
	struct cor_conn c;

	printk(KERN_ERR "sizeof cor_conn_bidir: %u\n", (__u32) sizeof(cb));
	printk(KERN_ERR "sizeof conn: %u\n", (__u32) sizeof(c));
	printk(KERN_ERR "  conn.source: %u\n", (__u32) sizeof(c.src));
	printk(KERN_ERR "  conn.source.in: %u\n", (__u32) sizeof(c.src.in));
	printk(KERN_ERR "  conn.source.sock: %u\n", (__u32) sizeof(c.src.sock));
	printk(KERN_ERR "  conn.trgt: %u\n", (__u32) sizeof(c.trgt));
	printk(KERN_ERR "  conn.trgt.out: %u\n",
			(__u32) sizeof(c.trgt.out));
	printk(KERN_ERR "  conn.trgt.sock: %u\n",
			(__u32) sizeof(c.trgt.sock));
	printk(KERN_ERR "  conn.data_buf: %u\n", (__u32) sizeof(c.data_buf));
	printk(KERN_ERR "  conn.bufsize: %u\n", (__u32) sizeof(c.bufsize));

	printk(KERN_ERR "conn_src_sock_extradata: %u\n",
			(__u32) sizeof(struct cor_conn_src_sock_extradata));

	printk(KERN_ERR "sizeof cor_neighbor: %u\n",
			(__u32) sizeof(struct cor_neighbor));

	printk(KERN_ERR "sizeof mutex: %u\n", (__u32) sizeof(struct mutex));
	printk(KERN_ERR "sizeof spinlock: %u\n", (__u32) sizeof(spinlock_t));
	printk(KERN_ERR "sizeof timer_list: %u\n",
			(__u32) sizeof(struct timer_list));
	printk(KERN_ERR "sizeof hrtimer: %u\n", (__u32) sizeof(struct hrtimer));
	printk(KERN_ERR "sizeof ktime_t: %u\n", (__u32) sizeof(ktime_t));
	printk(KERN_ERR "sizeof kref: %u\n", (__u32) sizeof(struct kref));
	printk(KERN_ERR "sizeof list_head: %u\n",
			(__u32) sizeof(struct list_head));
	printk(KERN_ERR "sizeof rb_root: %u\n", (__u32) sizeof(struct rb_root));
	printk(KERN_ERR "sizeof rb_node: %u\n", (__u32) sizeof(struct rb_node));

	rc = cor_util_init();
	if (unlikely(rc != 0))
		return rc;

	cor_conn_slab = kmem_cache_create("cor_conn",
			sizeof(struct cor_conn_bidir), 8, 0, 0);
	if (unlikely(cor_conn_slab == 0))
		return -ENOMEM;

	cor_conn_ed_slab = kmem_cache_create("cor_conn_src_sock_extradata",
			sizeof(struct cor_conn_src_sock_extradata), 8, 0, 0);
	if (unlikely(cor_conn_ed_slab == 0))
		return -ENOMEM;

	cor_connid_reuse_slab = kmem_cache_create("cor_connid_reuse",
			sizeof(struct cor_connid_reuse_item), 8, 0, 0);
	if (unlikely(cor_connid_reuse_slab == 0))
		return -ENOMEM;

	atomic_set(&cor_num_conns, 0);
	barrier();

	rc = cor_forward_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_kgen_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_rd_init1();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_snd_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_neighbor_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_neigh_ann_rcv_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_dev_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_rcv_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_sock_managed_init1();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_conn_src_sock_init1();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_sock_init2();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_rd_init2();
	if (unlikely(rc != 0))
		return rc;

	return 0;
}

static void __exit cor_exit(void)
{
	cor_rd_exit1();
	cor_sock_exit1();
	cor_conn_src_sock_exit1();
	cor_dev_exit1();

	flush_scheduled_work();

	cor_rcv_exit2();
	cor_neighbor_exit2();
	cor_neigh_ann_rcv_exit2();
	cor_snd_exit2();
	cor_rd_exit2();
	cor_kgen_exit2();
	cor_forward_exit2();

	BUG_ON(atomic_read(&cor_num_conns) != 0);

	kmem_cache_destroy(cor_conn_slab);
	cor_conn_slab = 0;

	kmem_cache_destroy(cor_conn_ed_slab);
	cor_conn_ed_slab = 0;

	kmem_cache_destroy(cor_connid_reuse_slab);
	cor_connid_reuse_slab = 0;
}

module_init(cor_init);
module_exit(cor_exit);

MODULE_LICENSE("GPL");