/**
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include <linux/mutex.h>

#include "cor.h"

DEFINE_SPINLOCK(cor_bindnodes);

static LIST_HEAD(cor_openports);

static struct kmem_cache *cor_conn_slab;
static struct kmem_cache *cor_conn_ed_slab;
struct kmem_cache *cor_connid_reuse_slab;

atomic_t cor_num_conns;

int cor_new_incoming_conn_allowed(struct cor_neighbor *nb)
{
	/**
	 * MAX_CONNS is only loosely enforced for now
	 * (cor_num_conns is not checked and incremented at the same time)
	 */
	if (atomic_read(&cor_num_conns) >= MAX_CONNS)
		return 0;
	else
		return 1;
}
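
/*
 * Priority helpers: cor_get_prio_in() scales a priority received from a
 * neighbor by PRIORITY_IN_MULITPLIER_PERCENT, ordering the multiply and
 * divide so large values cannot overflow 64 bits. cor_conn_prio_sum_limit()
 * shrinks a single connection's priority proportionally when the
 * per-neighbor priority sum exceeds PRIORITY_SUM_IN_MAX.
 */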
static __u64 cor_get_prio_in(__u64 priority)
{
	if (PRIORITY_IN_MULITPLIER_PERCENT >= 100)
		return priority;

	if (unlikely(priority > U64_MAX / 100))
		return (priority / 100) * PRIORITY_IN_MULITPLIER_PERCENT;
	else
		return (priority * PRIORITY_IN_MULITPLIER_PERCENT) / 100;
}

static __u32 cor_conn_prio_sum_limit(__u32 priority, __u64 priority_sum)
{
	__u32 shiftcnt = 0;

	while ((PRIORITY_SUM_IN_MAX >> shiftcnt) > U32_MAX) {
		shiftcnt++;
	}

	return div_u64(((__u64) priority) * (PRIORITY_SUM_IN_MAX >> shiftcnt),
			(priority_sum >> shiftcnt));
}

__u32 _cor_conn_refresh_priority(struct cor_conn *cn_lx)
{
	BUG_ON(cn_lx->is_client == 0);

	if (cn_lx->sourcetype == SOURCE_IN) {
		__u32 priority = (__u32)
				cor_get_prio_in(cn_lx->src.in.priority);
		__u64 priority_sum = cor_get_prio_in(atomic64_read(
				&cn_lx->src.in.nb->priority_sum));
		if (priority_sum > PRIORITY_SUM_IN_MAX) {
			return cor_conn_prio_sum_limit(priority, priority_sum);
		} else {
			return priority;
		}
	} else if (cn_lx->sourcetype == SOURCE_SOCK) {
		return cn_lx->src.sock.ed->priority;
	} else {
		BUG();
		return 0;
	}
}
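
/*
 * Recompute the priority of a client-side connection and, if sending
 * priority updates is allowed, send the new value to the peer. With
 * locked == 0 the rcv_locks of both directions are taken here; with
 * locked != 0 the caller already holds them.
 */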
__u32 cor_conn_refresh_priority(struct cor_conn *cn, int locked)
{
	struct cor_conn *cn_reversedir = cor_get_conn_reversedir(cn);
	__u32 priority = 0;

	if (likely(locked == 0)) {
		if (cn->is_client == 0)
			return cor_conn_refresh_priority(cn_reversedir, 0);

		spin_lock_bh(&cn->rcv_lock);
		spin_lock_bh(&cor_get_conn_reversedir(cn)->rcv_lock);
	} else {
		BUG_ON(cn->is_client == 0);
	}

	if (unlikely(cn->isreset != 0) || cn->targettype != TARGET_OUT)
		goto out;

	priority = _cor_conn_refresh_priority(cn);

	if (cn->trgt.out.priority_send_allowed != 0) {
		__u16 priority_enc = cor_enc_priority(priority);

		if (priority_enc != cn->trgt.out.priority_last ||
				cn->is_highlatency_send_needed != 0)
			cor_send_priority(cn, priority_enc);
	}

out:
	if (likely(locked == 0)) {
		spin_unlock_bh(&cor_get_conn_reversedir(cn)->rcv_lock);
		spin_unlock_bh(&cn->rcv_lock);
	}

	return priority;
}

static void _cor_set_conn_in_priority(struct cor_conn *src_in_lx,
		__u32 newpriority)
{
	struct cor_neighbor *nb = src_in_lx->src.in.nb;
	__u32 oldpriority = src_in_lx->src.in.priority;

	cor_update_atomic_sum(&nb->priority_sum, oldpriority, newpriority);

	src_in_lx->src.in.priority = newpriority;
}

void cor_set_conn_is_highlatency(struct cor_conn *cn, __u8 is_highlatency,
		int locked, int call_refresh_priority)
{
	BUG_ON(cn->is_client == 0);

	if (locked == 0) {
		spin_lock_bh(&cn->rcv_lock);
		spin_lock_bh(&cor_get_conn_reversedir(cn)->rcv_lock);
	}

	if (unlikely(cn->isreset != 0))
		goto out;

	if (cn->is_highlatency != is_highlatency) {
		cn->is_highlatency = is_highlatency;
		cor_get_conn_reversedir(cn)->is_highlatency = is_highlatency;
		cn->is_highlatency_send_needed = 1;

		if (cn->targettype == TARGET_OUT)
			cor_get_conn_idletime(cn);

		if (cor_get_conn_reversedir(cn)->targettype == TARGET_OUT)
			cor_get_conn_idletime(cor_get_conn_reversedir(cn));

		if (call_refresh_priority)
			cor_conn_refresh_priority(cn, 1);
	}

out:
	if (locked == 0) {
		spin_unlock_bh(&cor_get_conn_reversedir(cn)->rcv_lock);
		spin_unlock_bh(&cn->rcv_lock);
	}
}
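
/*
 * Apply a priority update received from neighbor nb for the incoming
 * connection src_in: the connid and priority_seqno are verified, the
 * highlatency flag and the (slightly reduced) new priority are applied and
 * the outgoing priority is refreshed.
 */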
void cor_set_conn_in_priority(struct cor_neighbor *nb, __u32 conn_id,
		struct cor_conn *src_in, __u8 priority_seqno, __u16 priority,
		__u8 is_highlatency)
{
	struct cor_conn *trgt_out = cor_get_conn_reversedir(src_in);
	__u32 newpriority;

	if (unlikely(src_in->is_client == 0))
		return;

	spin_lock_bh(&src_in->rcv_lock);
	spin_lock_bh(&trgt_out->rcv_lock);

	if (unlikely(cor_is_conn_in(src_in, nb, conn_id) == 0))
		goto out;

	if (src_in->src.in.priority_seqno != priority_seqno)
		goto out;
	src_in->src.in.priority_seqno =
			(src_in->src.in.priority_seqno + 1) & 15;

	cor_set_conn_is_highlatency(src_in, is_highlatency, 1, 0);

	newpriority = (cor_dec_priority(priority) * 4) / 5;
	_cor_set_conn_in_priority(src_in, newpriority);
	cor_conn_refresh_priority(src_in, 1);

out:
	spin_unlock_bh(&trgt_out->rcv_lock);
	spin_unlock_bh(&src_in->rcv_lock);
}

static void cor_connreset_priority(struct cor_conn *cn_lx)
{
	if (cn_lx->is_client == 0)
		return;

	if (cn_lx->sourcetype == SOURCE_IN)
		_cor_set_conn_in_priority(cn_lx, 0);
}
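
/*
 * Each neighbor keeps its outgoing connections on two lists,
 * snd_conn_idle_list and snd_conn_busy_list. The helpers below move a
 * connection between them under nb->conn_list_lock, depending on whether
 * data_buf still holds buffered data.
 */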
static void cor_conn_move_to_idle_list(struct cor_conn *trgt_out_lx)
{
	unsigned long iflags;
	struct cor_neighbor *nb = trgt_out_lx->trgt.out.nb;

	BUG_ON(trgt_out_lx->data_buf.datasize != 0);

	trgt_out_lx->trgt.out.nblist_busy_remaining = 0;
	trgt_out_lx->trgt.out.jiffies_last_act = jiffies;

	spin_lock_irqsave(&nb->conn_list_lock, iflags);

	trgt_out_lx->trgt.out.jiffies_last_act = jiffies;

	list_del(&trgt_out_lx->trgt.out.nb_list);
	list_add_tail(&trgt_out_lx->trgt.out.nb_list, &nb->snd_conn_idle_list);

	spin_unlock_irqrestore(&nb->conn_list_lock, iflags);

	trgt_out_lx->trgt.out.in_nb_busy_list = 0;
}

static void cor_conn_move_to_busy_list(struct cor_conn *trgt_out_lx)
{
	unsigned long iflags;
	struct cor_neighbor *nb = trgt_out_lx->trgt.out.nb;

	BUG_ON(trgt_out_lx->data_buf.datasize == 0);

	trgt_out_lx->trgt.out.nblist_busy_remaining =
			trgt_out_lx->data_buf.datasize;

	spin_lock_irqsave(&nb->conn_list_lock, iflags);

	trgt_out_lx->trgt.out.jiffies_last_act = jiffies;

	list_del(&trgt_out_lx->trgt.out.nb_list);
	list_add_tail(&trgt_out_lx->trgt.out.nb_list,
			&nb->snd_conn_busy_list);

	spin_unlock_irqrestore(&nb->conn_list_lock, iflags);

	trgt_out_lx->trgt.out.in_nb_busy_list = 1;
}
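
/*
 * Update the list placement of an outgoing connection after activity:
 * move it to the busy or idle list as needed and, if it has been inactive
 * for a while, start shrinking its buffer size.
 */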
void cor_conn_set_last_act(struct cor_conn *trgt_out_lx)
{
	if (unlikely(trgt_out_lx->isreset != 0))
		return;

	BUG_ON(trgt_out_lx->targettype != TARGET_OUT);

	if (unlikely(trgt_out_lx->trgt.out.in_nb_busy_list == 0)) {
		if (unlikely(trgt_out_lx->data_buf.datasize != 0)) {
			cor_conn_move_to_busy_list(trgt_out_lx);
		} else if (unlikely(time_after(jiffies,
				trgt_out_lx->trgt.out.jiffies_last_act +
				HZ * CONN_ACTIVITY_UPDATEINTERVAL_SEC))) {
			cor_conn_move_to_idle_list(trgt_out_lx);
		}
	} else {
		if (unlikely(trgt_out_lx->data_buf.datasize == 0)) {
			cor_conn_move_to_idle_list(trgt_out_lx);
		} else if (unlikely(
				trgt_out_lx->trgt.out.nblist_busy_remaining ==
				0)) {
			cor_conn_move_to_busy_list(trgt_out_lx);
		}
	}

	if (unlikely(time_after(jiffies,
			trgt_out_lx->trgt.out.jiffies_last_act +
			(CONN_BUSY_INACTIVITY_TIMEOUT_SEC * HZ) / 4))) {
		trgt_out_lx->bufsize.ignore_rcv_lowbuf = max(
				trgt_out_lx->bufsize.ignore_rcv_lowbuf,
				trgt_out_lx->bufsize.bufsize >>
				BUFSIZE_SHIFT);

		if (trgt_out_lx->bufsize.state != BUFSIZE_DECR &&
				trgt_out_lx->bufsize.state !=
				BUFSIZE_DECR_FAST) {
			trgt_out_lx->bufsize.state = BUFSIZE_DECR;
			trgt_out_lx->bufsize.act.decr.size_start =
					trgt_out_lx->bufsize.bufsize;
		}
	}
}
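
/*
 * Final destruction of a connection pair. cor_free_conn() is the kref
 * release function of struct cor_conn_bidir; _cor_free_conn() checks that
 * each direction has been fully torn down and drops the remaining neighbor
 * references before the memory is poisoned and returned to the slab.
 */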
static void _cor_free_conn(struct cor_conn *cn)
{
	BUG_ON(cn->isreset == 0);

	if (cn->sourcetype == SOURCE_IN) {
		WARN_ONCE(list_empty(&cn->src.in.reorder_queue) == 0,
				"cor_free_conn(): cn->src.in.reorder_queue is not empty");
		WARN_ONCE(list_empty(&cn->src.in.acks_pending) == 0,
				"cor_free_conn(): cn->src.in.acks_pending is not empty");

		WARN_ONCE(cn->src.in.conn_id != 0,
				"cor_free_conn(): cn->src.in.conn_id is not 0");
		cor_nb_kref_put(cn->src.in.nb, "conn");
		cn->src.in.nb = 0;
	} else if (cn->sourcetype == SOURCE_SOCK) {
		BUG_ON(cn->src.sock.ed == 0);
		memset(cn->src.sock.ed, 9 * 16 + 10,
				sizeof(struct cor_conn_src_sock_extradata));
		kmem_cache_free(cor_conn_ed_slab, cn->src.sock.ed);
		cn->src.sock.ed = 0;
	}

	if (cn->targettype == TARGET_OUT) {
		WARN_ONCE(list_empty(&cn->trgt.out.retrans_list) == 0,
				"cor_free_conn(): cn->trgt.out.retrans_list is not empty");
		WARN_ONCE(cn->trgt.out.rb.in_queue != RB_INQUEUE_FALSE,
				"cor_free_conn(): cn->trgt.out.rb.in_queue is not RB_INQUEUE_FALSE");
		WARN_ONCE(cn->trgt.out.conn_id != 0,
				"cor_free_conn(): cn->trgt.out.conn_id is not 0");
		cor_nb_kref_put(cn->trgt.out.nb, "conn");
		cn->trgt.out.nb = 0;
	}

	WARN_ONCE(cn->data_buf.datasize != 0,
			"cor_free_conn(): cn->data_buf.datasize is not 0");
	WARN_ONCE(cn->data_buf.overhead != 0,
			"cor_free_conn(): cn->data_buf.overhead is not 0");
	WARN_ONCE(list_empty(&cn->data_buf.items) == 0,
			"cor_free_conn(): cn->data_buf.items is not empty");
	WARN_ONCE(cn->data_buf.nextread != 0,
			"cor_free_conn(): cn->data_buf.nextread is not 0");
}

void cor_free_conn(struct kref *ref)
{
	struct cor_conn_bidir *cnb = container_of(ref, struct cor_conn_bidir,
			ref);

	_cor_free_conn(&cnb->cli);
	_cor_free_conn(&cnb->srv);

	memset(cnb, 9 * 16 + 10, sizeof(struct cor_conn_bidir));
	kmem_cache_free(cor_conn_slab, cnb);
}

/**
 * rc == 0 ==> ok
 * rc == 1 ==> connid_reuse or connid allocation failed
 */
int cor_conn_init_out(struct cor_conn *trgt_unconn_ll, struct cor_neighbor *nb,
		__u32 rcvd_connid, int use_rcvd_connid)
{
	unsigned long iflags;
	struct cor_conn *src_unconn_ll =
			cor_get_conn_reversedir(trgt_unconn_ll);
	__u8 tmp;

	BUG_ON(trgt_unconn_ll->targettype != TARGET_UNCONNECTED);
	BUG_ON(src_unconn_ll == 0);
	BUG_ON(src_unconn_ll->sourcetype != SOURCE_UNCONNECTED);

	memset(&trgt_unconn_ll->trgt.out, 0,
			sizeof(trgt_unconn_ll->trgt.out));
	memset(&src_unconn_ll->src.in, 0, sizeof(src_unconn_ll->src.in));

	trgt_unconn_ll->targettype = TARGET_OUT;
	src_unconn_ll->sourcetype = SOURCE_IN;

	spin_lock_irqsave(&nb->conn_list_lock, iflags);
	if (unlikely(cor_get_neigh_state(nb) == NEIGHBOR_STATE_KILLED))
		goto out_err;

	if (use_rcvd_connid) {
		BUG_ON((rcvd_connid & (1 << 31)) == 0);

		src_unconn_ll->src.in.conn_id = rcvd_connid;
		if (unlikely(cor_insert_connid(nb, src_unconn_ll) != 0)) {
			src_unconn_ll->src.in.conn_id = 0;
			goto out_err;
		}
	} else {
		if (unlikely(cor_connid_alloc(nb, src_unconn_ll))) {
			goto out_err;
		}
	}

	list_add_tail(&trgt_unconn_ll->trgt.out.nb_list,
			&nb->snd_conn_idle_list);
	cor_conn_kref_get(trgt_unconn_ll, "neighbor_list");

	spin_unlock_irqrestore(&nb->conn_list_lock, iflags);

	if (0) {
out_err:
		spin_unlock_irqrestore(&nb->conn_list_lock, iflags);
		trgt_unconn_ll->targettype = TARGET_UNCONNECTED;
		src_unconn_ll->sourcetype = SOURCE_UNCONNECTED;
		return 1;
	}

	trgt_unconn_ll->trgt.out.nb = nb;
	src_unconn_ll->src.in.nb = nb;
	cor_nb_kref_get(nb, "conn");
	cor_nb_kref_get(nb, "conn");

	cor_conn_set_last_act(trgt_unconn_ll);

	INIT_LIST_HEAD(&src_unconn_ll->src.in.reorder_queue);
	INIT_LIST_HEAD(&src_unconn_ll->src.in.acks_pending);
	INIT_LIST_HEAD(&trgt_unconn_ll->trgt.out.retrans_list);

	cor_reset_seqno(trgt_unconn_ll, 0);
	if (use_rcvd_connid == 0) {
		get_random_bytes((char *)
				&trgt_unconn_ll->trgt.out.seqno_nextsend,
				sizeof(
				trgt_unconn_ll->trgt.out.seqno_nextsend));
		trgt_unconn_ll->trgt.out.seqno_acked =
				trgt_unconn_ll->trgt.out.seqno_nextsend;
		trgt_unconn_ll->trgt.out.seqno_windowlimit =
				trgt_unconn_ll->trgt.out.seqno_nextsend;
		cor_reset_seqno(trgt_unconn_ll,
				trgt_unconn_ll->trgt.out.seqno_nextsend);

		get_random_bytes((char *) &src_unconn_ll->src.in.next_seqno,
				sizeof(src_unconn_ll->src.in.next_seqno));
		src_unconn_ll->src.in.window_seqnolimit =
				src_unconn_ll->src.in.next_seqno;
		src_unconn_ll->src.in.window_seqnolimit_remote =
				src_unconn_ll->src.in.next_seqno;
	}

	get_random_bytes((char *) &tmp, 1);
	trgt_unconn_ll->trgt.out.priority_seqno = (tmp & 15);

	src_unconn_ll->src.in.priority_seqno = 0;

	trgt_unconn_ll->trgt.out.jiffies_last_act = jiffies;

	if (src_unconn_ll->is_highlatency)
		trgt_unconn_ll->trgt.out.jiffies_idle_since =
				(jiffies -
				BURSTPRIO_MAXIDLETIME_HIGHLATENCY_SECS * HZ) <<
				JIFFIES_LAST_IDLE_SHIFT;
	else
		trgt_unconn_ll->trgt.out.jiffies_idle_since =
				jiffies << JIFFIES_LAST_IDLE_SHIFT;

	trgt_unconn_ll->trgt.out.remote_bufsize_changerate = 64;

	if (src_unconn_ll->is_client)
		atomic_inc(&cor_num_conns);

	if (use_rcvd_connid == 0)
		cor_update_windowlimit(src_unconn_ll);

	return 0;
}
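
/*
 * Turn an unconnected source into SOURCE_SOCK: allocate the extradata
 * struct, set the initial priority and send speed state and set up the
 * keepalive timer. Returns 1 if the allocation fails, 0 on success.
 */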
int cor_conn_init_sock_source(struct cor_conn *cn)
{
	struct cor_conn_src_sock_extradata *ed;

	BUG_ON(cn == 0);
	BUG_ON(cn->sourcetype != SOURCE_UNCONNECTED);

	ed = kmem_cache_alloc(cor_conn_ed_slab, GFP_ATOMIC);
	if (unlikely(ed == 0))
		return 1;
	memset(ed, 0, sizeof(struct cor_conn_src_sock_extradata));

	cn->sourcetype = SOURCE_SOCK;
	memset(&cn->src.sock, 0, sizeof(cn->src.sock));
	cn->src.sock.ed = ed;
	cn->src.sock.ed->priority = cor_priority_max();

	cn->src.sock.ed->snd_speed.state = SNDSPEED_ACTIVE;
	cn->src.sock.ed->snd_speed.flushed = 1;
	cn->src.sock.ed->snd_speed.jiffies_last_refresh = jiffies;
	cn->src.sock.ed->snd_speed.bytes_sent = 0;
	cn->src.sock.ed->snd_speed.speed = SND_SPEED_START;
	cn->src.sock.ed->snd_speed.speed_limited = SND_SPEED_START;

	timer_setup(&cn->src.sock.keepalive_timer,
			cor_keepalive_req_timerfunc, 0);

	return 0;
}

void cor_conn_init_sock_target(struct cor_conn *cn)
{
	BUG_ON(cn == 0);
	cn->targettype = TARGET_SOCK;
	memset(&cn->trgt.sock, 0, sizeof(cn->trgt.sock));
	cor_reset_seqno(cn, 0);
}

static void _cor_alloc_conn(struct cor_conn *cn, __u8 is_highlatency)
{
	cn->sourcetype = SOURCE_UNCONNECTED;
	cn->targettype = TARGET_UNCONNECTED;

	cn->isreset = 0;

	spin_lock_init(&cn->rcv_lock);

	cor_databuf_init(cn);

	cor_bufsize_init(cn, 0);

	if (is_highlatency == 0) {
		cn->is_highlatency = 0;
		cn->bufsize.bufsize =
				(BUFSIZE_INITIAL_LOWLAT << BUFSIZE_SHIFT);
	} else {
		cn->is_highlatency = 1;
		cn->bufsize.bufsize =
				(BUFSIZE_INITIAL_HIGHLAT << BUFSIZE_SHIFT);
	}
}

struct cor_conn_bidir *cor_alloc_conn(gfp_t allocflags, __u8 is_highlatency)
{
	struct cor_conn_bidir *cnb;

	cnb = kmem_cache_alloc(cor_conn_slab, allocflags);
	if (unlikely(cnb == 0))
		return 0;

	memset(cnb, 0, sizeof(struct cor_conn_bidir));

	cnb->cli.is_client = 1;
	kref_init(&cnb->ref);

	_cor_alloc_conn(&cnb->cli, is_highlatency);
	_cor_alloc_conn(&cnb->srv, is_highlatency);

	return cnb;
}
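
/*
 * Listener/port handling. cor_openports (protected by cor_bindnodes)
 * holds all listening sockets; cor_get_corsock_by_port() does a linear
 * search by port number and is called with cor_bindnodes held.
 */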
static struct cor_sock *cor_get_corsock_by_port(__be32 port)
{
	struct list_head *curr = cor_openports.next;

	while (curr != &cor_openports) {
		struct cor_sock *cs = container_of(curr, struct cor_sock,
				data.listener.lh);
		BUG_ON(cs->type != CS_TYPE_LISTENER);
		if (cs->data.listener.port == port)
			return cs;

		curr = curr->next;
	}

	return 0;
}

__u32 cor_list_services(char *buf, __u32 buflen)
{
	__u32 cnt = 0;

	__u32 buf_offset = 4;

	struct list_head *curr;
	int rc;

	/**
	 * The variable length header rowcount needs to be generated after
	 * the data. This is done by reserving the maximum space it could
	 * take. If it ends up being smaller, the data is moved so that
	 * there is no gap.
	 */

	BUG_ON(buf == 0);
	BUG_ON(buflen < buf_offset);

	spin_lock_bh(&cor_bindnodes);

	curr = cor_openports.next;
	while (curr != &cor_openports) {
		struct cor_sock *cs = container_of(curr, struct cor_sock,
				data.listener.lh);
		BUG_ON(cs->type != CS_TYPE_LISTENER);

		if (cs->data.listener.publish_service == 0)
			goto cont;

		if (unlikely(buf_offset + 4 < buf_offset) ||
				buf_offset + 4 > buflen)
			break;

		buf[buf_offset] = ((char *) &cs->data.listener.port)[0];
		buf[buf_offset + 1] = ((char *) &cs->data.listener.port)[1];
		buf[buf_offset + 2] = ((char *) &cs->data.listener.port)[2];
		buf[buf_offset + 3] = ((char *) &cs->data.listener.port)[3];
		buf_offset += 4;
		cnt++;

cont:
		curr = curr->next;
	}

	spin_unlock_bh(&cor_bindnodes);

	rc = cor_encode_len(buf, 4, cnt);
	BUG_ON(rc <= 0);
	BUG_ON(rc > 4);

	if (likely(rc < 4))
		memmove(buf + ((__u32) rc), buf + 4, buf_offset);

	return buf_offset - 4 + ((__u32) rc);
}

void cor_set_publish_service(struct cor_sock *cs, __u8 value)
{
	BUG_ON(value != 0 && value != 1);

	mutex_lock(&cs->lock);

	cs->publish_service = value;

	if (cs->type == CS_TYPE_LISTENER) {
		spin_lock_bh(&cor_bindnodes);
		cs->data.listener.publish_service = value;
		spin_unlock_bh(&cor_bindnodes);
	}

	mutex_unlock(&cs->lock);
}

void cor_close_port(struct cor_sock *cs)
{
	mutex_lock(&cs->lock);

	if (unlikely(cs->type != CS_TYPE_LISTENER))
		goto out;

	spin_lock_bh(&cor_bindnodes);

	list_del(&cs->data.listener.lh);

	while (list_empty(&cs->data.listener.conn_queue) == 0) {
		struct cor_conn *src_sock_o = container_of(
				cs->data.listener.conn_queue.next,
				struct cor_conn, src.sock.cl_list);
		BUG_ON(src_sock_o->src.sock.in_cl_list == 0);
		list_del(&src_sock_o->src.sock.cl_list);
		src_sock_o->src.sock.in_cl_list = 0;
		spin_unlock_bh(&cor_bindnodes);

		cor_reset_conn(src_sock_o);

		spin_lock_bh(&cor_bindnodes);
		cor_conn_kref_put(src_sock_o, "conn_queue");
	}

	spin_unlock_bh(&cor_bindnodes);

out:
	mutex_unlock(&cs->lock);
}

int cor_open_port(struct cor_sock *cs_l, __be32 port)
{
	int rc = 0;

	spin_lock_bh(&cor_bindnodes);

	if (cor_get_corsock_by_port(port) != 0) {
		rc = -EADDRINUSE;
		goto out;
	}

	BUG_ON(cs_l->type != CS_TYPE_UNCONNECTED);

	cs_l->type = CS_TYPE_LISTENER;
	cs_l->data.listener.port = port;
	cs_l->data.listener.publish_service = cs_l->publish_service;

	/* kref is not used here */
	INIT_LIST_HEAD(&cs_l->data.listener.conn_queue);

	list_add_tail((struct list_head *) &cs_l->data.listener.lh,
			&cor_openports);

out:
	spin_unlock_bh(&cor_bindnodes);

	return rc;
}
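
/*
 * Attach a newly created connection to the local listener bound to port:
 * the source side becomes SOURCE_SOCK, the target side TARGET_SOCK, and
 * the connection is queued on the listener's conn_queue until it is
 * accepted. Returns one of the CONNECT_PORT_* codes.
 */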
int cor_connect_port(struct cor_conn *trgt_unconn_ll, __be32 port)
{
	struct cor_conn *src_unconn_ll =
			cor_get_conn_reversedir(trgt_unconn_ll);
	struct cor_sock *cs;
	int rc = CONNECT_PORT_OK;

	spin_lock_bh(&cor_bindnodes);

	cs = cor_get_corsock_by_port(port);
	if (cs == 0) {
		rc = CONNECT_PORT_PORTCLOSED;
		goto out;
	}

	if (unlikely(cs->data.listener.queue_len >=
			cs->data.listener.queue_maxlen)) {
		if (cs->data.listener.queue_maxlen <= 0)
			rc = CONNECT_PORT_PORTCLOSED;
		else
			rc = CONNECT_PORT_TEMPORARILY_OUT_OF_RESOURCES;

		goto out;
	}

	BUG_ON(trgt_unconn_ll->is_client != 1);

	if (cor_conn_init_sock_source(src_unconn_ll) != 0) {
		rc = CONNECT_PORT_TEMPORARILY_OUT_OF_RESOURCES;
		goto out;
	}
	cor_conn_init_sock_target(trgt_unconn_ll);

	list_add_tail(&src_unconn_ll->src.sock.cl_list,
			&cs->data.listener.conn_queue);
	src_unconn_ll->src.sock.in_cl_list = 1;
	cor_conn_kref_get(src_unconn_ll, "conn_queue");

	cs->data.listener.queue_len++;
	atomic_set(&cs->ready_to_accept, 1);
	barrier();
	cs->sk.sk_state_change(&cs->sk);

out:
	spin_unlock_bh(&cor_bindnodes);

	return rc;
}

/**
 * rc == 0 ==> connected
 * rc == 3 ==> addr not found
 * rc == 4 ==> connid allocation failed
 * rc == 4 ==> control msg alloc failed
 */
int cor_connect_neigh(struct cor_conn *trgt_unconn_ll, __be64 addr)
{
	struct cor_control_msg_out *cm;
	struct cor_neighbor *nb = 0;

	nb = cor_find_neigh(addr);
	if (nb == 0)
		return 3;

	cm = cor_alloc_control_msg(nb, ACM_PRIORITY_MED);
	if (unlikely(cm == 0)) {
		cor_nb_kref_put(nb, "stack");
		return 4;
	}

	if (unlikely(cor_conn_init_out(trgt_unconn_ll, nb, 0, 0))) {
		cor_free_control_msg(cm);
		cor_nb_kref_put(nb, "stack");
		return 4;
	}

	trgt_unconn_ll->trgt.out.priority_last = cor_enc_priority(
			_cor_conn_refresh_priority(trgt_unconn_ll));

	cor_send_connect_nb(cm, trgt_unconn_ll->trgt.out.conn_id,
			trgt_unconn_ll->trgt.out.seqno_nextsend,
			cor_get_conn_reversedir(trgt_unconn_ll)->
			src.in.next_seqno,
			cor_get_conn_reversedir(trgt_unconn_ll));

	cor_nb_kref_put(nb, "stack");

	return 0;
}
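
/*
 * Tear down one direction of a connection: release the connid, drop the
 * conn from neighbor/listener lists and timers, notify attached sockets,
 * optionally send a reset_conn message to the neighbor and discard all
 * buffered data.
 */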
static void _cor_reset_conn(struct cor_conn *cn_ll, int trgt_out_resetneeded)
{
	unsigned long iflags;

	if (cn_ll->sourcetype == SOURCE_IN) {
		struct cor_neighbor *nb = cn_ll->src.in.nb;

		if (cn_ll->src.in.conn_id != 0 &&
				(cn_ll->src.in.conn_id & (1 << 31)) == 0) {
			cor_insert_connid_reuse(nb, cn_ll->src.in.conn_id);
		}

		if (cn_ll->src.in.conn_id != 0) {
			spin_lock_irqsave(&nb->connid_lock, iflags);
			rb_erase(&cn_ll->src.in.rbn, &nb->connid_rb);
			spin_unlock_irqrestore(&nb->connid_lock, iflags);
			cor_conn_kref_put_bug(cn_ll, "connid_table");
		}
		cn_ll->src.in.conn_id = 0;

		cor_free_ack_conns(cn_ll);

		if (cn_ll->is_client)
			atomic_dec(&cor_num_conns);

		cor_reset_ooo_queue(cn_ll);
	} else if (cn_ll->sourcetype == SOURCE_SOCK) {
		if (likely(cn_ll->src.sock.ed->cs != 0)) {
			cor_sk_write_space(cn_ll->src.sock.ed->cs);
			kref_put(&cn_ll->src.sock.ed->cs->ref, cor_free_sock);
			cn_ll->src.sock.ed->cs = 0;
		}
		if (unlikely(cn_ll->src.sock.in_cl_list != 0)) {
			list_del(&cn_ll->src.sock.cl_list);
			cn_ll->src.sock.in_cl_list = 0;
			cor_conn_kref_put_bug(cn_ll, "conn_queue");
		}

		if (cn_ll->src.sock.socktype == SOCKTYPE_MANAGED) {
			if (del_timer(&cn_ll->src.sock.keepalive_timer) != 0)
				cor_conn_kref_put(cn_ll, "keepalive_snd_timer");
		}
	}

	if (cn_ll->targettype == TARGET_UNCONNECTED) {
		if (cn_ll->trgt.unconnected.cmdparams != 0) {
			kfree(cn_ll->trgt.unconnected.cmdparams);
			cn_ll->trgt.unconnected.cmdparams = 0;
		}
	} else if (cn_ll->targettype == TARGET_OUT) {
		struct cor_neighbor *nb = cn_ll->trgt.out.nb;

		spin_lock_irqsave(&nb->conn_list_lock, iflags);
		list_del(&cn_ll->trgt.out.nb_list);
		spin_unlock_irqrestore(&nb->conn_list_lock, iflags);
		cor_conn_kref_put_bug(cn_ll, "neighbor_list");

		if (trgt_out_resetneeded && cn_ll->trgt.out.conn_id != 0) {
			cor_send_reset_conn(cn_ll->trgt.out.nb,
					cn_ll->trgt.out.conn_id, 0);
		}

		cn_ll->trgt.out.conn_id = 0;

		cor_cancel_all_conn_retrans(cn_ll);

		cor_qos_remove_conn(cn_ll);
	} else if (cn_ll->targettype == TARGET_SOCK) {
		if (likely(cn_ll->trgt.sock.cs != 0)) {
			if (cn_ll->trgt.sock.socktype == SOCKTYPE_RAW) {
				cor_sk_data_ready(cn_ll->trgt.sock.cs);
			} else {
				cor_mngdsocket_readfromconn_fromatomic(
						cn_ll->trgt.sock.cs);
			}
			kref_put(&cn_ll->trgt.sock.cs->ref, cor_free_sock);
			cn_ll->trgt.sock.cs = 0;
			cn_ll->trgt.sock.rcv_buf = 0;
		}
	}

	cor_databuf_ackdiscard(cn_ll);

	cor_account_bufspace(cn_ll);

	cor_connreset_priority(cn_ll);
}

void cor_reset_conn_locked(struct cor_conn_bidir *cnb_ll)
{
	BUG_ON(cnb_ll->cli.isreset <= 1 && cnb_ll->srv.isreset == 2);
	BUG_ON(cnb_ll->cli.isreset == 2 && cnb_ll->srv.isreset <= 1);

	if (cnb_ll->cli.isreset <= 1) {
		__u8 old_isreset_cli = cnb_ll->cli.isreset;
		__u8 old_isreset_srv = cnb_ll->srv.isreset;

		cnb_ll->cli.isreset = 2;
		cnb_ll->srv.isreset = 2;

		_cor_reset_conn(&cnb_ll->cli, old_isreset_cli == 0);
		_cor_reset_conn(&cnb_ll->srv, old_isreset_srv == 0);
	}
}

void cor_reset_conn(struct cor_conn *cn)
{
	struct cor_conn_bidir *cnb = cor_get_conn_bidir(cn);

	cor_conn_kref_get(&cnb->cli, "stack");
	cor_conn_kref_get(&cnb->srv, "stack");

	spin_lock_bh(&cnb->cli.rcv_lock);
	spin_lock_bh(&cnb->srv.rcv_lock);

	cor_reset_conn_locked(cnb);

	spin_unlock_bh(&cnb->srv.rcv_lock);
	spin_unlock_bh(&cnb->cli.rcv_lock);

	cor_conn_kref_put_bug(&cnb->cli, "stack");
	cor_conn_kref_put(&cnb->srv, "stack");
}
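
/*
 * Module init/exit. cor_init() prints a few struct sizes for debugging,
 * creates the slab caches used above and then initializes the remaining
 * subsystems; any failure aborts module loading with the returned error.
 */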
static int __init cor_init(void)
{
	int rc;

	struct cor_conn_bidir cb;
	struct cor_conn c;

	printk(KERN_ERR "sizeof cor_conn_bidir: %u\n", (__u32) sizeof(cb));
	printk(KERN_ERR "sizeof conn: %u\n", (__u32) sizeof(c));
	printk(KERN_ERR " conn.source: %u\n", (__u32) sizeof(c.src));
	printk(KERN_ERR " conn.source.in: %u\n", (__u32) sizeof(c.src.in));
	printk(KERN_ERR " conn.source.sock: %u\n", (__u32) sizeof(c.src.sock));
	printk(KERN_ERR " conn.trgt: %u\n", (__u32) sizeof(c.trgt));
	printk(KERN_ERR " conn.trgt.out: %u\n",
			(__u32) sizeof(c.trgt.out));
	printk(KERN_ERR " conn.trgt.sock: %u\n",
			(__u32) sizeof(c.trgt.sock));
	printk(KERN_ERR " conn.data_buf: %u\n", (__u32) sizeof(c.data_buf));
	printk(KERN_ERR " conn.bufsize: %u\n", (__u32) sizeof(c.bufsize));

	printk(KERN_ERR "conn_src_sock_extradata: %u\n",
			(__u32) sizeof(struct cor_conn_src_sock_extradata));

	printk(KERN_ERR "sizeof cor_neighbor: %u\n",
			(__u32) sizeof(struct cor_neighbor));

	printk(KERN_ERR "sizeof mutex: %u\n", (__u32) sizeof(struct mutex));
	printk(KERN_ERR "sizeof spinlock: %u\n", (__u32) sizeof(spinlock_t));
	printk(KERN_ERR "sizeof timer_list: %u\n",
			(__u32) sizeof(struct timer_list));
	printk(KERN_ERR "sizeof kref: %u\n", (__u32) sizeof(struct kref));
	printk(KERN_ERR "sizeof list_head: %u\n",
			(__u32) sizeof(struct list_head));
	printk(KERN_ERR "sizeof rb_root: %u\n", (__u32) sizeof(struct rb_root));
	printk(KERN_ERR "sizeof rb_node: %u\n", (__u32) sizeof(struct rb_node));


	rc = cor_util_init();
	if (unlikely(rc != 0))
		return rc;

	cor_conn_slab = kmem_cache_create("cor_conn",
			sizeof(struct cor_conn_bidir), 8, 0, 0);
	if (unlikely(cor_conn_slab == 0))
		return -ENOMEM;

	cor_conn_ed_slab = kmem_cache_create("cor_conn_src_sock_extradata",
			sizeof(struct cor_conn_src_sock_extradata), 8, 0, 0);
	if (unlikely(cor_conn_ed_slab == 0))
		return -ENOMEM;

	cor_connid_reuse_slab = kmem_cache_create("cor_connid_reuse",
			sizeof(struct cor_connid_reuse_item), 8, 0, 0);
	if (unlikely(cor_connid_reuse_slab == 0))
		return -ENOMEM;


	atomic_set(&cor_num_conns, 0);
	barrier();

	rc = cor_forward_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_kgen_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_rd_init1();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_snd_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_neighbor_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_neigh_ann_rcv_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_dev_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_rcv_init();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_sock_managed_init1();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_conn_src_sock_init1();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_sock_init2();
	if (unlikely(rc != 0))
		return rc;

	rc = cor_rd_init2();
	if (unlikely(rc != 0))
		return rc;

	return 0;
}

static void __exit cor_exit(void)
{
	cor_rd_exit1();
	cor_sock_exit1();
	cor_conn_src_sock_exit1();
	cor_dev_exit1();

	flush_scheduled_work();

	cor_rcv_exit2();
	cor_neighbor_exit2();
	cor_neigh_ann_rcv_exit2();
	cor_snd_exit2();
	cor_rd_exit2();
	cor_kgen_exit2();
	cor_forward_exit2();

	BUG_ON(atomic_read(&cor_num_conns) != 0);

	kmem_cache_destroy(cor_conn_slab);
	cor_conn_slab = 0;

	kmem_cache_destroy(cor_conn_ed_slab);
	cor_conn_ed_slab = 0;

	kmem_cache_destroy(cor_connid_reuse_slab);
	cor_connid_reuse_slab = 0;
}

module_init(cor_init);
module_exit(cor_exit);
MODULE_LICENSE("GPL");