[unleashed.git] / kernel / net / squeue.c
1 /*
2 * CDDL HEADER START
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
19 * CDDL HEADER END
22 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
26 * Copyright 2012 Joyent, Inc. All rights reserved.
30 * Squeues: General purpose serialization mechanism
31 * ------------------------------------------------
33 * Background:
34 * -----------
36 * This is a general purpose high-performance serialization mechanism
37 * currently used by TCP/IP. It is implemented by means of a per-CPU queue,
38 * a worker thread and a polling thread which are bound to the CPU
39 * associated with the squeue. The squeue is strictly FIFO for both read
40 * and write side and only one thread can process it at any given time.
41 * The design goal of squeue was to offer a very high degree of
42 * parallelization (on a per H/W execution pipeline basis) with at
43 * most one queuing.
45 * The modules needing protection typically call the SQUEUE_ENTER_ONE() or
46 * SQUEUE_ENTER() macro as soon as a thread enters the module
47 * from either direction. For each packet, the processing function
48 * and argument is stored in the mblk itself. When the packet is ready
49 * to be processed, the squeue retrieves the stored function and calls
50 * it with the supplied argument and the pointer to the packet itself.
51 * The called function can assume that no other thread is processing
52 * the squeue when it is executing.
54 * Squeue/connection binding:
55 * --------------------------
57 * TCP/IP uses an IP classifier in conjunction with squeue where specific
58 * connections are assigned to a specific squeue (based on various policies),
59 * at connection creation time. Once assigned, the connection to
60 * squeue mapping is never changed and all future packets for that
61 * connection are processed on that squeue. The connection ("conn") to
62 * squeue mapping is stored in "conn_t" member "conn_sqp".
64 * Since the processing of the connection cuts across multiple layers
65 * but still allows packets for different connections to be processed on
66 * other CPUs/squeues, squeues are also termed "Vertical Perimeter" or
67 * "Per Connection Vertical Perimeter".
69 * Processing Model:
70 * -----------------
72 * An squeue doesn't necessarily process packets with its own worker thread.
73 * The callers can pick if they just want to queue the packet, process
74 * their packet if nothing is queued or drain and process. The first two
75 * modes are typically employed when the packet was generated while
76 * already doing the processing behind the squeue, and the last mode (drain
77 * and process) is typically employed when the thread is entering the squeue
78 * for the first time. The squeue still imposes a finite time limit
79 * for which an external thread can do processing, after which it switches
80 * processing to its own worker thread.
82 * Once created, squeues are never deleted. Hence squeue pointers are
83 * always valid. This means that functions outside the squeue can still
84 * refer safely to conn_sqp and there is no need for ref counts.
86 * Only a thread executing in the squeue can change the squeue of the
87 * connection. It does so by calling a squeue framework function to do this.
88 * After changing the squeue, the thread must leave the squeue. It must not
89 * continue to execute any code that needs squeue protection.
91 * The squeue framework, after entering the squeue, checks if the current
92 * squeue matches the conn_sqp. If the check fails, the packet is delivered
93 * to the right squeue.
95 * Polling Model:
96 * --------------
98 * Squeues can control the rate of packet arrival into itself from the
99 * NIC or a specific Rx ring within a NIC. As part of capability negotiation
100 * between the IP and MAC layers, squeues are created for each TCP soft ring
101 * (or TCP Rx ring - to be implemented in future). As part of this
102 * negotiation, squeues get a cookie for underlying soft ring or Rx
103 * ring, a function to turn off incoming packets and a function to call
104 * to poll for packets. This helps schedule the receive side packet
105 * processing so that queue backlog doesn't build up and packet processing
106 * doesn't keep getting disturbed by high priority interrupts. As part
107 * of this mode, as soon as a backlog starts building, the squeue turns off
108 * the interrupts and switches to poll mode. In poll mode, when the poll
109 * thread goes down to retrieve packets, it retrieves them in the form of
110 * a chain which improves performance even more. As the squeue/softring
111 * system gets more packets, it gets more efficient by switching to
112 * polling more often and dealing with larger packet chains.
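/*
 * Illustrative sketch (not part of the original file): how a protected
 * module typically hands a single packet to its connection's squeue.
 * The processing function tcp_input_example and the debug tag
 * SQTAG_EXAMPLE are hypothetical placeholders; the argument order
 * mirrors the SQUEUE_ENTER_ONE() calls that appear later in this file.
 *
 *	SQUEUE_ENTER_ONE(connp->conn_sqp, mp, tcp_input_example, connp,
 *	    ira, SQ_PROCESS, SQTAG_EXAMPLE);
 *
 * With SQ_PROCESS the entering thread may drain and process inline;
 * SQ_FILL just queues the packet and lets the worker thread pick it up;
 * SQ_NODRAIN processes inline only when nothing is already queued.
 */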
116 #include <sys/types.h>
117 #include <sys/cmn_err.h>
118 #include <sys/debug.h>
119 #include <sys/kmem.h>
120 #include <sys/cpuvar.h>
121 #include <sys/condvar_impl.h>
122 #include <sys/systm.h>
123 #include <sys/callb.h>
124 #include <sys/sdt.h>
125 #include <sys/ddi.h>
126 #include <sys/sunddi.h>
127 #include <sys/stack.h>
128 #include <sys/archsystm.h>
130 #include <inet/ipclassifier.h>
131 #include <inet/udp_impl.h>
133 #include <sys/squeue_impl.h>
135 static void squeue_fire(void *);
136 static void squeue_drain(squeue_t *, uint_t, hrtime_t);
137 static void squeue_worker(squeue_t *sqp);
138 static void squeue_polling_thread(squeue_t *sqp);
140 kmem_cache_t *squeue_cache;
142 #define SQUEUE_MSEC_TO_NSEC 1000000
144 int squeue_drain_ms = 20;
145 int squeue_workerwait_ms = 0;
147 /* The values above converted to ticks or nanoseconds */
148 static int squeue_drain_ns = 0;
149 static int squeue_workerwait_tick = 0;
151 uintptr_t squeue_drain_stack_needed = 10240;
152 uint_t squeue_drain_stack_toodeep;
154 #define MAX_BYTES_TO_PICKUP 150000
156 #define ENQUEUE_CHAIN(sqp, mp, tail, cnt) { \
157 /* \
158 * Enqueue our mblk chain. \
159 */ \
160 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
162 if ((sqp)->sq_last != NULL) \
163 (sqp)->sq_last->b_next = (mp); \
164 else \
165 (sqp)->sq_first = (mp); \
166 (sqp)->sq_last = (tail); \
167 (sqp)->sq_count += (cnt); \
168 ASSERT((sqp)->sq_count > 0); \
169 DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp, \
170 mblk_t *, mp, mblk_t *, tail, int, cnt); \
175 * Blank the receive ring (in this case it is the soft ring). When
176 * blanked, the soft ring will not send any more packets up.
177 * Blanking may not succeed when there is a CPU already in the soft
178 * ring sending packets up. In that case, SQS_POLLING will not be
179 * set.
181 #define SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) { \
182 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
183 if (sq_poll_capable) { \
184 ASSERT(rx_ring != NULL); \
185 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \
186 if (!(sqp->sq_state & SQS_POLLING)) { \
187 if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
188 sqp->sq_state |= SQS_POLLING; \
193 #define SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) { \
194 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
195 if (sq_poll_capable) { \
196 ASSERT(rx_ring != NULL); \
197 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \
198 if (sqp->sq_state & SQS_POLLING) { \
199 sqp->sq_state &= ~SQS_POLLING; \
200 rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
205 /* Wakeup poll thread only if SQS_POLLING is set */
206 #define SQS_POLL_RING(sqp) { \
207 ASSERT(MUTEX_HELD(&(sqp)->sq_lock)); \
208 if (sqp->sq_state & SQS_POLLING) { \
209 ASSERT(sqp->sq_state & SQS_POLL_CAPAB); \
210 if (!(sqp->sq_state & SQS_GET_PKTS)) { \
211 sqp->sq_state |= SQS_GET_PKTS; \
212 cv_signal(&sqp->sq_poll_cv); \
217 #ifdef DEBUG
218 #define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) { \
219 (sqp)->sq_curmp = (mp); \
220 (sqp)->sq_curproc = (proc); \
221 (sqp)->sq_connp = (connp); \
222 (mp)->b_tag = (sqp)->sq_tag = (tag); \
225 #define SQUEUE_DBG_CLEAR(sqp) { \
226 (sqp)->sq_curmp = NULL; \
227 (sqp)->sq_curproc = NULL; \
228 (sqp)->sq_connp = NULL; \
230 #else
231 #define SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
232 #define SQUEUE_DBG_CLEAR(sqp)
233 #endif
235 void
236 squeue_init(void)
238 squeue_cache = kmem_cache_create("squeue_cache",
239 sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);
241 squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
242 squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
245 /* ARGSUSED */
246 squeue_t *
247 squeue_create(clock_t wait, pri_t pri)
249 squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);
251 bzero(sqp, sizeof (squeue_t));
252 sqp->sq_bind = PBIND_NONE;
253 sqp->sq_priority = pri;
254 sqp->sq_wait = MSEC_TO_TICK(wait);
255 sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
256 sqp, 0, &p0, TS_RUN, pri);
258 sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
259 sqp, 0, &p0, TS_RUN, pri);
261 return (sqp);
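/*
 * Illustrative sketch (hypothetical caller, not from the original file):
 * creating an squeue and binding its worker thread to a CPU.  squeue_bind()
 * asserts that cpu_lock is held, so the caller takes it around the call.
 * The priority maxclsyspri is assumed to come from <sys/disp.h>.
 *
 *	squeue_t *sqp = squeue_create(squeue_workerwait_ms, maxclsyspri);
 *	mutex_enter(&cpu_lock);
 *	squeue_bind(sqp, CPU->cpu_id);
 *	mutex_exit(&cpu_lock);
 */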
265 * Bind squeue worker thread to the specified CPU, given by CPU id.
266 * If the CPU id value is -1, bind the worker thread to the value
267 * specified in sq_bind field. If a thread is already bound to a
268 * different CPU, unbind it from the old CPU and bind to the new one.
271 void
272 squeue_bind(squeue_t *sqp, processorid_t bind)
274 mutex_enter(&sqp->sq_lock);
275 ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
276 ASSERT(MUTEX_HELD(&cpu_lock));
278 if (sqp->sq_state & SQS_BOUND) {
279 if (sqp->sq_bind == bind) {
280 mutex_exit(&sqp->sq_lock);
281 return;
283 thread_affinity_clear(sqp->sq_worker);
284 } else {
285 sqp->sq_state |= SQS_BOUND;
288 if (bind != PBIND_NONE)
289 sqp->sq_bind = bind;
291 thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
292 mutex_exit(&sqp->sq_lock);
295 void
296 squeue_unbind(squeue_t *sqp)
298 mutex_enter(&sqp->sq_lock);
299 if (!(sqp->sq_state & SQS_BOUND)) {
300 mutex_exit(&sqp->sq_lock);
301 return;
304 sqp->sq_state &= ~SQS_BOUND;
305 thread_affinity_clear(sqp->sq_worker);
306 mutex_exit(&sqp->sq_lock);
309 void
310 squeue_worker_wakeup(squeue_t *sqp)
312 timeout_id_t tid = (sqp)->sq_tid;
314 ASSERT(MUTEX_HELD(&(sqp)->sq_lock));
316 if (sqp->sq_wait == 0) {
317 ASSERT(tid == 0);
318 ASSERT(!(sqp->sq_state & SQS_TMO_PROG));
319 sqp->sq_awaken = ddi_get_lbolt();
320 cv_signal(&sqp->sq_worker_cv);
321 mutex_exit(&sqp->sq_lock);
322 return;
326 * Queue isn't being processed, so take
327 * any post enqueue actions needed before leaving.
329 if (tid != 0) {
331 * Waiting for an enter() to process mblk(s).
333 clock_t now = ddi_get_lbolt();
334 clock_t waited = now - sqp->sq_awaken;
336 if (TICK_TO_MSEC(waited) >= sqp->sq_wait) {
338 * Time's up and we have a worker thread
339 * waiting for work, so schedule it.
341 sqp->sq_tid = 0;
342 sqp->sq_awaken = now;
343 cv_signal(&sqp->sq_worker_cv);
344 mutex_exit(&sqp->sq_lock);
345 (void) untimeout(tid);
346 return;
348 mutex_exit(&sqp->sq_lock);
349 return;
350 } else if (sqp->sq_state & SQS_TMO_PROG) {
351 mutex_exit(&sqp->sq_lock);
352 return;
353 } else {
354 clock_t wait = sqp->sq_wait;
356 * Wait up to sqp->sq_wait ms for an
357 * enter() to process this queue. We
358 * don't want to contend on timeout locks
359 * with sq_lock held for performance reasons,
360 * so drop the sq_lock before calling timeout
361 * but we need to check if the timeout is still required
362 * after reacquiring the sq_lock. Once
363 * the sq_lock is dropped, someone else could
364 * have processed the packet or the timeout could
365 * have already fired.
367 sqp->sq_state |= SQS_TMO_PROG;
368 mutex_exit(&sqp->sq_lock);
369 tid = timeout(squeue_fire, sqp, wait);
370 mutex_enter(&sqp->sq_lock);
371 /* Check again if we still need the timeout */
372 if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==
373 SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
374 (sqp->sq_first != NULL)) {
375 sqp->sq_state &= ~SQS_TMO_PROG;
376 sqp->sq_tid = tid;
377 mutex_exit(&sqp->sq_lock);
378 return;
379 } else {
380 if (sqp->sq_state & SQS_TMO_PROG) {
381 sqp->sq_state &= ~SQS_TMO_PROG;
382 mutex_exit(&sqp->sq_lock);
383 (void) untimeout(tid);
384 } else {
386 * The timer fired before we could
387 * reacquire the sq_lock. squeue_fire
388 * removes the SQS_TMO_PROG flag
389 * and we don't need to do anything
390 * else.
392 mutex_exit(&sqp->sq_lock);
397 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
401 * squeue_enter() - enter squeue sqp with mblk mp (which can be
402 * a chain), while tail points to the end and cnt is the number of
403 * mblks in the chain.
405 * For a chain of a single packet (i.e. mp == tail), go through the
406 * fast path if no one is processing the squeue and nothing is queued.
408 * The proc and arg for each mblk is already stored in the mblk in
409 * appropriate places.
411 * The process_flag specifies if we are allowed to process the mblk
412 * and drain in the entering thread context. If process_flag is
413 * SQ_FILL, then we just queue the mblk and return (after signaling
414 * the worker thread if no one else is processing the squeue).
416 * The ira argument can be used when the count is one.
417 * For a chain the caller needs to prepend any needed mblks from
418 * ip_recv_attr_to_mblk().
420 /* ARGSUSED */
421 void
422 squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
423 ip_recv_attr_t *ira, int process_flag, uint8_t tag)
425 conn_t *connp;
426 sqproc_t proc;
427 hrtime_t now;
429 ASSERT(sqp != NULL);
430 ASSERT(mp != NULL);
431 ASSERT(tail != NULL);
432 ASSERT(cnt > 0);
433 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
434 ASSERT(ira == NULL || cnt == 1);
436 mutex_enter(&sqp->sq_lock);
439 * Try to process the packet if SQ_FILL flag is not set and
440 * we are allowed to process the squeue. The SQ_NODRAIN is
441 * ignored if the packet chain consists of more than 1 packet.
443 if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
444 (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
446 * See if anything is already queued. If we are the
447 * first packet, do inline processing; else queue the
448 * packet and do the drain.
450 if (sqp->sq_first == NULL && cnt == 1) {
452 * Fast-path, ok to process and nothing queued.
454 sqp->sq_state |= (SQS_PROC|SQS_FAST);
455 sqp->sq_run = curthread;
456 mutex_exit(&sqp->sq_lock);
459 * This is a chain of 1 packet so
460 * go through the fast path.
462 ASSERT(mp->b_prev != NULL);
463 ASSERT(mp->b_queue != NULL);
464 connp = (conn_t *)mp->b_prev;
465 mp->b_prev = NULL;
466 proc = (sqproc_t)mp->b_queue;
467 mp->b_queue = NULL;
468 ASSERT(proc != NULL && connp != NULL);
469 ASSERT(mp->b_next == NULL);
472 * Handle squeue switching. More details in the
473 * block comment at the top of the file
475 if (connp->conn_sqp == sqp) {
476 SQUEUE_DBG_SET(sqp, mp, proc, connp,
477 tag);
478 connp->conn_on_sqp = B_TRUE;
479 DTRACE_PROBE3(squeue__proc__start, squeue_t *,
480 sqp, mblk_t *, mp, conn_t *, connp);
481 (*proc)(connp, mp, sqp, ira);
482 DTRACE_PROBE2(squeue__proc__end, squeue_t *,
483 sqp, conn_t *, connp);
484 connp->conn_on_sqp = B_FALSE;
485 SQUEUE_DBG_CLEAR(sqp);
486 CONN_DEC_REF(connp);
487 } else {
488 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
489 connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
491 ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
492 mutex_enter(&sqp->sq_lock);
493 sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
494 sqp->sq_run = NULL;
495 if (sqp->sq_first == NULL ||
496 process_flag == SQ_NODRAIN) {
497 if (sqp->sq_first != NULL) {
498 squeue_worker_wakeup(sqp);
499 return;
502 * We processed our packet inline and nothing
503 * new has arrived. We are done. In case any
504 * control actions are pending, wake up the
505 * worker.
507 if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
508 cv_signal(&sqp->sq_worker_cv);
509 mutex_exit(&sqp->sq_lock);
510 return;
512 } else {
513 if (ira != NULL) {
514 mblk_t *attrmp;
516 ASSERT(cnt == 1);
517 attrmp = ip_recv_attr_to_mblk(ira);
518 if (attrmp == NULL) {
519 mutex_exit(&sqp->sq_lock);
520 ip_drop_input("squeue: "
521 "ip_recv_attr_to_mblk",
522 mp, NULL);
523 /* Caller already set b_prev/b_next */
524 mp->b_prev = mp->b_next = NULL;
525 freemsg(mp);
526 return;
528 ASSERT(attrmp->b_cont == NULL);
529 attrmp->b_cont = mp;
530 /* Move connp and func to new */
531 attrmp->b_queue = mp->b_queue;
532 mp->b_queue = NULL;
533 attrmp->b_prev = mp->b_prev;
534 mp->b_prev = NULL;
536 ASSERT(mp == tail);
537 tail = mp = attrmp;
540 ENQUEUE_CHAIN(sqp, mp, tail, cnt);
541 #ifdef DEBUG
542 mp->b_tag = tag;
543 #endif
546 * We are here because either we couldn't do inline
547 * processing (because something was already queued),
548 * or we had a chain of more than one packet,
549 * or something else arrived after we were done with
550 * inline processing.
552 ASSERT(MUTEX_HELD(&sqp->sq_lock));
553 ASSERT(sqp->sq_first != NULL);
554 now = gethrtime();
555 sqp->sq_run = curthread;
556 squeue_drain(sqp, SQS_ENTER, now + squeue_drain_ns);
559 * If we didn't do a complete drain, the worker
560 * thread was already signalled by squeue_drain.
561 * In case any control actions are pending, wake
562 * up the worker.
564 sqp->sq_run = NULL;
565 if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
566 cv_signal(&sqp->sq_worker_cv);
567 mutex_exit(&sqp->sq_lock);
568 return;
569 } else {
571 * We let a thread processing a squeue reenter only
572 * once. This helps the case of an incoming connection
573 * where a SYN-ACK-ACK that triggers the conn_ind
574 * doesn't have to queue the packet if the listener and
575 * eager are on the same squeue. Also helps the
576 * loopback connection where the two ends are bound
577 * to the same squeue (which is typical on single
578 * CPU machines).
580 * We let the thread reenter only once for fear
581 * of the stack getting blown with multiple traversals.
583 connp = (conn_t *)mp->b_prev;
584 if (!(sqp->sq_state & SQS_REENTER) &&
585 (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
586 (sqp->sq_run == curthread) && (cnt == 1) &&
587 (connp->conn_on_sqp == B_FALSE)) {
588 sqp->sq_state |= SQS_REENTER;
589 mutex_exit(&sqp->sq_lock);
591 ASSERT(mp->b_prev != NULL);
592 ASSERT(mp->b_queue != NULL);
594 mp->b_prev = NULL;
595 proc = (sqproc_t)mp->b_queue;
596 mp->b_queue = NULL;
599 * Handle squeue switching. More details in the
600 * block comment at the top of the file
602 if (connp->conn_sqp == sqp) {
603 connp->conn_on_sqp = B_TRUE;
604 DTRACE_PROBE3(squeue__proc__start, squeue_t *,
605 sqp, mblk_t *, mp, conn_t *, connp);
606 (*proc)(connp, mp, sqp, ira);
607 DTRACE_PROBE2(squeue__proc__end, squeue_t *,
608 sqp, conn_t *, connp);
609 connp->conn_on_sqp = B_FALSE;
610 CONN_DEC_REF(connp);
611 } else {
612 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
613 connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
616 mutex_enter(&sqp->sq_lock);
617 sqp->sq_state &= ~SQS_REENTER;
618 mutex_exit(&sqp->sq_lock);
619 return;
623 * Queue is already being processed or there are already
624 * one or more packets on the queue. Enqueue the
625 * packet and wake up the squeue worker thread if the
626 * squeue is not being processed.
628 #ifdef DEBUG
629 mp->b_tag = tag;
630 #endif
631 if (ira != NULL) {
632 mblk_t *attrmp;
634 ASSERT(cnt == 1);
635 attrmp = ip_recv_attr_to_mblk(ira);
636 if (attrmp == NULL) {
637 mutex_exit(&sqp->sq_lock);
638 ip_drop_input("squeue: ip_recv_attr_to_mblk",
639 mp, NULL);
640 /* Caller already set b_prev/b_next */
641 mp->b_prev = mp->b_next = NULL;
642 freemsg(mp);
643 return;
645 ASSERT(attrmp->b_cont == NULL);
646 attrmp->b_cont = mp;
647 /* Move connp and func to new */
648 attrmp->b_queue = mp->b_queue;
649 mp->b_queue = NULL;
650 attrmp->b_prev = mp->b_prev;
651 mp->b_prev = NULL;
653 ASSERT(mp == tail);
654 tail = mp = attrmp;
656 ENQUEUE_CHAIN(sqp, mp, tail, cnt);
657 if (!(sqp->sq_state & SQS_PROC)) {
658 squeue_worker_wakeup(sqp);
659 return;
662 * In case any control actions are pending, wake
663 * up the worker.
665 if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
666 cv_signal(&sqp->sq_worker_cv);
667 mutex_exit(&sqp->sq_lock);
668 return;
673 * PRIVATE FUNCTIONS
676 static void
677 squeue_fire(void *arg)
679 squeue_t *sqp = arg;
680 uint_t state;
682 mutex_enter(&sqp->sq_lock);
684 state = sqp->sq_state;
685 if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
686 mutex_exit(&sqp->sq_lock);
687 return;
690 sqp->sq_tid = 0;
692 * The timeout fired before we got a chance to set it.
693 * Process it anyway but remove the SQS_TMO_PROG so that
694 * the thread trying to set the timeout knows that it has
695 * already been processed.
697 if (state & SQS_TMO_PROG)
698 sqp->sq_state &= ~SQS_TMO_PROG;
700 if (!(state & SQS_PROC)) {
701 sqp->sq_awaken = ddi_get_lbolt();
702 cv_signal(&sqp->sq_worker_cv);
704 mutex_exit(&sqp->sq_lock);
707 static void
708 squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
710 mblk_t *mp;
711 mblk_t *head;
712 sqproc_t proc;
713 conn_t *connp;
714 timeout_id_t tid;
715 ill_rx_ring_t *sq_rx_ring = sqp->sq_rx_ring;
716 hrtime_t now;
717 boolean_t did_wakeup = B_FALSE;
718 boolean_t sq_poll_capable;
719 ip_recv_attr_t *ira, iras;
722 * Before doing any work, check our stack depth; if we're not a
723 * worker thread for this squeue and we're beginning to get tight
724 * on stack, kick the worker, bump a counter and return.
726 if (proc_type != SQS_WORKER && STACK_BIAS + (uintptr_t)getfp() -
727 (uintptr_t)curthread->t_stkbase < squeue_drain_stack_needed) {
728 ASSERT(mutex_owned(&sqp->sq_lock));
729 sqp->sq_awaken = ddi_get_lbolt();
730 cv_signal(&sqp->sq_worker_cv);
731 squeue_drain_stack_toodeep++;
732 return;
735 sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
736 again:
737 ASSERT(mutex_owned(&sqp->sq_lock));
738 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
739 SQS_POLL_QUIESCE_DONE)));
741 head = sqp->sq_first;
742 sqp->sq_first = NULL;
743 sqp->sq_last = NULL;
744 sqp->sq_count = 0;
746 if ((tid = sqp->sq_tid) != 0)
747 sqp->sq_tid = 0;
749 sqp->sq_state |= SQS_PROC | proc_type;
752 * We have backlog built up. Switch to polling mode if the
753 * device underneath allows it. Need to do it so that
754 * more packets don't come in and disturb us (by contending
755 * for sq_lock or higher priority thread preempting us).
757 * The worker thread is allowed to do active polling while we
758 * just disable the interrupts for drain by non-worker (kernel
759 * or userland) threads so they can peacefully process the
760 * packets during time allocated to them.
762 SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
763 mutex_exit(&sqp->sq_lock);
765 if (tid != 0)
766 (void) untimeout(tid);
768 while ((mp = head) != NULL) {
770 head = mp->b_next;
771 mp->b_next = NULL;
773 proc = (sqproc_t)mp->b_queue;
774 mp->b_queue = NULL;
775 connp = (conn_t *)mp->b_prev;
776 mp->b_prev = NULL;
778 /* Is there an ip_recv_attr_t to handle? */
779 if (ip_recv_attr_is_mblk(mp)) {
780 mblk_t *attrmp = mp;
782 ASSERT(attrmp->b_cont != NULL);
784 mp = attrmp->b_cont;
785 attrmp->b_cont = NULL;
786 ASSERT(mp->b_queue == NULL);
787 ASSERT(mp->b_prev == NULL);
789 if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
790 /* The ill or ip_stack_t disappeared on us */
791 ip_drop_input("ip_recv_attr_from_mblk",
792 mp, NULL);
793 ira_cleanup(&iras, B_TRUE);
794 CONN_DEC_REF(connp);
795 continue;
797 ira = &iras;
798 } else {
799 ira = NULL;
804 * Handle squeue switching. More details in the
805 * block comment at the top of the file
807 if (connp->conn_sqp == sqp) {
808 SQUEUE_DBG_SET(sqp, mp, proc, connp,
809 mp->b_tag);
810 connp->conn_on_sqp = B_TRUE;
811 DTRACE_PROBE3(squeue__proc__start, squeue_t *,
812 sqp, mblk_t *, mp, conn_t *, connp);
813 (*proc)(connp, mp, sqp, ira);
814 DTRACE_PROBE2(squeue__proc__end, squeue_t *,
815 sqp, conn_t *, connp);
816 connp->conn_on_sqp = B_FALSE;
817 CONN_DEC_REF(connp);
818 } else {
819 SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
820 SQ_FILL, SQTAG_SQUEUE_CHANGE);
822 if (ira != NULL)
823 ira_cleanup(ira, B_TRUE);
826 SQUEUE_DBG_CLEAR(sqp);
828 mutex_enter(&sqp->sq_lock);
831 * Check if there is still work to do (either more arrived or timer
832 * expired). If we are the worker thread and we are polling capable,
833 * continue doing the work since no one else is around to do the
834 * work anyway (but signal the poll thread to retrieve some packets
835 * in the meanwhile). If we are not the worker thread, just
836 * signal the worker thread to take up the work if processing time
837 * has expired.
839 if (sqp->sq_first != NULL) {
841 * Still more to process. If the time quantum has not expired, we
842 * should let the drain go on. The worker thread is allowed
843 * to drain as long as there is anything left.
845 now = gethrtime();
846 if ((now < expire) || (proc_type == SQS_WORKER)) {
848 * If time has not expired or we are the worker thread and
849 * this squeue is polling capable, continue to do
850 * the drain.
852 * We turn off interrupts for all userland threads
853 * doing drain but we do active polling only for
854 * worker thread.
856 * Calling SQS_POLL_RING() even in the case of
857 * SQS_POLLING_ON() not succeeding is ok as
858 * SQS_POLL_RING() will not wake up poll thread
859 * if SQS_POLLING bit is not set.
861 if (proc_type == SQS_WORKER)
862 SQS_POLL_RING(sqp);
863 goto again;
864 } else {
865 did_wakeup = B_TRUE;
866 sqp->sq_awaken = ddi_get_lbolt();
867 cv_signal(&sqp->sq_worker_cv);
872 * If the poll thread is already running, just return. The
873 * poll thread continues to hold the proc and will finish
874 * processing.
876 if (sqp->sq_state & SQS_GET_PKTS) {
877 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
878 SQS_POLL_QUIESCE_DONE)));
879 sqp->sq_state &= ~proc_type;
880 return;
885 * If we are the worker thread and no work is left, send the poll
886 * thread down once more to see if something arrived. Otherwise,
887 * turn the interrupts back on and we are done.
889 if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
891 * Do one last check to see if anything arrived
892 * in the NIC. We leave the SQS_PROC set to ensure
893 * that the poll thread keeps the PROC and can decide
894 * if it needs to turn polling off or continue
895 * processing.
897 * If we drop the SQS_PROC here and the poll thread comes
898 * up empty-handed, it cannot safely turn polling off
899 * since someone else could have acquired the PROC
900 * and started draining. The previously running poll
901 * thread and the current thread doing drain would end
902 * up in a race for turning polling on/off and more
903 * complex code would be required to deal with it.
905 * It's a lot simpler for the drain to hand the SQS_PROC to
906 * the poll thread (if running) and let the poll thread finish
907 * without worrying about racing with any other thread.
909 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
910 SQS_POLL_QUIESCE_DONE)));
911 SQS_POLL_RING(sqp);
912 sqp->sq_state &= ~proc_type;
913 } else {
915 * The squeue is either not capable of polling or the
916 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
917 * unsuccessful, or the poll thread already finished
918 * processing and didn't find anything. Since there
919 * is nothing queued and we already turned polling on
920 * (for all threads doing drain), we should turn
921 * polling off and relinquish the PROC.
923 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
924 SQS_POLL_QUIESCE_DONE)));
925 SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
926 sqp->sq_state &= ~(SQS_PROC | proc_type);
927 if (!did_wakeup && sqp->sq_first != NULL) {
928 squeue_worker_wakeup(sqp);
929 mutex_enter(&sqp->sq_lock);
932 * If we are not the worker and there is a pending quiesce
933 * event, wake up the worker
935 if ((proc_type != SQS_WORKER) &&
936 (sqp->sq_state & SQS_WORKER_THR_CONTROL))
937 cv_signal(&sqp->sq_worker_cv);
942 * Quiesce, Restart, or Cleanup of the squeue poll thread.
944 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
945 * not attempt to poll the underlying soft ring any more. The quiesce is
946 * triggered by the mac layer when it wants to quiesce a soft ring. Typically
947 * control operations such as changing the fanout of a NIC or VNIC (dladm
948 * setlinkprop) need to quiesce data flow before changing the wiring.
949 * The operation is done by the mac layer, but it calls back into IP to
950 * quiesce the soft ring. After completing the operation (say increase or
951 * decrease of the fanout) the mac layer then calls back into IP to restart
952 * the quiesced soft ring.
954 * Cleanup: This is triggered when the squeue binding to a soft ring is
955 * removed permanently. Typically interface plumb and unplumb would trigger
956 * this. It can also be triggered from the mac layer when a soft ring is
957 * being deleted say as the result of a fanout reduction. Since squeues are
958 * never deleted, the cleanup marks the squeue as fit for recycling and
959 * moves it to the zeroth squeue set.
961 static void
962 squeue_poll_thr_control(squeue_t *sqp)
964 if (sqp->sq_state & SQS_POLL_THR_RESTART) {
965 /* Restart implies a previous quiesce */
966 ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
967 sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
968 SQS_POLL_THR_RESTART);
969 sqp->sq_state |= SQS_POLL_CAPAB;
970 cv_signal(&sqp->sq_worker_cv);
971 return;
974 if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
975 sqp->sq_state |= SQS_POLL_THR_QUIESCED;
976 sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
977 cv_signal(&sqp->sq_worker_cv);
978 return;
983 * POLLING Notes
985 * With polling mode, we want to do as much processing as we possibly can
986 * in worker thread context. The sweet spot is the worker thread keeps doing
987 * work all the time in polling mode and writers etc. keep dumping packets
988 * to the worker thread. Occasionally, we send the poll thread (running at
989 * a lower priority) down to the NIC to get the chain of packets to feed to
990 * the worker. Sending the poll thread down to the NIC depends on 3 criteria:
992 * 1) It's always driven from squeue_drain and only if the worker thread is
993 * doing the drain.
994 * 2) We clear the backlog once and more packets arrived in between.
995 * Before starting drain again, send the poll thread down if
996 * the drain is being done by worker thread.
997 * 3) Before exiting the squeue_drain, if the poll thread is not already
998 * working and we are the worker thread, try to poll one more time.
1000 * For latency's sake, we do allow any thread calling squeue_enter
1001 * to process its packet provided:
1003 * 1) Nothing is queued
1004 * 2) If more packets arrived in between, the non-worker threads are allowed
1005 * to do the drain till their time quantum expires, provided SQS_GET_PKTS
1006 * wasn't set in between.
1008 * Avoiding deadlocks with interrupts
1009 * ==================================
1011 * One of the big problems is that we can't send poll_thr down while holding
1012 * the sq_lock since the thread can block. So we drop the sq_lock before
1013 * calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
1014 * poll thread is running so that no other thread can acquire the
1015 * perimeter in between. If the squeue_drain gets done (no more work
1016 * left), it leaves the SQS_PROC set if poll thread is running.
1020 * This is the squeue poll thread. In poll mode, it polls the underlying
1021 * TCP softring and feeds packets into the squeue. The worker thread then
1022 * drains the squeue. The poll thread also responds to control signals for
1023 * quiescing, restarting, or cleanup of an squeue. These are driven by
1024 * control operations like plumb/unplumb or as a result of dynamic Rx ring
1025 * related operations that are driven from the mac layer.
1027 static void
1028 squeue_polling_thread(squeue_t *sqp)
1030 kmutex_t *lock = &sqp->sq_lock;
1031 kcondvar_t *async = &sqp->sq_poll_cv;
1032 ip_mac_rx_t sq_get_pkts;
1033 ip_accept_t ip_accept;
1034 ill_rx_ring_t *sq_rx_ring;
1035 ill_t *sq_ill;
1036 mblk_t *head, *tail, *mp;
1037 uint_t cnt;
1038 void *sq_mac_handle;
1039 callb_cpr_t cprinfo;
1040 size_t bytes_to_pickup;
1041 uint32_t ctl_state;
1043 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
1044 mutex_enter(lock);
1046 for (;;) {
1047 CALLB_CPR_SAFE_BEGIN(&cprinfo);
1048 cv_wait(async, lock);
1049 CALLB_CPR_SAFE_END(&cprinfo, lock);
1051 ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
1052 SQS_POLL_THR_QUIESCED);
1053 if (ctl_state != 0) {
1055 * If the squeue is quiesced, then wait for a control
1056 * request. A quiesced squeue must not poll the
1057 * underlying soft ring.
1059 if (ctl_state == SQS_POLL_THR_QUIESCED)
1060 continue;
1062 * Act on control requests to quiesce, cleanup or
1063 * restart an squeue
1065 squeue_poll_thr_control(sqp);
1066 continue;
1069 if (!(sqp->sq_state & SQS_POLL_CAPAB))
1070 continue;
1072 ASSERT((sqp->sq_state &
1073 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
1074 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
1076 poll_again:
1077 sq_rx_ring = sqp->sq_rx_ring;
1078 sq_get_pkts = sq_rx_ring->rr_rx;
1079 sq_mac_handle = sq_rx_ring->rr_rx_handle;
1080 ip_accept = sq_rx_ring->rr_ip_accept;
1081 sq_ill = sq_rx_ring->rr_ill;
1082 bytes_to_pickup = MAX_BYTES_TO_PICKUP;
1083 mutex_exit(lock);
1084 head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
1085 mp = NULL;
1086 if (head != NULL) {
1088 * We got the packet chain from the mac layer. It
1089 * would be nice to be able to process it inline
1090 * for better performance but we need to give
1091 * IP a chance to look at this chain to ensure
1092 * that packets are really meant for this squeue
1093 * and do the IP processing.
1095 mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
1096 &tail, &cnt);
1098 mutex_enter(lock);
1099 if (mp != NULL) {
1101 * The ip_accept function has already added an
1102 * ip_recv_attr_t mblk if that is needed.
1104 ENQUEUE_CHAIN(sqp, mp, tail, cnt);
1106 ASSERT((sqp->sq_state &
1107 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
1108 (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));
1110 if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
1112 * We have packets to process and the worker thread
1113 * is not running. Check to see if the poll thread is
1114 * allowed to process. Let it do processing only if it
1115 * picked up some packets from the NIC; otherwise
1116 * wake up the worker thread.
1118 if (mp != NULL) {
1119 hrtime_t now;
1121 now = gethrtime();
1122 sqp->sq_run = curthread;
1123 squeue_drain(sqp, SQS_POLL_PROC, now +
1124 squeue_drain_ns);
1125 sqp->sq_run = NULL;
1127 if (sqp->sq_first == NULL)
1128 goto poll_again;
1131 * Couldn't do the entire drain because the
1132 * time limit expired, let the
1133 * worker thread take over.
1137 sqp->sq_awaken = ddi_get_lbolt();
1139 * Put the SQS_PROC_HELD on so the worker
1140 * thread can distinguish where it's called from. We
1141 * can remove the SQS_PROC flag here and turn off the
1142 * polling so that it wouldn't matter who gets the
1143 * processing but we get better performance this way
1144 * and save the cost of turning polling off and possibly
1145 * on again as soon as we start draining again.
1147 * We can't remove the SQS_PROC flag without turning
1148 * polling off until we can guarantee that control
1149 * will return to squeue_drain immediately.
1151 sqp->sq_state |= SQS_PROC_HELD;
1152 sqp->sq_state &= ~SQS_GET_PKTS;
1153 cv_signal(&sqp->sq_worker_cv);
1154 } else if (sqp->sq_first == NULL &&
1155 !(sqp->sq_state & SQS_WORKER)) {
1157 * Nothing queued and worker thread not running.
1158 * Since we hold the proc, no other thread is
1159 * processing the squeue. This means that there
1160 * is no work to be done and nothing is queued
1161 * in squeue or in NIC. Turn polling off and go
1162 * back to interrupt mode.
1164 sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
1165 /* LINTED: constant in conditional context */
1166 SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);
1169 * If there is a pending control operation
1170 * wake up the worker, since it is currently
1171 * not running.
1173 if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
1174 cv_signal(&sqp->sq_worker_cv);
1175 } else {
1177 * Worker thread is already running. We don't need
1178 * to do anything. Indicate that poll thread is done.
1180 sqp->sq_state &= ~SQS_GET_PKTS;
1182 if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
1184 * Act on control requests to quiesce, cleanup or
1185 * restart an squeue
1187 squeue_poll_thr_control(sqp);
1193 * The squeue worker thread acts on any control requests to quiesce, cleanup
1194 * or restart an ill_rx_ring_t by calling this function. The worker thread
1195 * synchronizes with the squeue poll thread to complete the request and finally
1196 * wakes up the requestor when the request is completed.
1198 static void
1199 squeue_worker_thr_control(squeue_t *sqp)
1201 ill_t *ill;
1202 ill_rx_ring_t *rx_ring;
1204 ASSERT(MUTEX_HELD(&sqp->sq_lock));
1206 if (sqp->sq_state & SQS_POLL_RESTART) {
1207 /* Restart implies a previous quiesce. */
1208 ASSERT((sqp->sq_state & (SQS_PROC_HELD |
1209 SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
1210 (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
1212 * Request the squeue poll thread to restart and wait till
1213 * it actually restarts.
1215 sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
1216 sqp->sq_state |= SQS_POLL_THR_RESTART;
1217 cv_signal(&sqp->sq_poll_cv);
1218 while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
1219 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1220 sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
1221 SQS_WORKER);
1223 * Signal any waiter that is waiting for the restart
1224 * to complete
1226 sqp->sq_state |= SQS_POLL_RESTART_DONE;
1227 cv_signal(&sqp->sq_ctrlop_done_cv);
1228 return;
1231 if (sqp->sq_state & SQS_PROC_HELD) {
1232 /* The squeue poll thread handed control to us */
1233 ASSERT(sqp->sq_state & SQS_PROC);
1237 * Prevent any other thread from processing the squeue
1238 * until we finish the control actions by setting SQS_PROC.
1239 * But allow ourselves to reenter by setting SQS_WORKER
1241 sqp->sq_state |= (SQS_PROC | SQS_WORKER);
1243 /* Signal the squeue poll thread and wait for it to quiesce itself */
1244 if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
1245 sqp->sq_state |= SQS_POLL_THR_QUIESCE;
1246 cv_signal(&sqp->sq_poll_cv);
1247 while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
1248 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1251 rx_ring = sqp->sq_rx_ring;
1252 ill = rx_ring->rr_ill;
1254 * The lock hierarchy is as follows.
1255 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
1257 mutex_exit(&sqp->sq_lock);
1258 mutex_enter(&ill->ill_lock);
1259 mutex_enter(&sqp->sq_lock);
1261 SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
1262 sqp->sq_rx_ring);
1263 sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
1264 if (sqp->sq_state & SQS_POLL_CLEANUP) {
1266 * Disassociate this squeue from its ill_rx_ring_t.
1267 * The rr_sqp, sq_rx_ring fields are protected by the
1268 * corresponding squeue, ill_lock* and sq_lock. Holding any
1269 * of them will ensure that the ring to squeue mapping does
1270 * not change.
1272 ASSERT(!(sqp->sq_state & SQS_DEFAULT));
1274 sqp->sq_rx_ring = NULL;
1275 rx_ring->rr_sqp = NULL;
1277 sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
1278 SQS_POLL_QUIESCE_DONE);
1279 sqp->sq_ill = NULL;
1281 rx_ring->rr_rx_handle = NULL;
1282 rx_ring->rr_intr_handle = NULL;
1283 rx_ring->rr_intr_enable = NULL;
1284 rx_ring->rr_intr_disable = NULL;
1285 sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
1286 } else {
1287 sqp->sq_state &= ~SQS_POLL_QUIESCE;
1288 sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
1291 * Signal any waiter that is waiting for the quiesce or cleanup
1292 * to complete and also wait for it to actually see and reset the
1293 * SQS_POLL_CLEANUP_DONE.
1295 cv_signal(&sqp->sq_ctrlop_done_cv);
1296 mutex_exit(&ill->ill_lock);
1297 if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
1298 cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
1299 sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
1303 static void
1304 squeue_worker(squeue_t *sqp)
1306 kmutex_t *lock = &sqp->sq_lock;
1307 kcondvar_t *async = &sqp->sq_worker_cv;
1308 callb_cpr_t cprinfo;
1309 hrtime_t now;
1311 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
1312 mutex_enter(lock);
1314 for (;;) {
1315 for (;;) {
1317 * If the poll thread has handed control to us
1318 * we need to break out of the wait.
1320 if (sqp->sq_state & SQS_PROC_HELD)
1321 break;
1324 * If the squeue is not being processed and we either
1325 * have messages to drain or some thread has signaled
1326 * some control activity we need to break
1328 if (!(sqp->sq_state & SQS_PROC) &&
1329 ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
1330 (sqp->sq_first != NULL)))
1331 break;
1334 * If we have started some control action, then check
1335 * for the SQS_WORKER flag (since we don't
1336 * release the squeue) to make sure we own the squeue
1337 * and break out
1339 if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
1340 (sqp->sq_state & SQS_WORKER))
1341 break;
1343 CALLB_CPR_SAFE_BEGIN(&cprinfo);
1344 cv_wait(async, lock);
1345 CALLB_CPR_SAFE_END(&cprinfo, lock);
1347 if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
1348 squeue_worker_thr_control(sqp);
1349 continue;
1351 ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
1352 SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
1353 SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));
1355 if (sqp->sq_state & SQS_PROC_HELD)
1356 sqp->sq_state &= ~SQS_PROC_HELD;
1358 now = gethrtime();
1359 sqp->sq_run = curthread;
1360 squeue_drain(sqp, SQS_WORKER, now + squeue_drain_ns);
1361 sqp->sq_run = NULL;
1365 uintptr_t *
1366 squeue_getprivate(squeue_t *sqp, sqprivate_t p)
1368 ASSERT(p < SQPRIVATE_MAX);
1370 return (&sqp->sq_private[p]);
1373 /* ARGSUSED */
1374 void
1375 squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
1377 conn_t *connp = (conn_t *)arg;
1378 squeue_t *sqp = connp->conn_sqp;
1381 * Mark the squeue as paused before waking up the thread stuck
1382 * in squeue_synch_enter().
1384 mutex_enter(&sqp->sq_lock);
1385 sqp->sq_state |= SQS_PAUSE;
1388 * Notify the thread that it's OK to proceed; that is done by
1389 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
1391 ASSERT(mp->b_flag & MSGWAITSYNC);
1392 mp->b_flag &= ~MSGWAITSYNC;
1393 cv_broadcast(&connp->conn_sq_cv);
1396 * We are doing something on behalf of another thread, so we have to
1397 * pause and wait until it finishes.
1399 while (sqp->sq_state & SQS_PAUSE) {
1400 cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
1402 mutex_exit(&sqp->sq_lock);
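/*
 * Illustrative usage sketch (hypothetical caller, not from the original
 * file): squeue_synch_enter()/squeue_synch_exit() give a non-squeue
 * thread (e.g. an application thread coming in from above) exclusive,
 * synchronous access to squeue-protected conn state.
 *
 *	if (squeue_synch_enter(connp, NULL) == 0) {
 *		... manipulate squeue-protected state of connp ...
 *		squeue_synch_exit(connp);
 *	}
 */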
1406 squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
1408 squeue_t *sqp;
1410 again:
1411 sqp = connp->conn_sqp;
1413 mutex_enter(&sqp->sq_lock);
1414 if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
1416 * We are OK to proceed if the squeue is empty, and
1417 * no one owns the squeue.
1419 * The caller won't own the squeue as this is called from the
1420 * application.
1422 ASSERT(sqp->sq_run == NULL);
1424 sqp->sq_state |= SQS_PROC;
1425 sqp->sq_run = curthread;
1426 mutex_exit(&sqp->sq_lock);
1429 * Handle squeue switching. The conn's squeue can only change
1430 * while there is a thread in the squeue, which is why we do
1431 * the check after entering the squeue. If it has changed, exit
1432 * this squeue and redo everything with the new squeue.
1434 if (sqp != connp->conn_sqp) {
1435 mutex_enter(&sqp->sq_lock);
1436 sqp->sq_state &= ~SQS_PROC;
1437 sqp->sq_run = NULL;
1438 mutex_exit(&sqp->sq_lock);
1439 goto again;
1441 #if SQUEUE_DEBUG
1442 sqp->sq_curmp = NULL;
1443 sqp->sq_curproc = NULL;
1444 sqp->sq_connp = connp;
1445 #endif
1446 connp->conn_on_sqp = B_TRUE;
1447 return (0);
1448 } else {
1449 mblk_t *mp;
1451 mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
1452 if (mp == NULL) {
1453 mutex_exit(&sqp->sq_lock);
1454 return (ENOMEM);
1458 * We mark the mblk as awaiting synchronous squeue access
1459 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
1460 * fires, MSGWAITSYNC is cleared, at which point we know we
1461 * have exclusive access.
1463 mp->b_flag |= MSGWAITSYNC;
1465 CONN_INC_REF(connp);
1466 SET_SQUEUE(mp, squeue_wakeup_conn, connp);
1467 ENQUEUE_CHAIN(sqp, mp, mp, 1);
1469 ASSERT(sqp->sq_run != curthread);
1471 /* Wait until the enqueued mblk get processed. */
1472 while (mp->b_flag & MSGWAITSYNC)
1473 cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
1474 mutex_exit(&sqp->sq_lock);
1476 if (use_mp == NULL)
1477 freeb(mp);
1479 return (0);
1483 void
1484 squeue_synch_exit(conn_t *connp)
1486 squeue_t *sqp = connp->conn_sqp;
1488 mutex_enter(&sqp->sq_lock);
1489 if (sqp->sq_run == curthread) {
1490 ASSERT(sqp->sq_state & SQS_PROC);
1492 sqp->sq_state &= ~SQS_PROC;
1493 sqp->sq_run = NULL;
1494 connp->conn_on_sqp = B_FALSE;
1496 if (sqp->sq_first == NULL) {
1497 mutex_exit(&sqp->sq_lock);
1498 } else {
1500 * If this was a normal thread, then it would
1501 * (most likely) continue processing the pending
1502 * requests. Since the just completed operation
1503 * was executed synchronously, the thread should
1504 * not be delayed. To compensate, wake up the
1505 * worker thread right away when there are outstanding
1506 * requests.
1508 sqp->sq_awaken = ddi_get_lbolt();
1509 cv_signal(&sqp->sq_worker_cv);
1510 mutex_exit(&sqp->sq_lock);
1512 } else {
1514 * The caller doesn't own the squeue, clear the SQS_PAUSE flag,
1515 * and wake up the squeue owner, so that the owner can continue
1516 * processing.
1518 ASSERT(sqp->sq_state & SQS_PAUSE);
1519 sqp->sq_state &= ~SQS_PAUSE;
1521 /* There should be only one thread blocking on sq_synch_cv. */
1522 cv_signal(&sqp->sq_synch_cv);
1523 mutex_exit(&sqp->sq_lock);