/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */
/*
 * Squeues: General purpose serialization mechanism
 * ------------------------------------------------
 *
 * This is a general purpose high-performance serialization mechanism
 * currently used by TCP/IP. It is implemented by means of a per CPU queue,
 * a worker thread and a polling thread that are bound to the CPU
 * associated with the squeue. The squeue is strictly FIFO for both read
 * and write side and only one thread can process it at any given time.
 * The design goal of squeue was to offer a very high degree of
 * parallelization (on a per H/W execution pipeline basis) with at
 * most one queuing.
 *
 * The modules needing protection typically call the SQUEUE_ENTER_ONE() or
 * the SQUEUE_ENTER() macro as soon as a thread enters the module
 * from either direction. For each packet, the processing function
 * and argument are stored in the mblk itself. When the packet is ready
 * to be processed, the squeue retrieves the stored function and calls
 * it with the supplied argument and the pointer to the packet itself.
 * The called function can assume that no other thread is processing
 * the squeue when it is executing.
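 *
 * As an illustrative, hedged sketch (the handler and tag names here are
 * hypothetical, not definitions from this file), a caller on the
 * inbound path would hand a packet to a connection's squeue as:
 *
 *	SQUEUE_ENTER_ONE(connp->conn_sqp, mp, tcp_input_handler, connp,
 *	    ira, SQ_PROCESS, SQTAG_EXAMPLE);
 *
 * The processing function and the conn_t ride in the mblk itself (in
 * b_queue and b_prev respectively, as the code below relies on), so the
 * squeue can invoke them later from its worker thread if the packet
 * ends up being queued.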
 *
 * Squeue/connection binding:
 * --------------------------
 *
 * TCP/IP uses an IP classifier in conjunction with squeues, where specific
 * connections are assigned to specific squeues (based on various policies)
 * at connection creation time. Once assigned, the connection-to-squeue
 * mapping is never changed and all future packets for that
 * connection are processed on that squeue. The connection ("conn") to
 * squeue mapping is stored in the "conn_t" member "conn_sqp".
 *
 * Since the processing of the connection cuts across multiple layers
 * but still allows packets for different connections to be processed on
 * other CPUs/squeues, squeues are also termed "Vertical Perimeter" or
 * "Per Connection Vertical Perimeter".
 *
 * An squeue doesn't necessarily process packets with its own worker thread.
 * The callers can pick if they just want to queue the packet, process
 * their packet if nothing is queued, or drain and process. The first two
 * modes are typically employed when the packet was generated while
 * already doing the processing behind the squeue, and the last mode (drain
 * and process) is typically employed when the thread is entering the squeue
 * for the first time. The squeue still imposes a finite time limit
 * for which an external thread can do processing, after which it switches
 * processing to its own worker thread.
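 *
 * As a hedged summary (see squeue_enter() below), these modes correspond
 * to the process_flag values used in this file:
 *
 *	SQ_FILL		only enqueue the packet (and wake the worker
 *			thread if no one is processing the squeue)
 *	SQ_NODRAIN	process the packet inline only if nothing is
 *			already queued; otherwise just enqueue it
 *	SQ_PROCESS	process the packet and drain the queue, subject
 *			to the drain time limit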
 *
 * Once created, squeues are never deleted. Hence squeue pointers are
 * always valid. This means that functions outside the squeue can still
 * refer safely to conn_sqp and there is no need for ref counts.
 *
 * Only a thread executing in the squeue can change the squeue of the
 * connection. It does so by calling a squeue framework function to do this.
 * After changing the squeue, the thread must leave the squeue. It must not
 * continue to execute any code that needs squeue protection.
 *
 * The squeue framework, after entering the squeue, checks if the current
 * squeue matches the conn_sqp. If the check fails, the packet is delivered
 * to the right squeue.
 *
 * Squeues can control the rate of packet arrival into themselves from the
 * NIC or a specific Rx ring within a NIC. As part of capability negotiation
 * between IP and the MAC layer, an squeue is created for each TCP soft ring
 * (or TCP Rx ring - to be implemented in future). As part of this
 * negotiation, squeues get a cookie for the underlying soft ring or Rx
 * ring, a function to turn off incoming packets and a function to call
 * to poll for packets. This helps schedule the receive side packet
 * processing so that queue backlog doesn't build up and packet processing
 * doesn't keep getting disturbed by high priority interrupts. As part
 * of this mode, as soon as a backlog starts building, the squeue turns off
 * the interrupts and switches to poll mode. In poll mode, when the poll
 * thread goes down to retrieve packets, it retrieves them in the form of
 * a chain, which improves performance even more. As the squeue/softring
 * system gets more packets, it gets more efficient by switching to
 * polling more often and dealing with larger packet chains.
 */
#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/sunddi.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>

#include <sys/squeue_impl.h>
static void squeue_fire(void *);
static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
static void squeue_polling_thread(squeue_t *sqp);

kmem_cache_t *squeue_cache;
#define	SQUEUE_MSEC_TO_NSEC	1000000

int squeue_drain_ms = 20;
int squeue_workerwait_ms = 0;

/* The values above converted to ticks or nanoseconds */
static int squeue_drain_ns = 0;
static int squeue_workerwait_tick = 0;

#define	MAX_BYTES_TO_PICKUP	150000
#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	/*							\
	 * Enqueue our mblk chain.				\
	 */							\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
								\
	if ((sqp)->sq_last != NULL)				\
		(sqp)->sq_last->b_next = (mp);			\
	else							\
		(sqp)->sq_first = (mp);				\
	(sqp)->sq_last = (tail);				\
	(sqp)->sq_count += (cnt);				\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
	    mblk_t *, mp, mblk_t *, tail, int, cnt);		\
}
/*
 * Blank the receive ring (in this case it is the soft ring). When
 * blanked, the soft ring will not send any more packets up.
 * Blanking may not succeed when there is a CPU already in the soft
 * ring sending packets up. In that case, SQS_POLLING will not be
 * set.
 */
#define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {		\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_POLLING)) {		\
			if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
				sqp->sq_state |= SQS_POLLING;	\
		}						\
	}							\
}

#define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (sqp->sq_state & SQS_POLLING) {		\
			sqp->sq_state &= ~SQS_POLLING;		\
			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
		}						\
	}							\
}

/* Wake up the poll thread only if SQS_POLLING is set */
#define	SQS_POLL_RING(sqp) {					\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sqp->sq_state & SQS_POLLING) {			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_GET_PKTS)) {		\
			sqp->sq_state |= SQS_GET_PKTS;		\
			cv_signal(&sqp->sq_poll_cv);		\
		}						\
	}							\
}
#ifdef DEBUG
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {		\
	(sqp)->sq_curmp = (mp);					\
	(sqp)->sq_curproc = (proc);				\
	(sqp)->sq_connp = (connp);				\
	(mp)->b_tag = (sqp)->sq_tag = (tag);			\
}

#define	SQUEUE_DBG_CLEAR(sqp) {					\
	(sqp)->sq_curmp = NULL;					\
	(sqp)->sq_curproc = NULL;				\
	(sqp)->sq_connp = NULL;					\
}
#else
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
#define	SQUEUE_DBG_CLEAR(sqp)
#endif
void
squeue_init(void)
{
	squeue_cache = kmem_cache_create("squeue_cache",
	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);

	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
}
squeue_t *
squeue_create(clock_t wait, pri_t pri)
{
	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);

	bzero(sqp, sizeof (squeue_t));
	sqp->sq_bind = PBIND_NONE;
	sqp->sq_priority = pri;
	sqp->sq_wait = MSEC_TO_TICK(wait);
	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_enter = squeue_enter;
	sqp->sq_drain = squeue_drain;

	return (sqp);
}
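/*
 * Hedged usage sketch (illustrative, not a call site from this file):
 * a caller typically creates an squeue and then binds its worker thread
 * to a CPU while holding cpu_lock, which squeue_bind() asserts:
 *
 *	squeue_t *sqp = squeue_create(squeue_workerwait_ms, minclsyspri);
 *	mutex_enter(&cpu_lock);
 *	squeue_bind(sqp, cpu_id);	(cpu_id is a hypothetical target CPU)
 *	mutex_exit(&cpu_lock);
 *
 * minclsyspri is assumed here only as a plausible priority argument.
 */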
/*
 * Bind squeue worker thread to the specified CPU, given by CPU id.
 * If the CPU id value is -1, bind the worker thread to the value
 * specified in sq_bind field. If a thread is already bound to a
 * different CPU, unbind it from the old CPU and bind to the new one.
 */
void
squeue_bind(squeue_t *sqp, processorid_t bind)
{
	mutex_enter(&sqp->sq_lock);
	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
	ASSERT(MUTEX_HELD(&cpu_lock));

	if (sqp->sq_state & SQS_BOUND) {
		if (sqp->sq_bind == bind) {
			mutex_exit(&sqp->sq_lock);
			return;
		}
		thread_affinity_clear(sqp->sq_worker);
	} else {
		sqp->sq_state |= SQS_BOUND;
	}

	if (bind != PBIND_NONE)
		sqp->sq_bind = bind;

	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
	mutex_exit(&sqp->sq_lock);
}
void
squeue_unbind(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	if (!(sqp->sq_state & SQS_BOUND)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_state &= ~SQS_BOUND;
	thread_affinity_clear(sqp->sq_worker);
	mutex_exit(&sqp->sq_lock);
}
static void
squeue_worker_wakeup(squeue_t *sqp)
{
	timeout_id_t tid = (sqp)->sq_tid;

	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));

	if (sqp->sq_wait == 0) {
		ASSERT(!(sqp->sq_state & SQS_TMO_PROG));
		sqp->sq_awaken = ddi_get_lbolt();
		cv_signal(&sqp->sq_worker_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	}

	/*
	 * Queue isn't being processed, so take
	 * any post enqueue actions needed before leaving.
	 */
	if (tid != 0) {
		/*
		 * Waiting for an enter() to process mblk(s).
		 */
		clock_t now = ddi_get_lbolt();
		clock_t waited = now - sqp->sq_awaken;

		if (TICK_TO_MSEC(waited) >= sqp->sq_wait) {
			/*
			 * Times up and have a worker thread
			 * waiting for work, so schedule it.
			 */
			sqp->sq_tid = 0;
			sqp->sq_awaken = now;
			cv_signal(&sqp->sq_worker_cv);
			mutex_exit(&sqp->sq_lock);
			(void) untimeout(tid);
			return;
		}
		mutex_exit(&sqp->sq_lock);
		return;
	} else if (sqp->sq_state & SQS_TMO_PROG) {
		mutex_exit(&sqp->sq_lock);
		return;
	} else {
		clock_t wait = sqp->sq_wait;
		/*
		 * Wait up to sqp->sq_wait ms for an
		 * enter() to process this queue. We
		 * don't want to contend on timeout locks
		 * with sq_lock held for performance reasons,
		 * so drop the sq_lock before calling timeout
		 * but we need to check if timeout is required
		 * after reacquiring the sq_lock. Once
		 * the sq_lock is dropped, someone else could
		 * have processed the packet or the timeout could
		 * have already fired.
		 */
		sqp->sq_state |= SQS_TMO_PROG;
		mutex_exit(&sqp->sq_lock);
		tid = timeout(squeue_fire, sqp, wait);
		mutex_enter(&sqp->sq_lock);
		/* Check again if we still need the timeout */
		if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==
		    SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
		    (sqp->sq_first != NULL)) {
			sqp->sq_state &= ~SQS_TMO_PROG;
			sqp->sq_tid = tid;
			mutex_exit(&sqp->sq_lock);
			return;
		} else {
			if (sqp->sq_state & SQS_TMO_PROG) {
				sqp->sq_state &= ~SQS_TMO_PROG;
				mutex_exit(&sqp->sq_lock);
				(void) untimeout(tid);
			} else {
				/*
				 * The timer fired before we could
				 * reacquire the sq_lock. squeue_fire
				 * removes the SQS_TMO_PROG flag
				 * and we don't need to do anything
				 * else.
				 */
				mutex_exit(&sqp->sq_lock);
			}
		}
	}

	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
}
/*
 * squeue_enter() - enter squeue sqp with mblk mp (which can be
 * a chain), while tail points to the end and cnt is the number of
 * mblks in the chain.
 *
 * For a chain of a single packet (i.e. mp == tail), go through the
 * fast path if no one is processing the squeue and nothing is queued.
 *
 * The proc and arg for each mblk are already stored in the mblk in
 * appropriate places.
 *
 * The process_flag specifies if we are allowed to process the mblk
 * and drain in the entering thread context. If process_flag is
 * SQ_FILL, then we just queue the mblk and return (after signaling
 * the worker thread if no one else is processing the squeue).
 *
 * The ira argument can be used when the count is one.
 * For a chain the caller needs to prepend any needed mblks from
 * ip_recv_attr_to_mblk().
 */
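/*
 * Illustrative sketch (hedged; not a call site from this file): for a
 * single packet, a caller that has already stashed the handler and conn
 * in the mblk (as SET_SQUEUE() does elsewhere in this file) would enter
 * with mp == tail and cnt == 1:
 *
 *	SET_SQUEUE(mp, proc, connp);
 *	squeue_enter(connp->conn_sqp, mp, mp, 1, ira, SQ_PROCESS, tag);
 *
 * With the squeue idle, this takes the single-packet fast path below.
 */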
void
squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
    ip_recv_attr_t *ira, int process_flag, uint8_t tag)
{
	conn_t		*connp;
	sqproc_t	proc;
	hrtime_t	now;

	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(tail != NULL);
	ASSERT(cnt > 0);
	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
	ASSERT(ira == NULL || cnt == 1);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Try to process the packet if SQ_FILL flag is not set and
	 * we are allowed to process the squeue. The SQ_NODRAIN is
	 * ignored if the packet chain consists of more than 1 packet.
	 */
	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
		/*
		 * See if anything is already queued. If we are the
		 * first packet, do inline processing else queue the
		 * packet and do the drain.
		 */
		if (sqp->sq_first == NULL && cnt == 1) {
			/*
			 * Fast-path, ok to process and nothing queued.
			 */
			sqp->sq_state |= (SQS_PROC|SQS_FAST);
			sqp->sq_run = curthread;
			mutex_exit(&sqp->sq_lock);

			/*
			 * We are the chain of 1 packet so
			 * go through this fast path.
			 */
			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);
			connp = (conn_t *)mp->b_prev;
			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;
			ASSERT(proc != NULL && connp != NULL);
			ASSERT(mp->b_next == NULL);

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file
			 */
			if (connp->conn_sqp == sqp) {
				SQUEUE_DBG_SET(sqp, mp, proc, connp,
				    tag);
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				SQUEUE_DBG_CLEAR(sqp);
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}
			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
			sqp->sq_run = NULL;
			if (sqp->sq_first == NULL ||
			    process_flag == SQ_NODRAIN) {
				if (sqp->sq_first != NULL) {
					squeue_worker_wakeup(sqp);
					return;
				}
				/*
				 * We processed inline our packet and nothing
				 * new has arrived. We are done. In case any
				 * control actions are pending, wake up the
				 * worker.
				 */
				if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
					cv_signal(&sqp->sq_worker_cv);
				mutex_exit(&sqp->sq_lock);
				return;
			}
		} else {
			if (ira != NULL) {
				mblk_t	*attrmp;

				ASSERT(cnt == 1);
				attrmp = ip_recv_attr_to_mblk(ira);
				if (attrmp == NULL) {
					mutex_exit(&sqp->sq_lock);
					ip_drop_input("squeue: "
					    "ip_recv_attr_to_mblk",
					    mp, NULL);
					/* Caller already set b_prev/b_next */
					mp->b_prev = mp->b_next = NULL;
					freemsg(mp);
					return;
				}
				ASSERT(attrmp->b_cont == NULL);
				attrmp->b_cont = mp;
				/* Move connp and func to new */
				attrmp->b_queue = mp->b_queue;
				mp->b_queue = NULL;
				attrmp->b_prev = mp->b_prev;
				mp->b_prev = NULL;

				ASSERT(mp == tail);
				tail = mp = attrmp;
			}

			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
#ifdef DEBUG
			mp->b_tag = tag;
#endif
		}
		/*
		 * We are here because either we couldn't do inline
		 * processing (because something was already queued),
		 * or we had a chain of more than one packet,
		 * or something else arrived after we were done with
		 * all the processing.
		 */
		ASSERT(MUTEX_HELD(&sqp->sq_lock));
		ASSERT(sqp->sq_first != NULL);
		now = gethrtime();
		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);

		/*
		 * If we didn't do a complete drain, the worker
		 * thread was already signalled by squeue_drain.
		 * In case any control actions are pending, wake
		 * up the worker.
		 */
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
			cv_signal(&sqp->sq_worker_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	} else {
		/*
		 * We let a thread processing a squeue reenter only
		 * once. This helps the case of incoming connection
		 * where a SYN-ACK-ACK that triggers the conn_ind
		 * doesn't have to queue the packet if listener and
		 * eager are on the same squeue. Also helps the
		 * loopback connection where the two ends are bound
		 * to the same squeue (which is typical on single
		 * CPU machines).
		 *
		 * We let the thread reenter only once for the fear
		 * of stack getting blown with multiple traversal.
		 */
		connp = (conn_t *)mp->b_prev;
		if (!(sqp->sq_state & SQS_REENTER) &&
		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
		    (sqp->sq_run == curthread) && (cnt == 1) &&
		    (connp->conn_on_sqp == B_FALSE)) {
			sqp->sq_state |= SQS_REENTER;
			mutex_exit(&sqp->sq_lock);

			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);

			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file
			 */
			if (connp->conn_sqp == sqp) {
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}

			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_REENTER;
			mutex_exit(&sqp->sq_lock);
			return;
		}

		/*
		 * Queue is already being processed or there is already
		 * one or more packets on the queue. Enqueue the
		 * packet and wake up the squeue worker thread if the
		 * squeue is not being processed.
		 */
#ifdef DEBUG
		mp->b_tag = tag;
#endif
		if (ira != NULL) {
			mblk_t	*attrmp;

			ASSERT(cnt == 1);
			attrmp = ip_recv_attr_to_mblk(ira);
			if (attrmp == NULL) {
				mutex_exit(&sqp->sq_lock);
				ip_drop_input("squeue: ip_recv_attr_to_mblk",
				    mp, NULL);
				/* Caller already set b_prev/b_next */
				mp->b_prev = mp->b_next = NULL;
				freemsg(mp);
				return;
			}
			ASSERT(attrmp->b_cont == NULL);
			attrmp->b_cont = mp;
			/* Move connp and func to new */
			attrmp->b_queue = mp->b_queue;
			mp->b_queue = NULL;
			attrmp->b_prev = mp->b_prev;
			mp->b_prev = NULL;

			ASSERT(mp == tail);
			tail = mp = attrmp;
		}

		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		if (!(sqp->sq_state & SQS_PROC)) {
			squeue_worker_wakeup(sqp);
			return;
		}
		/*
		 * In case any control actions are pending, wake
		 * up the worker.
		 */
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
			cv_signal(&sqp->sq_worker_cv);
		mutex_exit(&sqp->sq_lock);
	}
}
static void
squeue_fire(void *arg)
{
	squeue_t	*sqp = arg;
	uint_t		state;

	mutex_enter(&sqp->sq_lock);

	state = sqp->sq_state;
	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_tid = 0;
	/*
	 * The timeout fired before we got a chance to set it.
	 * Process it anyway but remove the SQS_TMO_PROG so that
	 * the guy trying to set the timeout knows that it has
	 * already been processed.
	 */
	if (state & SQS_TMO_PROG)
		sqp->sq_state &= ~SQS_TMO_PROG;

	if (!(state & SQS_PROC)) {
		sqp->sq_awaken = ddi_get_lbolt();
		cv_signal(&sqp->sq_worker_cv);
	}
	mutex_exit(&sqp->sq_lock);
}
static void
squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
{
	mblk_t		*mp;
	mblk_t		*head;
	sqproc_t	proc;
	conn_t		*connp;
	timeout_id_t	tid;
	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
	hrtime_t	now;
	boolean_t	did_wakeup = B_FALSE;
	boolean_t	sq_poll_capable;
	ip_recv_attr_t	*ira, iras;

	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
again:
	ASSERT(mutex_owned(&sqp->sq_lock));
	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
	    SQS_POLL_QUIESCE_DONE)));

	head = sqp->sq_first;
	sqp->sq_first = NULL;
	sqp->sq_last = NULL;
	sqp->sq_count = 0;

	if ((tid = sqp->sq_tid) != 0)
		sqp->sq_tid = 0;

	sqp->sq_state |= SQS_PROC | proc_type;

	/*
	 * We have backlog built up. Switch to polling mode if the
	 * device underneath allows it. Need to do it so that
	 * more packets don't come in and disturb us (by contending
	 * for sq_lock or higher priority thread preempting us).
	 *
	 * The worker thread is allowed to do active polling while we
	 * just disable the interrupts for drain by non worker (kernel
	 * or userland) threads so they can peacefully process the
	 * packets during time allocated to them.
	 */
	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
	mutex_exit(&sqp->sq_lock);

	if (tid != 0)
		(void) untimeout(tid);

	while ((mp = head) != NULL) {

		head = mp->b_next;
		mp->b_next = NULL;

		proc = (sqproc_t)mp->b_queue;
		mp->b_queue = NULL;
		connp = (conn_t *)mp->b_prev;
		mp->b_prev = NULL;

		/* Is there an ip_recv_attr_t to handle? */
		if (ip_recv_attr_is_mblk(mp)) {
			mblk_t	*attrmp = mp;

			ASSERT(attrmp->b_cont != NULL);

			mp = attrmp->b_cont;
			attrmp->b_cont = NULL;
			ASSERT(mp->b_queue == NULL);
			ASSERT(mp->b_prev == NULL);

			if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
				/* The ill or ip_stack_t disappeared on us */
				ip_drop_input("ip_recv_attr_from_mblk",
				    mp, NULL);
				ira_cleanup(&iras, B_TRUE);
				CONN_DEC_REF(connp);
				freemsg(mp);
				continue;
			}
			ira = &iras;
		} else {
			ira = NULL;
		}

		/*
		 * Handle squeue switching. More details in the
		 * block comment at the top of the file
		 */
		if (connp->conn_sqp == sqp) {
			SQUEUE_DBG_SET(sqp, mp, proc, connp,
			    mp->b_tag);
			connp->conn_on_sqp = B_TRUE;
			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
			    sqp, mblk_t *, mp, conn_t *, connp);
			(*proc)(connp, mp, sqp, ira);
			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
			    sqp, conn_t *, connp);
			connp->conn_on_sqp = B_FALSE;
			CONN_DEC_REF(connp);
		} else {
			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
		}
		if (ira != NULL)
			ira_cleanup(ira, B_TRUE);
	}

	SQUEUE_DBG_CLEAR(sqp);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Check if there is still work to do (either more arrived or timer
	 * expired). If we are the worker thread and we are polling capable,
	 * continue doing the work since no one else is around to do the
	 * work anyway (but signal the poll thread to retrieve some packets
	 * in the meanwhile). If we are not the worker thread, just
	 * signal the worker thread to take up the work if processing time
	 * expired.
	 */
	if (sqp->sq_first != NULL) {
		/*
		 * Still more to process. If time quanta not expired, we
		 * should let the drain go on. The worker thread is allowed
		 * to drain as long as there is anything left.
		 */
		now = gethrtime();
		if ((now < expire) || (proc_type == SQS_WORKER)) {
			/*
			 * If time not expired or we are worker thread and
			 * this squeue is polling capable, continue to do
			 * the drain.
			 *
			 * We turn off interrupts for all userland threads
			 * doing drain but we do active polling only for
			 * worker thread.
			 *
			 * Calling SQS_POLL_RING() even in the case of
			 * SQS_POLLING_ON() not succeeding is ok as
			 * SQS_POLL_RING() will not wake up poll thread
			 * if SQS_POLLING bit is not set.
			 */
			if (proc_type == SQS_WORKER)
				SQS_POLL_RING(sqp);
			goto again;
		}

		did_wakeup = B_TRUE;
		sqp->sq_awaken = ddi_get_lbolt();
		cv_signal(&sqp->sq_worker_cv);
	}

	/*
	 * If the poll thread is already running, just return. The
	 * poll thread continues to hold the proc and will finish
	 * processing.
	 */
	if (sqp->sq_state & SQS_GET_PKTS) {
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		sqp->sq_state &= ~proc_type;
		return;
	}

	/*
	 * If we are the worker thread and no work is left, send the poll
	 * thread down once more to see if something arrived. Otherwise,
	 * turn the interrupts back on and we are done.
	 */
	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
		/*
		 * Do one last check to see if anything arrived
		 * in the NIC. We leave the SQS_PROC set to ensure
		 * that poll thread keeps the PROC and can decide
		 * if it needs to turn polling off or continue
		 * processing.
		 *
		 * If we drop the SQS_PROC here and poll thread comes
		 * up empty handed, it can not safely turn polling off
		 * since someone else could have acquired the PROC
		 * and started draining. The previously running poll
		 * thread and the current thread doing drain would end
		 * up in a race for turning polling on/off and more
		 * complex code would be required to deal with it.
		 *
		 * It's a lot simpler for drain to hand the SQS_PROC to
		 * the poll thread (if running) and let the poll thread
		 * finish without worrying about racing with any other
		 * thread.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLL_RING(sqp);
		sqp->sq_state &= ~proc_type;
	} else {
		/*
		 * The squeue is either not capable of polling or the
		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
		 * unsuccessful or the poll thread already finished
		 * processing and didn't find anything. Since there
		 * is nothing queued and we already turned polling on
		 * (for all threads doing drain), we should turn
		 * polling off and relinquish the PROC.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
		sqp->sq_state &= ~(SQS_PROC | proc_type);
		if (!did_wakeup && sqp->sq_first != NULL) {
			squeue_worker_wakeup(sqp);
			mutex_enter(&sqp->sq_lock);
		}
		/*
		 * If we are not the worker and there is a pending quiesce
		 * event, wake up the worker
		 */
		if ((proc_type != SQS_WORKER) &&
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL))
			cv_signal(&sqp->sq_worker_cv);
	}
}
/*
 * Quiesce, Restart, or Cleanup of the squeue poll thread.
 *
 * Quiesce and Restart: After an squeue poll thread has been quiesced, it does
 * not attempt to poll the underlying soft ring any more. The quiesce is
 * triggered by the mac layer when it wants to quiesce a soft ring. Typically
 * control operations such as changing the fanout of a NIC or VNIC (dladm
 * setlinkprop) need to quiesce data flow before changing the wiring.
 * The operation is done by the mac layer, but it calls back into IP to
 * quiesce the soft ring. After completing the operation (say increase or
 * decrease of the fanout) the mac layer then calls back into IP to restart
 * the quiesced soft ring.
 *
 * Cleanup: This is triggered when the squeue binding to a soft ring is
 * removed permanently. Typically interface plumb and unplumb would trigger
 * this. It can also be triggered from the mac layer when a soft ring is
 * being deleted say as the result of a fanout reduction. Since squeues are
 * never deleted, the cleanup marks the squeue as fit for recycling and
 * moves it to the zeroth squeue set.
 */
static void
squeue_poll_thr_control(squeue_t *sqp)
{
	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
		/* Restart implies a previous quiesce */
		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
		    SQS_POLL_THR_RESTART);
		sqp->sq_state |= SQS_POLL_CAPAB;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}

	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}
}
/*
 * With polling mode, we want to do as much processing as we possibly can
 * in worker thread context. The sweet spot is worker thread keeps doing
 * work all the time in polling mode and writers etc. keep dumping packets
 * to worker thread. Occasionally, we send the poll thread (running at
 * lower priority) to the NIC to get a chain of packets to feed the worker.
 * Sending the poll thread down to the NIC depends on three criteria:
 *
 * 1) It's always driven from squeue_drain and only if the worker thread
 *	is doing the drain.
 * 2) We clear the backlog once and more packets arrived in between.
 *	Before starting drain again, send the poll thread down if
 *	the drain is being done by worker thread.
 * 3) Before exiting the squeue_drain, if the poll thread is not already
 *	working and we are the worker thread, try to poll one more time.
 *
 * For latency sake, we do allow any thread calling squeue_enter
 * to process its packet provided:
 *
 * 1) Nothing is queued
 * 2) If more packets arrived in between, the non worker threads are allowed
 *	to do the drain till their time quanta expired provided SQS_GET_PKTS
 *	wasn't set in between.
 *
 * Avoiding deadlocks with interrupts
 * ==================================
 *
 * One of the big problems is that we can't send the poll thread down while
 * holding the sq_lock since the thread can block. So we drop the sq_lock
 * before calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
 * poll thread is running so that no other thread can acquire the
 * perimeter in between. If the squeue_drain gets done (no more work
 * left), it leaves the SQS_PROC set if poll thread is running.
 */
/*
 * This is the squeue poll thread. In poll mode, it polls the underlying
 * TCP softring and feeds packets into the squeue. The worker thread then
 * drains the squeue. The poll thread also responds to control signals for
 * quiescing, restarting, or cleanup of an squeue. These are driven by
 * control operations like plumb/unplumb or as a result of dynamic Rx ring
 * related operations that are driven from the mac layer.
 */
static void
squeue_polling_thread(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_poll_cv;
	ip_mac_rx_t sq_get_pkts;
	ip_accept_t ip_accept;
	ill_rx_ring_t *sq_rx_ring;
	ill_t *sq_ill;
	mblk_t *head, *tail, *mp;
	uint_t cnt;
	void *sq_mac_handle;
	callb_cpr_t cprinfo;
	size_t bytes_to_pickup;
	uint32_t ctl_state;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
	mutex_enter(lock);

	for (;;) {
		CALLB_CPR_SAFE_BEGIN(&cprinfo);
		cv_wait(async, lock);
		CALLB_CPR_SAFE_END(&cprinfo, lock);

		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
		    SQS_POLL_THR_QUIESCED);
		if (ctl_state != 0) {
			/*
			 * If the squeue is quiesced, then wait for a control
			 * request. A quiesced squeue must not poll the
			 * underlying soft ring.
			 */
			if (ctl_state == SQS_POLL_THR_QUIESCED)
				continue;
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue
			 */
			squeue_poll_thr_control(sqp);
			continue;
		}

		if (!(sqp->sq_state & SQS_POLL_CAPAB))
			continue;

		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

poll_again:
		sq_rx_ring = sqp->sq_rx_ring;
		sq_get_pkts = sq_rx_ring->rr_rx;
		sq_mac_handle = sq_rx_ring->rr_rx_handle;
		ip_accept = sq_rx_ring->rr_ip_accept;
		sq_ill = sq_rx_ring->rr_ill;
		bytes_to_pickup = MAX_BYTES_TO_PICKUP;
		mutex_exit(lock);
		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
		mp = NULL;
		if (head != NULL) {
			/*
			 * We got the packet chain from the mac layer. It
			 * would be nice to be able to process it inline
			 * for better performance but we need to give
			 * IP a chance to look at this chain to ensure
			 * that packets are really meant for this squeue
			 * and do the IP processing.
			 */
			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
			    &tail, &cnt);
		}
		mutex_enter(lock);
		if (mp != NULL) {
			/*
			 * The ip_accept function has already added an
			 * ip_recv_attr_t mblk if that is needed.
			 */
			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		}
		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * We have packets to process and worker thread
			 * is not running. Check to see if poll thread is
			 * allowed to process. Let it do processing only if it
			 * picked up some packets from the NIC otherwise
			 * wakeup the worker thread.
			 */
			if (mp != NULL) {
				hrtime_t now;

				now = gethrtime();
				sqp->sq_run = curthread;
				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
				    squeue_drain_ns);
				sqp->sq_run = NULL;

				if (sqp->sq_first == NULL)
					goto poll_again;

				/*
				 * Couldn't do the entire drain because the
				 * time limit expired, let the
				 * worker thread take over.
				 */
			}

			sqp->sq_awaken = ddi_get_lbolt();
			/*
			 * Put the SQS_PROC_HELD on so the worker
			 * thread can distinguish where its called from. We
			 * can remove the SQS_PROC flag here and turn off the
			 * polling so that it wouldn't matter who gets the
			 * processing but we get better performance this way
			 * and save the cost of turn polling off and possibly
			 * on again as soon as we start draining again.
			 *
			 * We can't remove the SQS_PROC flag without turning
			 * polling off until we can guarantee that control
			 * will return to squeue_drain immediately.
			 */
			sqp->sq_state |= SQS_PROC_HELD;
			sqp->sq_state &= ~SQS_GET_PKTS;
			cv_signal(&sqp->sq_worker_cv);
		} else if (sqp->sq_first == NULL &&
		    !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * Nothing queued and worker thread not running.
			 * Since we hold the proc, no other thread is
			 * processing the squeue. This means that there
			 * is no work to be done and nothing is queued
			 * in squeue or in NIC. Turn polling off and go
			 * back to interrupt mode.
			 */
			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
			/* LINTED: constant in conditional context */
			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);

			/*
			 * If there is a pending control operation
			 * wake up the worker, since it is currently
			 * not running.
			 */
			if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
				cv_signal(&sqp->sq_worker_cv);
		} else {
			/*
			 * Worker thread is already running. We don't need
			 * to do anything. Indicate that poll thread is done.
			 */
			sqp->sq_state &= ~SQS_GET_PKTS;
		}
		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart an squeue
			 */
			squeue_poll_thr_control(sqp);
		}
	}
}
/*
 * The squeue worker thread acts on any control requests to quiesce, cleanup
 * or restart an ill_rx_ring_t by calling this function. The worker thread
 * synchronizes with the squeue poll thread to complete the request and finally
 * wakes up the requestor when the request is completed.
 */
static void
squeue_worker_thr_control(squeue_t *sqp)
{
	ill_t		*ill;
	ill_rx_ring_t	*rx_ring;

	ASSERT(MUTEX_HELD(&sqp->sq_lock));

	if (sqp->sq_state & SQS_POLL_RESTART) {
		/* Restart implies a previous quiesce. */
		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
		/*
		 * Request the squeue poll thread to restart and wait till
		 * it actually restarts.
		 */
		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
		sqp->sq_state |= SQS_POLL_THR_RESTART;
		cv_signal(&sqp->sq_poll_cv);
		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
		    SQS_WORKER);
		/*
		 * Signal any waiter that is waiting for the restart
		 * to complete.
		 */
		sqp->sq_state |= SQS_POLL_RESTART_DONE;
		cv_signal(&sqp->sq_ctrlop_done_cv);
		return;
	}

	if (sqp->sq_state & SQS_PROC_HELD) {
		/* The squeue poll thread handed control to us */
		ASSERT(sqp->sq_state & SQS_PROC);
	}

	/*
	 * Prevent any other thread from processing the squeue
	 * until we finish the control actions by setting SQS_PROC.
	 * But allow ourself to reenter by setting SQS_WORKER
	 */
	sqp->sq_state |= (SQS_PROC | SQS_WORKER);

	/* Signal the squeue poll thread and wait for it to quiesce itself */
	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_poll_cv);
		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
	}

	rx_ring = sqp->sq_rx_ring;
	ill = rx_ring->rr_ill;
	/*
	 * The lock hierarchy is as follows.
	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
	 */
	mutex_exit(&sqp->sq_lock);
	mutex_enter(&ill->ill_lock);
	mutex_enter(&sqp->sq_lock);

	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
	    sqp->sq_rx_ring);
	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
	if (sqp->sq_state & SQS_POLL_CLEANUP) {
		/*
		 * Disassociate this squeue from its ill_rx_ring_t.
		 * The rr_sqp, sq_rx_ring fields are protected by the
		 * corresponding squeue, ill_lock* and sq_lock. Holding any
		 * of them will ensure that the ring to squeue mapping does
		 * not change.
		 */
		ASSERT(!(sqp->sq_state & SQS_DEFAULT));

		sqp->sq_rx_ring = NULL;
		rx_ring->rr_sqp = NULL;

		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE);
		sqp->sq_ill = NULL;

		rx_ring->rr_rx_handle = NULL;
		rx_ring->rr_intr_handle = NULL;
		rx_ring->rr_intr_enable = NULL;
		rx_ring->rr_intr_disable = NULL;
		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
	} else {
		sqp->sq_state &= ~SQS_POLL_QUIESCE;
		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
	}
	/*
	 * Signal any waiter that is waiting for the quiesce or cleanup
	 * to complete and also wait for it to actually see and reset the
	 * SQS_POLL_CLEANUP_DONE.
	 */
	cv_signal(&sqp->sq_ctrlop_done_cv);
	mutex_exit(&ill->ill_lock);
	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
	}
}
static void
squeue_worker(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_worker_cv;
	callb_cpr_t cprinfo;
	hrtime_t now;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
	mutex_enter(lock);

	for (;;) {
		for (;;) {
			/*
			 * If the poll thread has handed control to us
			 * we need to break out of the wait.
			 */
			if (sqp->sq_state & SQS_PROC_HELD)
				break;

			/*
			 * If the squeue is not being processed and we either
			 * have messages to drain or some thread has signaled
			 * some control activity we need to break
			 */
			if (!(sqp->sq_state & SQS_PROC) &&
			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
			    (sqp->sq_first != NULL)))
				break;

			/*
			 * If we have started some control action, then check
			 * for the SQS_WORKER flag (since we don't
			 * release the squeue) to make sure we own the squeue
			 * and break out.
			 */
			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
			    (sqp->sq_state & SQS_WORKER))
				break;

			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(async, lock);
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
			squeue_worker_thr_control(sqp);
			continue;
		}
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));

		if (sqp->sq_state & SQS_PROC_HELD)
			sqp->sq_state &= ~SQS_PROC_HELD;

		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns);
		sqp->sq_run = NULL;
	}
}
uintptr_t *
squeue_getprivate(squeue_t *sqp, sqprivate_t p)
{
	ASSERT(p < SQPRIVATE_MAX);

	return (&sqp->sq_private[p]);
}
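/*
 * Hedged usage sketch: a client module can keep per-squeue state in its
 * private slot, assuming an index such as SQPRIVATE_TCP exists in
 * sqprivate_t (the index name here is illustrative):
 *
 *	uintptr_t *slotp = squeue_getprivate(sqp, SQPRIVATE_TCP);
 *	*slotp = (uintptr_t)my_state;
 */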
/* ARGSUSED */
static void
squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
{
	conn_t *connp = (conn_t *)arg;
	squeue_t *sqp = connp->conn_sqp;

	/*
	 * Mark the squeue as paused before waking up the thread stuck
	 * in squeue_synch_enter().
	 */
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state |= SQS_PAUSE;

	/*
	 * Notify the thread that it's OK to proceed; that is done by
	 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
	 */
	ASSERT(mp->b_flag & MSGWAITSYNC);
	mp->b_flag &= ~MSGWAITSYNC;
	cv_broadcast(&connp->conn_sq_cv);

	/*
	 * We are doing something on behalf of another thread, so we have to
	 * pause and wait until it finishes.
	 */
	while (sqp->sq_state & SQS_PAUSE) {
		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
	}
	mutex_exit(&sqp->sq_lock);
}
int
squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
{
	squeue_t *sqp;

again:
	sqp = connp->conn_sqp;

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
		/*
		 * We are OK to proceed if the squeue is empty, and
		 * no one owns the squeue.
		 *
		 * The caller won't own the squeue as this is called from the
		 * application.
		 */
		ASSERT(sqp->sq_run == NULL);

		sqp->sq_state |= SQS_PROC;
		sqp->sq_run = curthread;
		mutex_exit(&sqp->sq_lock);

		/*
		 * Handle squeue switching. The conn's squeue can only change
		 * while there is a thread in the squeue, which is why we do
		 * the check after entering the squeue. If it has changed, exit
		 * this squeue and redo everything with the new sqeueue.
		 */
		if (sqp != connp->conn_sqp) {
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_PROC;
			sqp->sq_run = NULL;
			mutex_exit(&sqp->sq_lock);
			goto again;
		}
#ifdef DEBUG
		sqp->sq_curmp = NULL;
		sqp->sq_curproc = NULL;
		sqp->sq_connp = connp;
#endif
		connp->conn_on_sqp = B_TRUE;
		return (0);
	} else {
		mblk_t	*mp;

		mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
		if (mp == NULL) {
			mutex_exit(&sqp->sq_lock);
			return (ENOMEM);
		}

		/*
		 * We mark the mblk as awaiting synchronous squeue access
		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
		 * fires, MSGWAITSYNC is cleared, at which point we know we
		 * have exclusive access.
		 */
		mp->b_flag |= MSGWAITSYNC;

		CONN_INC_REF(connp);
		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
		ENQUEUE_CHAIN(sqp, mp, mp, 1);

		ASSERT(sqp->sq_run != curthread);

		/* Wait until the enqueued mblk gets processed. */
		while (mp->b_flag & MSGWAITSYNC)
			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
		mutex_exit(&sqp->sq_lock);

		if (use_mp == NULL)
			freeb(mp);

		return (0);
	}
}
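/*
 * Hedged usage sketch (illustrative): a thread outside the squeue, e.g.
 * one processing a socket option, gains exclusive synchronous access to
 * a connection and releases it with the matching exit call:
 *
 *	if (squeue_synch_enter(connp, NULL) == 0) {
 *		(operate on connp under squeue protection)
 *		squeue_synch_exit(connp);
 *	}
 */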
void
squeue_synch_exit(conn_t *connp)
{
	squeue_t *sqp = connp->conn_sqp;

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_run == curthread) {
		ASSERT(sqp->sq_state & SQS_PROC);

		sqp->sq_state &= ~SQS_PROC;
		sqp->sq_run = NULL;
		connp->conn_on_sqp = B_FALSE;

		if (sqp->sq_first == NULL) {
			mutex_exit(&sqp->sq_lock);
		} else {
			/*
			 * If this was a normal thread, then it would
			 * (most likely) continue processing the pending
			 * requests. Since the just completed operation
			 * was executed synchronously, the thread should
			 * not be delayed. To compensate, wake up the
			 * worker thread right away when there are outstanding
			 * requests.
			 */
			sqp->sq_awaken = ddi_get_lbolt();
			cv_signal(&sqp->sq_worker_cv);
			mutex_exit(&sqp->sq_lock);
		}
		return;
	}
	/*
	 * The caller doesn't own the squeue, clear the SQS_PAUSE flag,
	 * and wake up the squeue owner, such that owner can continue
	 * processing.
	 */
	ASSERT(sqp->sq_state & SQS_PAUSE);
	sqp->sq_state &= ~SQS_PAUSE;

	/* There should be only one thread blocking on sq_synch_cv. */
	cv_signal(&sqp->sq_synch_cv);
	mutex_exit(&sqp->sq_lock);
}