/**
 * Connection oriented routing
 * Copyright (C) 2007-2023 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include "cor.h"
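
/*
 * Pick the next neighbor with waiting conns from the send queue. If the
 * current pass is exhausted, swap in the nextpass list and update the pass
 * timing statistics. Returns the neighbor with a "stack" reference held, or
 * 0 if no neighbor is waiting.
 */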
static struct cor_neighbor *cor_resume_neighbors_peeknextnb(
		struct cor_dev *cd, unsigned long *jiffies_nb_lastduration)
{
	unsigned long iflags;

	struct cor_neighbor *nb;

	spin_lock_irqsave(&cd->send_queue.qlock, iflags);

	if (list_empty(&cd->send_queue.neighbors_waiting)) {
		if (list_empty(&cd->send_queue.neighbors_waiting_nextpass)) {
			BUG_ON(cd->send_queue.numconns != 0);
			spin_unlock_irqrestore(&cd->send_queue.qlock, iflags);

			return 0;
		} else {
			unsigned long jiffies_tmp = jiffies;

			cor_swap_list_items(&cd->send_queue.neighbors_waiting,
					&cd->send_queue.neighbors_waiting_nextpass);

			WARN_ONCE(time_before(jiffies_tmp,
					cd->send_queue.jiffies_nb_pass_start),
					"cor_resume_neighbors_peeknextnb: jiffies before jiffies_nb_pass_start (this is only a performance issue)");

			cd->send_queue.jiffies_nb_lastduration = jiffies -
					cd->send_queue.jiffies_nb_pass_start;
			cd->send_queue.jiffies_nb_pass_start = jiffies_tmp;
		}
	}

	*jiffies_nb_lastduration = cd->send_queue.jiffies_nb_lastduration;

	BUG_ON(cd->send_queue.numconns == 0);
	BUG_ON(list_empty(&cd->send_queue.neighbors_waiting));

	nb = container_of(cd->send_queue.neighbors_waiting.next,
			struct cor_neighbor, rb.lh);

	BUG_ON(nb->rb.in_queue != RB_INQUEUE_TRUE);
	BUG_ON(nb->rb.lh.prev != &cd->send_queue.neighbors_waiting);
	BUG_ON((nb->rb.lh.next == &cd->send_queue.neighbors_waiting) &&
			(cd->send_queue.neighbors_waiting.prev != &nb->rb.lh));

	cor_nb_kref_get(nb, "stack");

	spin_unlock_irqrestore(&cd->send_queue.qlock, iflags);

	return nb;
}
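
/*
 * Resume sending conn data for the next waiting neighbor. Returns
 * QOS_RESUME_DONE if no neighbor is waiting, QOS_RESUME_CONG if sending hit
 * congestion, or QOS_RESUME_NEXTNEIGHBOR otherwise. A neighbor which still
 * has conns waiting afterwards is moved to the nextpass list.
 */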
static int cor_resume_neighbors(struct cor_dev *cd, int *sent)
{
	unsigned long iflags;
	int rc;

	unsigned long jiffies_nb_lastduration;
	struct cor_neighbor *nb = cor_resume_neighbors_peeknextnb(cd,
			&jiffies_nb_lastduration);

	if (nb == 0)
		return QOS_RESUME_DONE;

	atomic_set(&nb->cmsg_delay_conndata, 1);

	rc = cor_neigh_waitingsconns_resume(cd, nb, jiffies_nb_lastduration,
			sent);
	if (rc == QOS_RESUME_CONG) {
		cor_nb_kref_put(nb, "stack");
		return QOS_RESUME_CONG;
	}
	BUG_ON(rc != QOS_RESUME_DONE && rc != QOS_RESUME_NEXTNEIGHBOR);

	atomic_set(&nb->cmsg_delay_conndata, 0);
	spin_lock_bh(&nb->cmsg_lock);
	cor_schedule_controlmsg_timer(nb);
	spin_unlock_bh(&nb->cmsg_lock);

#warning todo rename qos_*
	spin_lock_irqsave(&cd->send_queue.qlock, iflags);
	if (likely(nb->rb.in_queue == RB_INQUEUE_TRUE)) {
		if (nb->conns_waiting.cnt == 0) {
			BUG_ON(nb->conns_waiting.priority_sum != 0);
			nb->rb.in_queue = RB_INQUEUE_FALSE;
			list_del(&nb->rb.lh);
			cor_nb_kref_put_bug(nb, "qos_queue_nb");
		} else {
			BUG_ON(nb->conns_waiting.priority_sum >
					cd->send_queue.priority_sum);
			list_del(&nb->rb.lh);
			list_add_tail(&nb->rb.lh,
					&cd->send_queue.neighbors_waiting_nextpass);
		}
	}
	spin_unlock_irqrestore(&cd->send_queue.qlock, iflags);

	cor_nb_kref_put(nb, "stack");

	return QOS_RESUME_NEXTNEIGHBOR;
}
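
/*
 * Drain one of the non-conndata queues (kernel packets, conn retransmits or
 * announces), dropping the qlock around each actual send. Stops early and
 * requeues the current entry if the send does not return QOS_RESUME_DONE.
 */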
static int __cor_qos_resume(struct cor_dev *cd, int caller, int *sent)
{
	unsigned long iflags;
	int rc = QOS_RESUME_DONE;
	struct list_head *lh;

	spin_lock_irqsave(&cd->send_queue.qlock, iflags);

	if (caller == QOS_CALLER_KPACKET)
		lh = &cd->send_queue.kpackets_waiting;
	else if (caller == QOS_CALLER_CONN_RETRANS)
		lh = &cd->send_queue.conn_retrans_waiting;
	else if (caller == QOS_CALLER_ANNOUNCE)
		lh = &cd->send_queue.announce_waiting;
	else
		BUG();

	while (list_empty(lh) == 0) {
		struct cor_resume_block *rb = container_of(lh->next,
				struct cor_resume_block, lh);

		unsigned long cmsg_send_start_j;
		ktime_t cmsg_send_start_kt;

		BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
		rb->in_queue = RB_INQUEUE_FALSE;
		list_del(&rb->lh);

		if (caller == QOS_CALLER_KPACKET) {
			struct cor_neighbor *nb = container_of(rb,
					struct cor_neighbor, rb_kp);
			cmsg_send_start_j = nb->cmsg_send_start_j;
			cmsg_send_start_kt = nb->cmsg_send_start_kt;
		}

		spin_unlock_irqrestore(&cd->send_queue.qlock, iflags);
		if (caller == QOS_CALLER_KPACKET) {
			rc = cor_send_messages(container_of(rb,
					struct cor_neighbor, rb_kp),
					cmsg_send_start_j, cmsg_send_start_kt,
					sent);
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			rc = cor_send_retrans(container_of(rb,
					struct cor_neighbor, rb_cr), sent);
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			rc = _cor_send_announce(container_of(rb,
					struct cor_announce_data, rb), 1, sent);
		} else {
			BUG();
		}
		spin_lock_irqsave(&cd->send_queue.qlock, iflags);

		if (rc != QOS_RESUME_DONE && caller == QOS_CALLER_KPACKET) {
			struct cor_neighbor *nb = container_of(rb,
					struct cor_neighbor, rb_kp);

			nb->cmsg_send_start_j = cmsg_send_start_j;
			nb->cmsg_send_start_kt = cmsg_send_start_kt;
		}

		if (rc != QOS_RESUME_DONE && rb->in_queue == RB_INQUEUE_FALSE) {
			rb->in_queue = RB_INQUEUE_TRUE;
			list_add(&rb->lh, lh);
			break;
		}

		if (caller == QOS_CALLER_KPACKET) {
			cor_nb_kref_put(container_of(rb, struct cor_neighbor,
					rb_kp), "qos_queue_kpacket");
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			cor_nb_kref_put(container_of(rb, struct cor_neighbor,
					rb_cr), "qos_queue_conn_retrans");
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			kref_put(&container_of(rb, struct cor_announce_data,
					rb)->ref, cor_announce_data_free);
		} else {
			BUG();
		}

		kref_put(&cd->ref, cor_kreffree_bug);
	}

	spin_unlock_irqrestore(&cd->send_queue.qlock, iflags);

	return rc;
}
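
/*
 * Run all queues in priority order: kernel packets first, then conn
 * retransmits, announces and finally conn data of the waiting neighbors.
 * After each unit of work the scan restarts from the highest priority queue.
 */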
static int _cor_qos_resume(struct cor_dev *cd, int *sent)
{
	unsigned long iflags;
	int i = QOS_CALLER_KPACKET;
	int rc;

	spin_lock_irqsave(&cd->send_queue.qlock, iflags);

	while (1) {
		if (unlikely(cd->send_queue.is_destroyed == 1)) {
			rc = QOS_RESUME_EXIT;
			break;
		}

		if (i == QOS_CALLER_KPACKET &&
				list_empty(&cd->send_queue.kpackets_waiting)) {
			i = QOS_CALLER_CONN_RETRANS;
			continue;
		} else if (i == QOS_CALLER_CONN_RETRANS &&
				list_empty(&cd->send_queue.conn_retrans_waiting)) {
			i = QOS_CALLER_ANNOUNCE;
			continue;
		} else if (i == QOS_CALLER_ANNOUNCE &&
				list_empty(&cd->send_queue.announce_waiting)) {
			i = QOS_CALLER_NEIGHBOR;
			continue;
		} else if (i == QOS_CALLER_NEIGHBOR &&
				list_empty(&cd->send_queue.neighbors_waiting) &&
				list_empty(&cd->send_queue.neighbors_waiting_nextpass)) {
			rc = QOS_RESUME_DONE;
			break;
		}

		spin_unlock_irqrestore(&cd->send_queue.qlock, iflags);

		if (i == QOS_CALLER_NEIGHBOR)
			rc = cor_resume_neighbors(cd, sent);
		else
			rc = __cor_qos_resume(cd, i, sent);

		spin_lock_irqsave(&cd->send_queue.qlock, iflags);

		if (rc == QOS_RESUME_CONG)
			break;

		i = QOS_CALLER_KPACKET;
	}

	if (rc == QOS_RESUME_DONE) {
		BUG_ON(!list_empty(&cd->send_queue.kpackets_waiting));
		BUG_ON(!list_empty(&cd->send_queue.conn_retrans_waiting));
		BUG_ON(!list_empty(&cd->send_queue.announce_waiting));
		BUG_ON(!list_empty(&cd->send_queue.neighbors_waiting));
		BUG_ON(!list_empty(&cd->send_queue.neighbors_waiting_nextpass));

		atomic_set(&cd->send_queue.qos_resume_scheduled, 0);
	}

	cor_dev_queue_set_congstatus(cd);

	if (unlikely(cd->send_queue.is_destroyed == 1))
		rc = QOS_RESUME_EXIT;

	spin_unlock_irqrestore(&cd->send_queue.qlock, iflags);

	return rc;
}
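
/*
 * Main loop of the per-device qos_resume thread: resume until done, then
 * sleep until rescheduled; on congestion back off between 2 and 20 ms,
 * scaled by the time since the last progress was made.
 */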
int cor_qos_resume_threadfunc(void *data)
{
	struct cor_dev *cd = (struct cor_dev *) data;

	while (1) {
		int sent = 0;
		int rc;

		rc = _cor_qos_resume(cd, &sent);

		if (rc == QOS_RESUME_DONE) {
			wait_event(cd->send_queue.qos_resume_wq,
					atomic_read(
					&cd->send_queue.qos_resume_scheduled)
					!= 0);
		} else if (rc == QOS_RESUME_CONG) {
			unsigned long jiffies_tmp = jiffies;
			unsigned long delay_ms = 0;

			if (sent)
				cd->send_queue.jiffies_lastprogress = jiffies_tmp;
			delay_ms = (jiffies_to_msecs(jiffies_tmp -
					cd->send_queue.jiffies_lastprogress) + 8) / 4;
			if (delay_ms < 2)
				delay_ms = 2;
			else if (delay_ms > 20)
				delay_ms = 20;

			msleep(delay_ms);
		} else if (rc == QOS_RESUME_EXIT) {
			return 0;
		} else {
			BUG();
		}
	}
}
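
/* Wake the qos_resume thread, at most once until it clears the flag. */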
static inline void cor_dev_queue_schedule_resume(struct cor_dev *cd)
{
	if (atomic_cmpxchg(&cd->send_queue.qos_resume_scheduled, 0, 1) == 0) {
		barrier();
		wake_up(&cd->send_queue.qos_resume_wq);
	}
}
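
/*
 * The _cor_destroy_queue_* helpers below empty the individual send queues
 * and drop the references their entries held; the qlock must be held by the
 * caller.
 */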
static void _cor_destroy_queue_kpackets(struct cor_dev *cd_qlocked)
{
	while (list_empty(&cd_qlocked->send_queue.kpackets_waiting) == 0) {
		struct list_head *curr =
				cd_qlocked->send_queue.kpackets_waiting.next;
		struct cor_resume_block *rb = container_of(curr,
				struct cor_resume_block, lh);
		BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
		rb->in_queue = RB_INQUEUE_FALSE;
		list_del(curr);

		cor_nb_kref_put(container_of(rb, struct cor_neighbor, rb_kp),
				"qos_queue_kpacket");
		kref_put(&cd_qlocked->ref, cor_kreffree_bug);
	}
}
static void _cor_destroy_queue_conn_retrans(struct cor_dev *cd_qlocked)
{
	while (list_empty(&cd_qlocked->send_queue.conn_retrans_waiting) == 0) {
		struct list_head *curr =
				cd_qlocked->send_queue.conn_retrans_waiting.next;
		struct cor_resume_block *rb = container_of(curr,
				struct cor_resume_block, lh);
		BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
		rb->in_queue = RB_INQUEUE_FALSE;
		list_del(curr);

		cor_nb_kref_put(container_of(rb, struct cor_neighbor, rb_cr),
				"qos_queue_conn_retrans");
		kref_put(&cd_qlocked->ref, cor_kreffree_bug);
	}
}
static void _cor_destroy_queue_announce(struct cor_dev *cd_qlocked)
{
	while (list_empty(&cd_qlocked->send_queue.announce_waiting) == 0) {
		struct list_head *curr =
				cd_qlocked->send_queue.announce_waiting.next;
		struct cor_resume_block *rb = container_of(curr,
				struct cor_resume_block, lh);
		BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
		rb->in_queue = RB_INQUEUE_FALSE;
		list_del(curr);

		kref_put(&container_of(rb, struct cor_announce_data, rb)->ref,
				cor_announce_data_free);
		kref_put(&cd_qlocked->ref, cor_kreffree_bug);
	}
}
static void _cor_destroy_queue_neighbor(struct cor_dev *cd_qlocked,
		struct list_head *lh)
{
	while (list_empty(lh) == 0) {
		struct list_head *curr = lh->next;
		struct cor_resume_block *rb = container_of(curr,
				struct cor_resume_block, lh);
		BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
		rb->in_queue = RB_INQUEUE_FALSE;
		list_del(curr);

		cor_nb_kref_put(container_of(rb, struct cor_neighbor, rb),
				"qos_queue_nb");
		kref_put(&cd_qlocked->ref, cor_kreffree_bug);
	}
}
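
/*
 * Mark the queue as destroyed and flush all pending entries. The final
 * wakeup lets the qos_resume thread observe is_destroyed and exit.
 */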
void cor_dev_queue_destroy(struct cor_dev *cd)
{
	unsigned long iflags;

	spin_lock_irqsave(&cd->send_queue.qlock, iflags);
	cd->send_queue.is_destroyed = 1;
	_cor_destroy_queue_kpackets(cd);
	_cor_destroy_queue_conn_retrans(cd);
	_cor_destroy_queue_announce(cd);
	_cor_destroy_queue_neighbor(cd, &cd->send_queue.neighbors_waiting);
	_cor_destroy_queue_neighbor(cd,
			&cd->send_queue.neighbors_waiting_nextpass);

	spin_unlock_irqrestore(&cd->send_queue.qlock, iflags);

	cor_dev_queue_schedule_resume(cd);
}
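
/*
 * Initialize the send queue and start the qos_resume thread. Returns 0 on
 * success, 1 if the thread could not be created.
 */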
int cor_dev_queue_init(struct cor_dev *cd)
{
	spin_lock_init(&cd->send_queue.qlock);

	atomic_set(&cd->send_queue.qos_resume_scheduled, 0);

	init_waitqueue_head(&cd->send_queue.qos_resume_wq);

	INIT_LIST_HEAD(&cd->send_queue.kpackets_waiting);
	INIT_LIST_HEAD(&cd->send_queue.conn_retrans_waiting);
	INIT_LIST_HEAD(&cd->send_queue.announce_waiting);
	INIT_LIST_HEAD(&cd->send_queue.neighbors_waiting);
	INIT_LIST_HEAD(&cd->send_queue.neighbors_waiting_nextpass);

	atomic_set(&cd->send_queue.cong_status, 0);

	cd->send_queue.qos_resume_thread = kthread_create(
			cor_qos_resume_threadfunc, cd, "cor_qos_resume");
	/* kthread_create() returns an ERR_PTR on failure, never NULL */
	if (IS_ERR(cd->send_queue.qos_resume_thread)) {
		printk(KERN_ERR "cor: unable to start qos_resume thread\n");

		return 1;
	}
	get_task_struct(cd->send_queue.qos_resume_thread);
	wake_up_process(cd->send_queue.qos_resume_thread);

	return 0;
}
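
/*
 * Recompute the congestion status: no congestion if the last drop is more
 * than HZ/50 jiffies old, otherwise the status of the highest priority
 * non-empty queue; qlock must be held by the caller.
 */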
void cor_dev_queue_set_congstatus(struct cor_dev *cd_qlocked)
{
	__u32 newstatus;

	if (time_before(cd_qlocked->send_queue.jiffies_lastdrop,
			jiffies - HZ / 50)) {
		newstatus = CONGSTATUS_NONE;
	} else if (list_empty(&cd_qlocked->send_queue.kpackets_waiting) == 0) {
		newstatus = CONGSTATUS_KPACKETS;
	} else if (list_empty(&cd_qlocked->send_queue.conn_retrans_waiting) == 0) {
		newstatus = CONGSTATUS_RETRANS;
	} else if (list_empty(&cd_qlocked->send_queue.announce_waiting) == 0) {
		newstatus = CONGSTATUS_ANNOUNCE;
	} else if (list_empty(&cd_qlocked->send_queue.neighbors_waiting) == 0 ||
			list_empty(&cd_qlocked->send_queue.neighbors_waiting_nextpass) == 0) {
		newstatus = CONGSTATUS_CONNDATA;
	} else {
		newstatus = CONGSTATUS_NONE;
	}

	atomic_set(&cd_qlocked->send_queue.cong_status, newstatus);
}
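
/* Record a packet drop and update the congestion status accordingly. */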
void cor_dev_queue_set_lastdrop(struct cor_dev *cd)
{
	unsigned long iflags;

	spin_lock_irqsave(&cd->send_queue.qlock, iflags);
	cd->send_queue.jiffies_lastdrop = jiffies;
	cor_dev_queue_set_congstatus(cd);
	spin_unlock_irqrestore(&cd->send_queue.qlock, iflags);
}
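
/*
 * Enqueue a resume block on the queue selected by caller, unless it is
 * already queued or held back in the NBCONGWIN/NBNOTACTIVE state; takes the
 * needed references and wakes the qos_resume thread.
 */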
/**
 * if caller == QOS_CALLER_NEIGHBOR, nb->conns_waiting.lock must be held by
 * caller
 */
void _cor_dev_queue_enqueue(struct cor_dev *cd_qlocked,
		struct cor_resume_block *rb,
		unsigned long cmsg_send_start_j, ktime_t cmsg_send_start_kt,
		int caller, int from_nbcongwin_resume,
		int from_nbnotactive_resume)
{
	int queues_empty;

	if (rb->in_queue == RB_INQUEUE_TRUE) {
		BUG_ON(caller == QOS_CALLER_NEIGHBOR);

		if (caller == QOS_CALLER_KPACKET) {
			struct cor_neighbor *nb = container_of(rb,
					struct cor_neighbor, rb_kp);
			if (time_before(cmsg_send_start_j,
					nb->cmsg_send_start_j))
				nb->cmsg_send_start_j = cmsg_send_start_j;
			if (ktime_before(cmsg_send_start_kt,
					nb->cmsg_send_start_kt))
				nb->cmsg_send_start_kt = cmsg_send_start_kt;
		}
		return;
	} else if (rb->in_queue == RB_INQUEUE_NBCONGWIN &&
			from_nbcongwin_resume == 0) {
		return;
	} else if (rb->in_queue == RB_INQUEUE_NBNOTACTIVE) {
		return;
	}

	if (unlikely(cd_qlocked->send_queue.is_destroyed != 0))
		return;

	queues_empty = list_empty(&cd_qlocked->send_queue.kpackets_waiting) &&
			list_empty(&cd_qlocked->send_queue.conn_retrans_waiting) &&
			list_empty(&cd_qlocked->send_queue.announce_waiting) &&
			list_empty(&cd_qlocked->send_queue.neighbors_waiting) &&
			list_empty(&cd_qlocked->send_queue.neighbors_waiting_nextpass);

	BUG_ON(!queues_empty &&
			atomic_read(&cd_qlocked->send_queue.qos_resume_scheduled) == 0);

	if (caller == QOS_CALLER_KPACKET) {
		struct cor_neighbor *nb = container_of(rb, struct cor_neighbor,
				rb_kp);
		nb->cmsg_send_start_j = cmsg_send_start_j;
		nb->cmsg_send_start_kt = cmsg_send_start_kt;
		list_add_tail(&rb->lh, &cd_qlocked->send_queue.kpackets_waiting);
		cor_nb_kref_get(nb, "qos_queue_kpacket");
	} else if (caller == QOS_CALLER_CONN_RETRANS) {
		list_add_tail(&rb->lh, &cd_qlocked->send_queue.conn_retrans_waiting);
		cor_nb_kref_get(container_of(rb, struct cor_neighbor, rb_cr),
				"qos_queue_conn_retrans");
	} else if (caller == QOS_CALLER_ANNOUNCE) {
		list_add_tail(&rb->lh, &cd_qlocked->send_queue.announce_waiting);
		kref_get(&container_of(rb, struct cor_announce_data, rb)->ref);
	} else if (caller == QOS_CALLER_NEIGHBOR) {
		struct cor_neighbor *nb = container_of(rb, struct cor_neighbor,
				rb);
		if (unlikely(nb->conns_waiting.cnt == 0))
			return;

		list_add_tail(&rb->lh,
				&cd_qlocked->send_queue.neighbors_waiting_nextpass);
		cor_nb_kref_get(nb, "qos_queue_nb");
		cd_qlocked->send_queue.numconns += nb->conns_waiting.cnt;
		cd_qlocked->send_queue.priority_sum +=
				nb->conns_waiting.priority_sum;
		cd_qlocked->send_queue.jiffies_nb_lastduration = 0;
		cd_qlocked->send_queue.jiffies_nb_pass_start = jiffies;
	} else {
		BUG();
	}

	rb->in_queue = RB_INQUEUE_TRUE;
	kref_get(&cd_qlocked->ref);

	cor_dev_queue_schedule_resume(cd_qlocked);

	cor_dev_queue_set_congstatus(cd_qlocked);
}
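
/*
 * Locked wrapper around _cor_dev_queue_enqueue(); from_nbcongwin_resume is
 * always 0 on this path.
 */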
void cor_dev_queue_enqueue(struct cor_dev *cd, struct cor_resume_block *rb,
		unsigned long cmsg_send_start_j, ktime_t cmsg_send_start_kt,
		int caller, int from_nbnotactive_resume)
{
	unsigned long iflags;

	spin_lock_irqsave(&cd->send_queue.qlock, iflags);
	_cor_dev_queue_enqueue(cd, rb, cmsg_send_start_j, cmsg_send_start_kt,
			caller, 0, from_nbnotactive_resume);
	spin_unlock_irqrestore(&cd->send_queue.qlock, iflags);
}