usr/src/uts/common/inet/squeue.c
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * Squeues: General purpose serialization mechanism
 * ------------------------------------------------
 *
 * Background:
 * -----------
 *
 * This is a general purpose high-performance serialization mechanism
 * currently used by TCP/IP. It is implemented by means of a per-CPU queue,
 * a worker thread and a polling thread which are bound to the CPU
 * associated with the squeue. The squeue is strictly FIFO for both read
 * and write side and only one thread can process it at any given time.
 * The design goal of the squeue was to offer a very high degree of
 * parallelization (on a per H/W execution pipeline basis) with at
 * most one queuing.
 *
 * The modules needing protection typically call the SQUEUE_ENTER_ONE() or
 * SQUEUE_ENTER() macro as soon as a thread enters the module
 * from either direction. For each packet, the processing function
 * and argument are stored in the mblk itself. When the packet is ready
 * to be processed, the squeue retrieves the stored function and calls
 * it with the supplied argument and the pointer to the packet itself.
 * The called function can assume that no other thread is processing
 * the squeue when it is executing.
 *
 * Squeue/connection binding:
 * --------------------------
 *
 * TCP/IP uses an IP classifier in conjunction with squeues, where specific
 * connections are assigned to a specific squeue (based on various policies)
 * at connection creation time. Once assigned, the connection-to-squeue
 * mapping is never changed and all future packets for that
 * connection are processed on that squeue. The connection ("conn") to
 * squeue mapping is stored in the "conn_t" member "conn_sqp".
 *
 * Since the processing of the connection cuts across multiple layers
 * but still allows packets for different connections to be processed on
 * other CPUs/squeues, squeues are also termed a "Vertical Perimeter" or
 * "Per Connection Vertical Perimeter".
 *
 * Processing Model:
 * -----------------
 *
 * A squeue doesn't necessarily process packets with its own worker thread.
 * Callers can choose to just queue the packet, process their packet if
 * nothing is queued, or drain and process. The first two
 * modes are typically employed when the packet was generated while
 * already doing the processing behind the squeue and the last mode (drain
 * and process) is typically employed when the thread is entering the squeue
 * for the first time. The squeue still imposes a finite time limit
 * for which an external thread can do processing, after which it switches
 * processing to its own worker thread.
 *
 * Once created, squeues are never deleted. Hence squeue pointers are
 * always valid. This means that functions outside the squeue can still
 * refer safely to conn_sqp and there is no need for ref counts.
 *
 * Only a thread executing in the squeue can change the squeue of the
 * connection. It does so by calling a squeue framework function to do this.
 * After changing the squeue, the thread must leave the squeue. It must not
 * continue to execute any code that needs squeue protection.
 *
 * The squeue framework, after entering the squeue, checks if the current
 * squeue matches the conn_sqp. If the check fails, the packet is delivered
 * to the right squeue.
 *
 * Polling Model:
 * --------------
 *
 * Squeues can control the rate of packet arrival into themselves from the
 * NIC or a specific Rx ring within a NIC. As part of capability negotiation
 * between IP and the MAC layer, a squeue is created for each TCP soft ring
 * (or TCP Rx ring - to be implemented in the future). As part of this
 * negotiation, squeues get a cookie for the underlying soft ring or Rx
 * ring, a function to turn off incoming packets and a function to call
 * to poll for packets. This helps schedule the receive side packet
 * processing so that queue backlog doesn't build up and packet processing
 * doesn't keep getting disturbed by high priority interrupts. As part
 * of this mode, as soon as a backlog starts building, the squeue turns off
 * the interrupts and switches to poll mode. In poll mode, when the poll
 * thread goes down to retrieve packets, it retrieves them in the form of
 * a chain which improves performance even more. As the squeue/softring
 * system gets more packets, it gets more efficient by switching to
 * polling more often and dealing with larger packet chains.
 */

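/*
 * Illustrative sketch (not part of the original source): how a caller
 * might hand a single packet to its connection's squeue, as described
 * above. SQUEUE_ENTER_ONE() arranges for the processing function and the
 * conn_t to be stored in the mblk (b_queue/b_prev); the fast path in
 * squeue_enter() below then retrieves them from there. The helper name
 * and its tag argument are hypothetical.
 *
 *	static void
 *	example_dispatch(conn_t *connp, mblk_t *mp, sqproc_t proc,
 *	    ip_recv_attr_t *ira, uint8_t tag)
 *	{
 *		CONN_INC_REF(connp);	-- squeue processing drops this ref
 *		SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
 *		    SQ_PROCESS, tag);	-- may drain inline, else queues
 *	}
 */
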
#include <sys/types.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/kmem.h>
#include <sys/cpuvar.h>
#include <sys/condvar_impl.h>
#include <sys/systm.h>
#include <sys/callb.h>
#include <sys/sdt.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

#include <inet/ipclassifier.h>
#include <inet/udp_impl.h>

#include <sys/squeue_impl.h>

static void squeue_fire(void *);
static void squeue_drain(squeue_t *, uint_t, hrtime_t);
static void squeue_worker(squeue_t *sqp);
static void squeue_polling_thread(squeue_t *sqp);

kmem_cache_t *squeue_cache;

#define	SQUEUE_MSEC_TO_NSEC	1000000

int squeue_drain_ms = 20;
int squeue_workerwait_ms = 0;

/* The values above, converted to ticks or nanoseconds */
static int squeue_drain_ns = 0;
static int squeue_workerwait_tick = 0;

#define	MAX_BYTES_TO_PICKUP	150000

#define	ENQUEUE_CHAIN(sqp, mp, tail, cnt) {			\
	/*							\
	 * Enqueue our mblk chain.				\
	 */							\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
								\
	if ((sqp)->sq_last != NULL)				\
		(sqp)->sq_last->b_next = (mp);			\
	else							\
		(sqp)->sq_first = (mp);				\
	(sqp)->sq_last = (tail);				\
	(sqp)->sq_count += (cnt);				\
	ASSERT((sqp)->sq_count > 0);				\
	DTRACE_PROBE4(squeue__enqueuechain, squeue_t *, sqp,	\
	    mblk_t *, mp, mblk_t *, tail, int, cnt);		\
								\
}

/*
 * Blank the receive ring (in this case it is the soft ring). When
 * blanked, the soft ring will not send any more packets up.
 * Blanking may not succeed when there is a CPU already in the soft
 * ring sending packets up. In that case, SQS_POLLING will not be
 * set.
 */
#define	SQS_POLLING_ON(sqp, sq_poll_capable, rx_ring) {		\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_POLLING)) {		\
			if (rx_ring->rr_intr_disable(rx_ring->rr_intr_handle)) \
				sqp->sq_state |= SQS_POLLING;	\
		}						\
	}							\
}

#define	SQS_POLLING_OFF(sqp, sq_poll_capable, rx_ring) {	\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sq_poll_capable) {					\
		ASSERT(rx_ring != NULL);			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (sqp->sq_state & SQS_POLLING) {		\
			sqp->sq_state &= ~SQS_POLLING;		\
			rx_ring->rr_intr_enable(rx_ring->rr_intr_handle); \
		}						\
	}							\
}

/* Wakeup poll thread only if SQS_POLLING is set */
#define	SQS_POLL_RING(sqp) {					\
	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));			\
	if (sqp->sq_state & SQS_POLLING) {			\
		ASSERT(sqp->sq_state & SQS_POLL_CAPAB);		\
		if (!(sqp->sq_state & SQS_GET_PKTS)) {		\
			sqp->sq_state |= SQS_GET_PKTS;		\
			cv_signal(&sqp->sq_poll_cv);		\
		}						\
	}							\
}

#ifdef DEBUG
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag) {		\
	(sqp)->sq_curmp = (mp);					\
	(sqp)->sq_curproc = (proc);				\
	(sqp)->sq_connp = (connp);				\
	(mp)->b_tag = (sqp)->sq_tag = (tag);			\
}

#define	SQUEUE_DBG_CLEAR(sqp)	{				\
	(sqp)->sq_curmp = NULL;					\
	(sqp)->sq_curproc = NULL;				\
	(sqp)->sq_connp = NULL;					\
}
#else
#define	SQUEUE_DBG_SET(sqp, mp, proc, connp, tag)
#define	SQUEUE_DBG_CLEAR(sqp)
#endif

void
squeue_init(void)
{
	squeue_cache = kmem_cache_create("squeue_cache",
	    sizeof (squeue_t), 64, NULL, NULL, NULL, NULL, NULL, 0);

	squeue_drain_ns = squeue_drain_ms * SQUEUE_MSEC_TO_NSEC;
	squeue_workerwait_tick = MSEC_TO_TICK_ROUNDUP(squeue_workerwait_ms);
}

/* ARGSUSED */
squeue_t *
squeue_create(clock_t wait, pri_t pri)
{
	squeue_t *sqp = kmem_cache_alloc(squeue_cache, KM_SLEEP);

	bzero(sqp, sizeof (squeue_t));
	sqp->sq_bind = PBIND_NONE;
	sqp->sq_priority = pri;
	sqp->sq_wait = MSEC_TO_TICK(wait);
	sqp->sq_worker = thread_create(NULL, 0, squeue_worker,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_poll_thr = thread_create(NULL, 0, squeue_polling_thread,
	    sqp, 0, &p0, TS_RUN, pri);

	sqp->sq_enter = squeue_enter;
	sqp->sq_drain = squeue_drain;

	return (sqp);
}

/*
 * Bind squeue worker thread to the specified CPU, given by CPU id.
 * If the CPU id value is -1, bind the worker thread to the value
 * specified in the sq_bind field. If a thread is already bound to a
 * different CPU, unbind it from the old CPU and bind to the new one.
 */
void
squeue_bind(squeue_t *sqp, processorid_t bind)
{
	mutex_enter(&sqp->sq_lock);
	ASSERT(sqp->sq_bind != PBIND_NONE || bind != PBIND_NONE);
	ASSERT(MUTEX_HELD(&cpu_lock));

	if (sqp->sq_state & SQS_BOUND) {
		if (sqp->sq_bind == bind) {
			mutex_exit(&sqp->sq_lock);
			return;
		}
		thread_affinity_clear(sqp->sq_worker);
	} else {
		sqp->sq_state |= SQS_BOUND;
	}

	if (bind != PBIND_NONE)
		sqp->sq_bind = bind;

	thread_affinity_set(sqp->sq_worker, sqp->sq_bind);
	mutex_exit(&sqp->sq_lock);
}

void
squeue_unbind(squeue_t *sqp)
{
	mutex_enter(&sqp->sq_lock);
	if (!(sqp->sq_state & SQS_BOUND)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_state &= ~SQS_BOUND;
	thread_affinity_clear(sqp->sq_worker);
	mutex_exit(&sqp->sq_lock);
}

void
squeue_worker_wakeup(squeue_t *sqp)
{
	timeout_id_t tid = (sqp)->sq_tid;

	ASSERT(MUTEX_HELD(&(sqp)->sq_lock));

	if (sqp->sq_wait == 0) {
		ASSERT(tid == 0);
		ASSERT(!(sqp->sq_state & SQS_TMO_PROG));
		sqp->sq_awaken = ddi_get_lbolt();
		cv_signal(&sqp->sq_worker_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	}

	/*
	 * Queue isn't being processed, so take
	 * any post enqueue actions needed before leaving.
	 */
	if (tid != 0) {
		/*
		 * Waiting for an enter() to process mblk(s).
		 */
		clock_t now = ddi_get_lbolt();
		clock_t waited = now - sqp->sq_awaken;

		if (TICK_TO_MSEC(waited) >= sqp->sq_wait) {
			/*
			 * Time is up and we have a worker thread
			 * waiting for work, so schedule it.
			 */
			sqp->sq_tid = 0;
			sqp->sq_awaken = now;
			cv_signal(&sqp->sq_worker_cv);
			mutex_exit(&sqp->sq_lock);
			(void) untimeout(tid);
			return;
		}
		mutex_exit(&sqp->sq_lock);
		return;
	} else if (sqp->sq_state & SQS_TMO_PROG) {
		mutex_exit(&sqp->sq_lock);
		return;
	} else {
		clock_t	wait = sqp->sq_wait;
		/*
		 * Wait up to sqp->sq_wait ms for an
		 * enter() to process this queue. We
		 * don't want to contend on timeout locks
		 * with sq_lock held for performance reasons,
		 * so drop the sq_lock before calling timeout
		 * but we need to check if timeout is required
		 * after re-acquiring the sq_lock. Once
		 * the sq_lock is dropped, someone else could
		 * have processed the packet or the timeout could
		 * have already fired.
		 */
		sqp->sq_state |= SQS_TMO_PROG;
		mutex_exit(&sqp->sq_lock);
		tid = timeout(squeue_fire, sqp, wait);
		mutex_enter(&sqp->sq_lock);
		/* Check again if we still need the timeout */
		if (((sqp->sq_state & (SQS_PROC|SQS_TMO_PROG)) ==
		    SQS_TMO_PROG) && (sqp->sq_tid == 0) &&
		    (sqp->sq_first != NULL)) {
			sqp->sq_state &= ~SQS_TMO_PROG;
			sqp->sq_tid = tid;
			mutex_exit(&sqp->sq_lock);
			return;
		} else {
			if (sqp->sq_state & SQS_TMO_PROG) {
				sqp->sq_state &= ~SQS_TMO_PROG;
				mutex_exit(&sqp->sq_lock);
				(void) untimeout(tid);
			} else {
				/*
				 * The timer fired before we could
				 * reacquire the sq_lock. squeue_fire
				 * removes the SQS_TMO_PROG flag
				 * and we don't need to do anything
				 * else.
				 */
				mutex_exit(&sqp->sq_lock);
			}
		}
	}

	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
}

/*
 * squeue_enter() - enter squeue sqp with mblk mp (which can be
 * a chain), while tail points to the end and cnt is the number of
 * mblks in the chain.
 *
 * For a chain of a single packet (i.e. mp == tail), go through the
 * fast path if no one is processing the squeue and nothing is queued.
 *
 * The proc and arg for each mblk are already stored in the mblk in
 * appropriate places.
 *
 * The process_flag specifies if we are allowed to process the mblk
 * and drain in the entering thread context. If process_flag is
 * SQ_FILL, then we just queue the mblk and return (after signaling
 * the worker thread if no one else is processing the squeue).
 *
 * The ira argument can be used when the count is one.
 * For a chain the caller needs to prepend any needed mblks from
 * ip_recv_attr_to_mblk().
 */
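/*
 * Hypothetical caller-side sketch of the ip_recv_attr_t handling noted
 * above: when the attributes cannot be passed via the ira argument, a
 * caller wraps them in an mblk and prepends it, mirroring what the
 * single-packet paths below do internally. The variable names are
 * illustrative only.
 *
 *	mblk_t *attrmp = ip_recv_attr_to_mblk(ira);
 *
 *	if (attrmp != NULL) {
 *		attrmp->b_cont = mp;		-- attributes ride ahead of data
 *		attrmp->b_queue = mp->b_queue;	-- move proc and connp over
 *		mp->b_queue = NULL;
 *		attrmp->b_prev = mp->b_prev;
 *		mp->b_prev = NULL;
 *		mp = attrmp;			-- attrmp is the new head
 *	}
 */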
/* ARGSUSED */
void
squeue_enter(squeue_t *sqp, mblk_t *mp, mblk_t *tail, uint32_t cnt,
    ip_recv_attr_t *ira, int process_flag, uint8_t tag)
{
	conn_t		*connp;
	sqproc_t	proc;
	hrtime_t	now;

	ASSERT(sqp != NULL);
	ASSERT(mp != NULL);
	ASSERT(tail != NULL);
	ASSERT(cnt > 0);
	ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
	ASSERT(ira == NULL || cnt == 1);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Try to process the packet if SQ_FILL flag is not set and
	 * we are allowed to process the squeue. The SQ_NODRAIN is
	 * ignored if the packet chain consists of more than 1 packet.
	 */
	if (!(sqp->sq_state & SQS_PROC) && ((process_flag == SQ_PROCESS) ||
	    (process_flag == SQ_NODRAIN && sqp->sq_first == NULL))) {
		/*
		 * See if anything is already queued. If we are the
		 * first packet, do inline processing else queue the
		 * packet and do the drain.
		 */
		if (sqp->sq_first == NULL && cnt == 1) {
			/*
			 * Fast-path, ok to process and nothing queued.
			 */
			sqp->sq_state |= (SQS_PROC|SQS_FAST);
			sqp->sq_run = curthread;
			mutex_exit(&sqp->sq_lock);

			/*
			 * We are the chain of 1 packet so
			 * go through this fast path.
			 */
			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);
			connp = (conn_t *)mp->b_prev;
			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;
			ASSERT(proc != NULL && connp != NULL);
			ASSERT(mp->b_next == NULL);

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file
			 */
			if (connp->conn_sqp == sqp) {
				SQUEUE_DBG_SET(sqp, mp, proc, connp,
				    tag);
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				SQUEUE_DBG_CLEAR(sqp);
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}
			ASSERT(MUTEX_NOT_HELD(&sqp->sq_lock));
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~(SQS_PROC|SQS_FAST);
			sqp->sq_run = NULL;
			if (sqp->sq_first == NULL ||
			    process_flag == SQ_NODRAIN) {
				if (sqp->sq_first != NULL) {
					squeue_worker_wakeup(sqp);
					return;
				}
				/*
				 * We processed inline our packet and nothing
				 * new has arrived. We are done. In case any
				 * control actions are pending, wake up the
				 * worker.
				 */
				if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
					cv_signal(&sqp->sq_worker_cv);
				mutex_exit(&sqp->sq_lock);
				return;
			}
		} else {
			if (ira != NULL) {
				mblk_t	*attrmp;

				ASSERT(cnt == 1);
				attrmp = ip_recv_attr_to_mblk(ira);
				if (attrmp == NULL) {
					mutex_exit(&sqp->sq_lock);
					ip_drop_input("squeue: "
					    "ip_recv_attr_to_mblk",
					    mp, NULL);
					/* Caller already set b_prev/b_next */
					mp->b_prev = mp->b_next = NULL;
					freemsg(mp);
					return;
				}
				ASSERT(attrmp->b_cont == NULL);
				attrmp->b_cont = mp;
				/* Move connp and func to new */
				attrmp->b_queue = mp->b_queue;
				mp->b_queue = NULL;
				attrmp->b_prev = mp->b_prev;
				mp->b_prev = NULL;

				ASSERT(mp == tail);
				tail = mp = attrmp;
			}

			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
#ifdef DEBUG
			mp->b_tag = tag;
#endif
		}
		/*
		 * We are here because either we couldn't do inline
		 * processing (because something was already queued),
		 * or we had a chain of more than one packet,
		 * or something else arrived after we were done with
		 * inline processing.
		 */
		ASSERT(MUTEX_HELD(&sqp->sq_lock));
		ASSERT(sqp->sq_first != NULL);
		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_ENTER, now + squeue_drain_ns);

		/*
		 * If we didn't do a complete drain, the worker
		 * thread was already signalled by squeue_drain.
		 * In case any control actions are pending, wake
		 * up the worker.
		 */
		sqp->sq_run = NULL;
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
			cv_signal(&sqp->sq_worker_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	} else {
		/*
		 * We let a thread processing a squeue reenter only
		 * once. This helps the case of incoming connection
		 * where a SYN-ACK-ACK that triggers the conn_ind
		 * doesn't have to queue the packet if listener and
		 * eager are on the same squeue. Also helps the
		 * loopback connection where the two ends are bound
		 * to the same squeue (which is typical on single
		 * CPU machines).
		 *
		 * We let the thread reenter only once for the fear
		 * of stack getting blown with multiple traversal.
		 */
		connp = (conn_t *)mp->b_prev;
		if (!(sqp->sq_state & SQS_REENTER) &&
		    (process_flag != SQ_FILL) && (sqp->sq_first == NULL) &&
		    (sqp->sq_run == curthread) && (cnt == 1) &&
		    (connp->conn_on_sqp == B_FALSE)) {
			sqp->sq_state |= SQS_REENTER;
			mutex_exit(&sqp->sq_lock);

			ASSERT(mp->b_prev != NULL);
			ASSERT(mp->b_queue != NULL);

			mp->b_prev = NULL;
			proc = (sqproc_t)mp->b_queue;
			mp->b_queue = NULL;

			/*
			 * Handle squeue switching. More details in the
			 * block comment at the top of the file
			 */
			if (connp->conn_sqp == sqp) {
				connp->conn_on_sqp = B_TRUE;
				DTRACE_PROBE3(squeue__proc__start, squeue_t *,
				    sqp, mblk_t *, mp, conn_t *, connp);
				(*proc)(connp, mp, sqp, ira);
				DTRACE_PROBE2(squeue__proc__end, squeue_t *,
				    sqp, conn_t *, connp);
				connp->conn_on_sqp = B_FALSE;
				CONN_DEC_REF(connp);
			} else {
				SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc,
				    connp, ira, SQ_FILL, SQTAG_SQUEUE_CHANGE);
			}

			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_REENTER;
			mutex_exit(&sqp->sq_lock);
			return;
		}

		/*
		 * Queue is already being processed or there is already
		 * one or more packets on the queue. Enqueue the
		 * packet and wakeup the squeue worker thread if the
		 * squeue is not being processed.
		 */
#ifdef DEBUG
		mp->b_tag = tag;
#endif
		if (ira != NULL) {
			mblk_t	*attrmp;

			ASSERT(cnt == 1);
			attrmp = ip_recv_attr_to_mblk(ira);
			if (attrmp == NULL) {
				mutex_exit(&sqp->sq_lock);
				ip_drop_input("squeue: ip_recv_attr_to_mblk",
				    mp, NULL);
				/* Caller already set b_prev/b_next */
				mp->b_prev = mp->b_next = NULL;
				freemsg(mp);
				return;
			}
			ASSERT(attrmp->b_cont == NULL);
			attrmp->b_cont = mp;
			/* Move connp and func to new */
			attrmp->b_queue = mp->b_queue;
			mp->b_queue = NULL;
			attrmp->b_prev = mp->b_prev;
			mp->b_prev = NULL;

			ASSERT(mp == tail);
			tail = mp = attrmp;
		}
		ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		if (!(sqp->sq_state & SQS_PROC)) {
			squeue_worker_wakeup(sqp);
			return;
		}
		/*
		 * In case any control actions are pending, wake
		 * up the worker.
		 */
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
			cv_signal(&sqp->sq_worker_cv);
		mutex_exit(&sqp->sq_lock);
		return;
	}
}

/*
 * PRIVATE FUNCTIONS
 */

static void
squeue_fire(void *arg)
{
	squeue_t	*sqp = arg;
	uint_t		state;

	mutex_enter(&sqp->sq_lock);

	state = sqp->sq_state;
	if (sqp->sq_tid == 0 && !(state & SQS_TMO_PROG)) {
		mutex_exit(&sqp->sq_lock);
		return;
	}

	sqp->sq_tid = 0;
	/*
	 * The timeout fired before we got a chance to set it.
	 * Process it anyway but remove the SQS_TMO_PROG so that
	 * the thread trying to set the timeout knows that it has
	 * already been processed.
	 */
	if (state & SQS_TMO_PROG)
		sqp->sq_state &= ~SQS_TMO_PROG;

	if (!(state & SQS_PROC)) {
		sqp->sq_awaken = ddi_get_lbolt();
		cv_signal(&sqp->sq_worker_cv);
	}
	mutex_exit(&sqp->sq_lock);
}

static void
squeue_drain(squeue_t *sqp, uint_t proc_type, hrtime_t expire)
{
	mblk_t		*mp;
	mblk_t		*head;
	sqproc_t	proc;
	conn_t		*connp;
	timeout_id_t	tid;
	ill_rx_ring_t	*sq_rx_ring = sqp->sq_rx_ring;
	hrtime_t	now;
	boolean_t	did_wakeup = B_FALSE;
	boolean_t	sq_poll_capable;
	ip_recv_attr_t	*ira, iras;

	sq_poll_capable = (sqp->sq_state & SQS_POLL_CAPAB) != 0;
again:
	ASSERT(mutex_owned(&sqp->sq_lock));
	ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
	    SQS_POLL_QUIESCE_DONE)));

	head = sqp->sq_first;
	sqp->sq_first = NULL;
	sqp->sq_last = NULL;
	sqp->sq_count = 0;

	if ((tid = sqp->sq_tid) != 0)
		sqp->sq_tid = 0;

	sqp->sq_state |= SQS_PROC | proc_type;

	/*
	 * We have backlog built up. Switch to polling mode if the
	 * device underneath allows it. Need to do it so that
	 * more packets don't come in and disturb us (by contending
	 * for sq_lock or higher priority thread preempting us).
	 *
	 * The worker thread is allowed to do active polling while we
	 * just disable the interrupts for drain by non worker (kernel
	 * or userland) threads so they can peacefully process the
	 * packets during time allocated to them.
	 */
	SQS_POLLING_ON(sqp, sq_poll_capable, sq_rx_ring);
	mutex_exit(&sqp->sq_lock);

	if (tid != 0)
		(void) untimeout(tid);

	while ((mp = head) != NULL) {

		head = mp->b_next;
		mp->b_next = NULL;

		proc = (sqproc_t)mp->b_queue;
		mp->b_queue = NULL;
		connp = (conn_t *)mp->b_prev;
		mp->b_prev = NULL;

		/* Is there an ip_recv_attr_t to handle? */
		if (ip_recv_attr_is_mblk(mp)) {
			mblk_t	*attrmp = mp;

			ASSERT(attrmp->b_cont != NULL);

			mp = attrmp->b_cont;
			attrmp->b_cont = NULL;
			ASSERT(mp->b_queue == NULL);
			ASSERT(mp->b_prev == NULL);

			if (!ip_recv_attr_from_mblk(attrmp, &iras)) {
				/* The ill or ip_stack_t disappeared on us */
				ip_drop_input("ip_recv_attr_from_mblk",
				    mp, NULL);
				ira_cleanup(&iras, B_TRUE);
				CONN_DEC_REF(connp);
				continue;
			}
			ira = &iras;
		} else {
			ira = NULL;
		}

		/*
		 * Handle squeue switching. More details in the
		 * block comment at the top of the file
		 */
		if (connp->conn_sqp == sqp) {
			SQUEUE_DBG_SET(sqp, mp, proc, connp,
			    mp->b_tag);
			connp->conn_on_sqp = B_TRUE;
			DTRACE_PROBE3(squeue__proc__start, squeue_t *,
			    sqp, mblk_t *, mp, conn_t *, connp);
			(*proc)(connp, mp, sqp, ira);
			DTRACE_PROBE2(squeue__proc__end, squeue_t *,
			    sqp, conn_t *, connp);
			connp->conn_on_sqp = B_FALSE;
			CONN_DEC_REF(connp);
		} else {
			SQUEUE_ENTER_ONE(connp->conn_sqp, mp, proc, connp, ira,
			    SQ_FILL, SQTAG_SQUEUE_CHANGE);
		}
		if (ira != NULL)
			ira_cleanup(ira, B_TRUE);
	}

	SQUEUE_DBG_CLEAR(sqp);

	mutex_enter(&sqp->sq_lock);

	/*
	 * Check if there is still work to do (either more arrived or timer
	 * expired). If we are the worker thread and we are polling capable,
	 * continue doing the work since no one else is around to do the
	 * work anyway (but signal the poll thread to retrieve some packets
	 * in the meanwhile). If we are not the worker thread, just
	 * signal the worker thread to take up the work if processing time
	 * has expired.
	 */
	if (sqp->sq_first != NULL) {
		/*
		 * Still more to process. If time quanta not expired, we
		 * should let the drain go on. The worker thread is allowed
		 * to drain as long as there is anything left.
		 */
		now = gethrtime();
		if ((now < expire) || (proc_type == SQS_WORKER)) {
			/*
			 * If time not expired or we are worker thread and
			 * this squeue is polling capable, continue to do
			 * the drain.
			 *
			 * We turn off interrupts for all userland threads
			 * doing drain but we do active polling only for
			 * worker thread.
			 *
			 * Calling SQS_POLL_RING() even in the case of
			 * SQS_POLLING_ON() not succeeding is ok as
			 * SQS_POLL_RING() will not wake up poll thread
			 * if SQS_POLLING bit is not set.
			 */
			if (proc_type == SQS_WORKER)
				SQS_POLL_RING(sqp);
			goto again;
		} else {
			did_wakeup = B_TRUE;
			sqp->sq_awaken = ddi_get_lbolt();
			cv_signal(&sqp->sq_worker_cv);
		}
	}

	/*
	 * If the poll thread is already running, just return. The
	 * poll thread continues to hold the proc and will finish
	 * processing.
	 */
	if (sqp->sq_state & SQS_GET_PKTS) {
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		sqp->sq_state &= ~proc_type;
		return;
	}

	/*
	 * If we are the worker thread and no work is left, send the poll
	 * thread down once more to see if something arrived. Otherwise,
	 * turn the interrupts back on and we are done.
	 */
	if ((proc_type == SQS_WORKER) && (sqp->sq_state & SQS_POLLING)) {
		/*
		 * Do one last check to see if anything arrived
		 * in the NIC. We leave the SQS_PROC set to ensure
		 * that poll thread keeps the PROC and can decide
		 * if it needs to turn polling off or continue
		 * processing.
		 *
		 * If we drop the SQS_PROC here and poll thread comes
		 * up empty handed, it can not safely turn polling off
		 * since someone else could have acquired the PROC
		 * and started draining. The previously running poll
		 * thread and the current thread doing drain would end
		 * up in a race for turning polling on/off and more
		 * complex code would be required to deal with it.
		 *
		 * It is a lot simpler for drain to hand the SQS_PROC to
		 * poll thread (if running) and let poll thread finish
		 * without worrying about racing with any other thread.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLL_RING(sqp);
		sqp->sq_state &= ~proc_type;
	} else {
		/*
		 * The squeue is either not capable of polling or the
		 * attempt to blank (i.e., turn SQS_POLLING_ON()) was
		 * unsuccessful or poll thread already finished
		 * processing and didn't find anything. Since there
		 * is nothing queued and we already turned polling on
		 * (for all threads doing drain), we should turn
		 * polling off and relinquish the PROC.
		 */
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE)));
		SQS_POLLING_OFF(sqp, sq_poll_capable, sq_rx_ring);
		sqp->sq_state &= ~(SQS_PROC | proc_type);
		if (!did_wakeup && sqp->sq_first != NULL) {
			squeue_worker_wakeup(sqp);
			mutex_enter(&sqp->sq_lock);
		}
		/*
		 * If we are not the worker and there is a pending quiesce
		 * event, wake up the worker
		 */
		if ((proc_type != SQS_WORKER) &&
		    (sqp->sq_state & SQS_WORKER_THR_CONTROL))
			cv_signal(&sqp->sq_worker_cv);
	}
}

/*
 * Quiesce, Restart, or Cleanup of the squeue poll thread.
 *
 * Quiesce and Restart: After a squeue poll thread has been quiesced, it does
 * not attempt to poll the underlying soft ring any more. The quiesce is
 * triggered by the mac layer when it wants to quiesce a soft ring. Typically
 * control operations such as changing the fanout of a NIC or VNIC (dladm
 * setlinkprop) need to quiesce data flow before changing the wiring.
 * The operation is done by the mac layer, but it calls back into IP to
 * quiesce the soft ring. After completing the operation (say increase or
 * decrease of the fanout) the mac layer then calls back into IP to restart
 * the quiesced soft ring.
 *
 * Cleanup: This is triggered when the squeue binding to a soft ring is
 * removed permanently. Typically interface plumb and unplumb would trigger
 * this. It can also be triggered from the mac layer when a soft ring is
 * being deleted, say as the result of a fanout reduction. Since squeues are
 * never deleted, the cleanup marks the squeue as fit for recycling and
 * moves it to the zeroth squeue set.
 */
static void
squeue_poll_thr_control(squeue_t *sqp)
{
	if (sqp->sq_state & SQS_POLL_THR_RESTART) {
		/* Restart implies a previous quiesce */
		ASSERT(sqp->sq_state & SQS_POLL_THR_QUIESCED);
		sqp->sq_state &= ~(SQS_POLL_THR_QUIESCED |
		    SQS_POLL_THR_RESTART);
		sqp->sq_state |= SQS_POLL_CAPAB;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}

	if (sqp->sq_state & SQS_POLL_THR_QUIESCE) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCED;
		sqp->sq_state &= ~SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_worker_cv);
		return;
	}
}

/*
 * POLLING Notes
 *
 * With polling mode, we want to do as much processing as we possibly can
 * in worker thread context. The sweet spot is the worker thread keeps doing
 * work all the time in polling mode and writers etc. keep dumping packets
 * to the worker thread. Occasionally, we send the poll thread (running at
 * lower priority) down to the NIC to get the chain of packets to feed to
 * the worker. Sending the poll thread down to the NIC is dependent on
 * three criteria:
 *
 * 1) It is always driven from squeue_drain and only if the worker thread
 *	is doing the drain.
 * 2) We clear the backlog once and more packets arrived in between.
 *	Before starting drain again, send the poll thread down if
 *	the drain is being done by the worker thread.
 * 3) Before exiting the squeue_drain, if the poll thread is not already
 *	working and we are the worker thread, try to poll one more time.
 *
 * For latency's sake, we do allow any thread calling squeue_enter
 * to process its packet provided:
 *
 * 1) Nothing is queued
 * 2) If more packets arrived in between, the non-worker threads are allowed
 *	to do the drain till their time quanta expired provided SQS_GET_PKTS
 *	wasn't set in between.
 *
 * Avoiding deadlocks with interrupts
 * ==================================
 *
 * One of the big problems is that we can't send the poll thread down while
 * holding the sq_lock since the thread can block. So we drop the sq_lock
 * before calling sq_get_pkts(). We keep holding the SQS_PROC as long as the
 * poll thread is running so that no other thread can acquire the
 * perimeter in between. If the squeue_drain gets done (no more work
 * left), it leaves the SQS_PROC set if the poll thread is running.
 */
/*
 * This is the squeue poll thread. In poll mode, it polls the underlying
 * TCP softring and feeds packets into the squeue. The worker thread then
 * drains the squeue. The poll thread also responds to control signals for
 * quiescing, restarting, or cleanup of a squeue. These are driven by
 * control operations like plumb/unplumb or as a result of dynamic Rx ring
 * related operations that are driven from the mac layer.
 */
static void
squeue_polling_thread(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_poll_cv;
	ip_mac_rx_t sq_get_pkts;
	ip_accept_t ip_accept;
	ill_rx_ring_t *sq_rx_ring;
	ill_t *sq_ill;
	mblk_t *head, *tail, *mp;
	uint_t cnt;
	void *sq_mac_handle;
	callb_cpr_t cprinfo;
	size_t bytes_to_pickup;
	uint32_t ctl_state;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_poll");
	mutex_enter(lock);

	for (;;) {
		CALLB_CPR_SAFE_BEGIN(&cprinfo);
		cv_wait(async, lock);
		CALLB_CPR_SAFE_END(&cprinfo, lock);

		ctl_state = sqp->sq_state & (SQS_POLL_THR_CONTROL |
		    SQS_POLL_THR_QUIESCED);
		if (ctl_state != 0) {
			/*
			 * If the squeue is quiesced, then wait for a control
			 * request. A quiesced squeue must not poll the
			 * underlying soft ring.
			 */
			if (ctl_state == SQS_POLL_THR_QUIESCED)
				continue;
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart a squeue
			 */
			squeue_poll_thr_control(sqp);
			continue;
		}

		if (!(sqp->sq_state & SQS_POLL_CAPAB))
			continue;

		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

poll_again:
		sq_rx_ring = sqp->sq_rx_ring;
		sq_get_pkts = sq_rx_ring->rr_rx;
		sq_mac_handle = sq_rx_ring->rr_rx_handle;
		ip_accept = sq_rx_ring->rr_ip_accept;
		sq_ill = sq_rx_ring->rr_ill;
		bytes_to_pickup = MAX_BYTES_TO_PICKUP;
		mutex_exit(lock);
		head = sq_get_pkts(sq_mac_handle, bytes_to_pickup);
		mp = NULL;
		if (head != NULL) {
			/*
			 * We got the packet chain from the mac layer. It
			 * would be nice to be able to process it inline
			 * for better performance but we need to give
			 * IP a chance to look at this chain to ensure
			 * that packets are really meant for this squeue
			 * and do the IP processing.
			 */
			mp = ip_accept(sq_ill, sq_rx_ring, sqp, head,
			    &tail, &cnt);
		}
		mutex_enter(lock);
		if (mp != NULL) {
			/*
			 * The ip_accept function has already added an
			 * ip_recv_attr_t mblk if that is needed.
			 */
			ENQUEUE_CHAIN(sqp, mp, tail, cnt);
		}
		ASSERT((sqp->sq_state &
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS)) ==
		    (SQS_PROC|SQS_POLLING|SQS_GET_PKTS));

		if (sqp->sq_first != NULL && !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * We have packets to process and worker thread
			 * is not running. Check to see if poll thread is
			 * allowed to process. Let it do processing only if it
			 * picked up some packets from the NIC otherwise
			 * wakeup the worker thread.
			 */
			if (mp != NULL) {
				hrtime_t  now;

				now = gethrtime();
				sqp->sq_run = curthread;
				sqp->sq_drain(sqp, SQS_POLL_PROC, now +
				    squeue_drain_ns);
				sqp->sq_run = NULL;

				if (sqp->sq_first == NULL)
					goto poll_again;

				/*
				 * Couldn't do the entire drain because the
				 * time limit expired, let the
				 * worker thread take over.
				 */
			}

			sqp->sq_awaken = ddi_get_lbolt();
			/*
			 * Put the SQS_PROC_HELD on so the worker
			 * thread can distinguish where it's called from. We
			 * can remove the SQS_PROC flag here and turn off the
			 * polling so that it wouldn't matter who gets the
			 * processing but we get better performance this way
			 * and save the cost of turning polling off and
			 * possibly on again as soon as we start draining
			 * again.
			 *
			 * We can't remove the SQS_PROC flag without turning
			 * polling off until we can guarantee that control
			 * will return to squeue_drain immediately.
			 */
			sqp->sq_state |= SQS_PROC_HELD;
			sqp->sq_state &= ~SQS_GET_PKTS;
			cv_signal(&sqp->sq_worker_cv);
		} else if (sqp->sq_first == NULL &&
		    !(sqp->sq_state & SQS_WORKER)) {
			/*
			 * Nothing queued and worker thread not running.
			 * Since we hold the proc, no other thread is
			 * processing the squeue. This means that there
			 * is no work to be done and nothing is queued
			 * in squeue or in NIC. Turn polling off and go
			 * back to interrupt mode.
			 */
			sqp->sq_state &= ~(SQS_PROC|SQS_GET_PKTS);
			/* LINTED: constant in conditional context */
			SQS_POLLING_OFF(sqp, B_TRUE, sq_rx_ring);

			/*
			 * If there is a pending control operation
			 * wake up the worker, since it is currently
			 * not running.
			 */
			if (sqp->sq_state & SQS_WORKER_THR_CONTROL)
				cv_signal(&sqp->sq_worker_cv);
		} else {
			/*
			 * Worker thread is already running. We don't need
			 * to do anything. Indicate that poll thread is done.
			 */
			sqp->sq_state &= ~SQS_GET_PKTS;
		}
		if (sqp->sq_state & SQS_POLL_THR_CONTROL) {
			/*
			 * Act on control requests to quiesce, cleanup or
			 * restart a squeue
			 */
			squeue_poll_thr_control(sqp);
		}
	}
}

/*
 * The squeue worker thread acts on any control requests to quiesce, cleanup
 * or restart an ill_rx_ring_t by calling this function. The worker thread
 * synchronizes with the squeue poll thread to complete the request and finally
 * wakes up the requestor when the request is completed.
 */
static void
squeue_worker_thr_control(squeue_t *sqp)
{
	ill_t		*ill;
	ill_rx_ring_t	*rx_ring;

	ASSERT(MUTEX_HELD(&sqp->sq_lock));

	if (sqp->sq_state & SQS_POLL_RESTART) {
		/* Restart implies a previous quiesce. */
		ASSERT((sqp->sq_state & (SQS_PROC_HELD |
		    SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER)) ==
		    (SQS_POLL_QUIESCE_DONE | SQS_PROC | SQS_WORKER));
		/*
		 * Request the squeue poll thread to restart and wait till
		 * it actually restarts.
		 */
		sqp->sq_state &= ~SQS_POLL_QUIESCE_DONE;
		sqp->sq_state |= SQS_POLL_THR_RESTART;
		cv_signal(&sqp->sq_poll_cv);
		while (sqp->sq_state & SQS_POLL_THR_QUIESCED)
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_POLL_RESTART | SQS_PROC |
		    SQS_WORKER);
		/*
		 * Signal any waiter that is waiting for the restart
		 * to complete
		 */
		sqp->sq_state |= SQS_POLL_RESTART_DONE;
		cv_signal(&sqp->sq_ctrlop_done_cv);
		return;
	}

	if (sqp->sq_state & SQS_PROC_HELD) {
		/* The squeue poll thread handed control to us */
		ASSERT(sqp->sq_state & SQS_PROC);
	}

	/*
	 * Prevent any other thread from processing the squeue
	 * until we finish the control actions by setting SQS_PROC.
	 * But allow ourself to reenter by setting SQS_WORKER
	 */
	sqp->sq_state |= (SQS_PROC | SQS_WORKER);

	/* Signal the squeue poll thread and wait for it to quiesce itself */
	if (!(sqp->sq_state & SQS_POLL_THR_QUIESCED)) {
		sqp->sq_state |= SQS_POLL_THR_QUIESCE;
		cv_signal(&sqp->sq_poll_cv);
		while (!(sqp->sq_state & SQS_POLL_THR_QUIESCED))
			cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
	}

	rx_ring = sqp->sq_rx_ring;
	ill = rx_ring->rr_ill;
	/*
	 * The lock hierarchy is as follows.
	 * cpu_lock -> ill_lock -> sqset_lock -> sq_lock
	 */
	mutex_exit(&sqp->sq_lock);
	mutex_enter(&ill->ill_lock);
	mutex_enter(&sqp->sq_lock);

	SQS_POLLING_OFF(sqp, (sqp->sq_state & SQS_POLL_CAPAB) != 0,
	    sqp->sq_rx_ring);
	sqp->sq_state &= ~(SQS_POLL_CAPAB | SQS_GET_PKTS | SQS_PROC_HELD);
	if (sqp->sq_state & SQS_POLL_CLEANUP) {
		/*
		 * Disassociate this squeue from its ill_rx_ring_t.
		 * The rr_sqp, sq_rx_ring fields are protected by the
		 * corresponding squeue, ill_lock* and sq_lock. Holding any
		 * of them will ensure that the ring to squeue mapping does
		 * not change.
		 */
		ASSERT(!(sqp->sq_state & SQS_DEFAULT));

		sqp->sq_rx_ring = NULL;
		rx_ring->rr_sqp = NULL;

		sqp->sq_state &= ~(SQS_POLL_CLEANUP | SQS_POLL_THR_QUIESCED |
		    SQS_POLL_QUIESCE_DONE);
		sqp->sq_ill = NULL;

		rx_ring->rr_rx_handle = NULL;
		rx_ring->rr_intr_handle = NULL;
		rx_ring->rr_intr_enable = NULL;
		rx_ring->rr_intr_disable = NULL;
		sqp->sq_state |= SQS_POLL_CLEANUP_DONE;
	} else {
		sqp->sq_state &= ~SQS_POLL_QUIESCE;
		sqp->sq_state |= SQS_POLL_QUIESCE_DONE;
	}
	/*
	 * Signal any waiter that is waiting for the quiesce or cleanup
	 * to complete and also wait for it to actually see and reset the
	 * SQS_POLL_CLEANUP_DONE.
	 */
	cv_signal(&sqp->sq_ctrlop_done_cv);
	mutex_exit(&ill->ill_lock);
	if (sqp->sq_state & SQS_POLL_CLEANUP_DONE) {
		cv_wait(&sqp->sq_worker_cv, &sqp->sq_lock);
		sqp->sq_state &= ~(SQS_PROC | SQS_WORKER);
	}
}

static void
squeue_worker(squeue_t *sqp)
{
	kmutex_t *lock = &sqp->sq_lock;
	kcondvar_t *async = &sqp->sq_worker_cv;
	callb_cpr_t cprinfo;
	hrtime_t now;

	CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, "sq_worker");
	mutex_enter(lock);

	for (;;) {
		for (;;) {
			/*
			 * If the poll thread has handed control to us
			 * we need to break out of the wait.
			 */
			if (sqp->sq_state & SQS_PROC_HELD)
				break;

			/*
			 * If the squeue is not being processed and we either
			 * have messages to drain or some thread has signaled
			 * some control activity we need to break
			 */
			if (!(sqp->sq_state & SQS_PROC) &&
			    ((sqp->sq_state & SQS_WORKER_THR_CONTROL) ||
			    (sqp->sq_first != NULL)))
				break;

			/*
			 * If we have started some control action, then check
			 * for the SQS_WORKER flag (since we don't
			 * release the squeue) to make sure we own the squeue
			 * and break out
			 */
			if ((sqp->sq_state & SQS_WORKER_THR_CONTROL) &&
			    (sqp->sq_state & SQS_WORKER))
				break;

			CALLB_CPR_SAFE_BEGIN(&cprinfo);
			cv_wait(async, lock);
			CALLB_CPR_SAFE_END(&cprinfo, lock);
		}
		if (sqp->sq_state & SQS_WORKER_THR_CONTROL) {
			squeue_worker_thr_control(sqp);
			continue;
		}
		ASSERT(!(sqp->sq_state & (SQS_POLL_THR_QUIESCED |
		    SQS_POLL_CLEANUP_DONE | SQS_POLL_QUIESCE_DONE |
		    SQS_WORKER_THR_CONTROL | SQS_POLL_THR_CONTROL)));

		if (sqp->sq_state & SQS_PROC_HELD)
			sqp->sq_state &= ~SQS_PROC_HELD;

		now = gethrtime();
		sqp->sq_run = curthread;
		sqp->sq_drain(sqp, SQS_WORKER, now + squeue_drain_ns);
		sqp->sq_run = NULL;
	}
}

uintptr_t *
squeue_getprivate(squeue_t *sqp, sqprivate_t p)
{
	ASSERT(p < SQPRIVATE_MAX);

	return (&sqp->sq_private[p]);
}

/* ARGSUSED */
void
squeue_wakeup_conn(void *arg, mblk_t *mp, void *arg2, ip_recv_attr_t *dummy)
{
	conn_t *connp = (conn_t *)arg;
	squeue_t *sqp = connp->conn_sqp;

	/*
	 * Mark the squeue as paused before waking up the thread stuck
	 * in squeue_synch_enter().
	 */
	mutex_enter(&sqp->sq_lock);
	sqp->sq_state |= SQS_PAUSE;

	/*
	 * Notify the thread that it's OK to proceed; that is done by
	 * clearing the MSGWAITSYNC flag. The synch thread will free the mblk.
	 */
	ASSERT(mp->b_flag & MSGWAITSYNC);
	mp->b_flag &= ~MSGWAITSYNC;
	cv_broadcast(&connp->conn_sq_cv);

	/*
	 * We are doing something on behalf of another thread, so we have to
	 * pause and wait until it finishes.
	 */
	while (sqp->sq_state & SQS_PAUSE) {
		cv_wait(&sqp->sq_synch_cv, &sqp->sq_lock);
	}
	mutex_exit(&sqp->sq_lock);
}

int
squeue_synch_enter(conn_t *connp, mblk_t *use_mp)
{
	squeue_t *sqp;

again:
	sqp = connp->conn_sqp;

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_first == NULL && !(sqp->sq_state & SQS_PROC)) {
		/*
		 * We are OK to proceed if the squeue is empty, and
		 * no one owns the squeue.
		 *
		 * The caller won't own the squeue as this is called from the
		 * application.
		 */
		ASSERT(sqp->sq_run == NULL);

		sqp->sq_state |= SQS_PROC;
		sqp->sq_run = curthread;
		mutex_exit(&sqp->sq_lock);

		/*
		 * Handle squeue switching. The conn's squeue can only change
		 * while there is a thread in the squeue, which is why we do
		 * the check after entering the squeue. If it has changed, exit
		 * this squeue and redo everything with the new squeue.
		 */
		if (sqp != connp->conn_sqp) {
			mutex_enter(&sqp->sq_lock);
			sqp->sq_state &= ~SQS_PROC;
			sqp->sq_run = NULL;
			mutex_exit(&sqp->sq_lock);
			goto again;
		}
#if SQUEUE_DEBUG
		sqp->sq_curmp = NULL;
		sqp->sq_curproc = NULL;
		sqp->sq_connp = connp;
#endif
		connp->conn_on_sqp = B_TRUE;
		return (0);
	} else {
		mblk_t  *mp;

		mp = (use_mp == NULL) ? allocb(0, BPRI_MED) : use_mp;
		if (mp == NULL) {
			mutex_exit(&sqp->sq_lock);
			return (ENOMEM);
		}

		/*
		 * We mark the mblk as awaiting synchronous squeue access
		 * by setting the MSGWAITSYNC flag. Once squeue_wakeup_conn
		 * fires, MSGWAITSYNC is cleared, at which point we know we
		 * have exclusive access.
		 */
		mp->b_flag |= MSGWAITSYNC;

		CONN_INC_REF(connp);
		SET_SQUEUE(mp, squeue_wakeup_conn, connp);
		ENQUEUE_CHAIN(sqp, mp, mp, 1);

		ASSERT(sqp->sq_run != curthread);

		/* Wait until the enqueued mblk gets processed. */
		while (mp->b_flag & MSGWAITSYNC)
			cv_wait(&connp->conn_sq_cv, &sqp->sq_lock);
		mutex_exit(&sqp->sq_lock);

		if (use_mp == NULL)
			freeb(mp);

		return (0);
	}
}

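/*
 * Illustrative (hypothetical) pairing of the synchronous entry points:
 * a thread in application context that does not own the squeue (see the
 * comment in squeue_synch_enter() above) gains exclusive, squeue-protected
 * access to the conn and must release it with squeue_synch_exit().
 *
 *	if (squeue_synch_enter(connp, NULL) == 0) {
 *		-- squeue-protected conn state may be examined or
 *		-- modified here; no other thread is processing the squeue
 *		squeue_synch_exit(connp);
 *	}
 */
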
void
squeue_synch_exit(conn_t *connp)
{
	squeue_t *sqp = connp->conn_sqp;

	mutex_enter(&sqp->sq_lock);
	if (sqp->sq_run == curthread) {
		ASSERT(sqp->sq_state & SQS_PROC);

		sqp->sq_state &= ~SQS_PROC;
		sqp->sq_run = NULL;
		connp->conn_on_sqp = B_FALSE;

		if (sqp->sq_first == NULL) {
			mutex_exit(&sqp->sq_lock);
		} else {
			/*
			 * If this was a normal thread, then it would
			 * (most likely) continue processing the pending
			 * requests. Since the just completed operation
			 * was executed synchronously, the thread should
			 * not be delayed. To compensate, wake up the
			 * worker thread right away when there are outstanding
			 * requests.
			 */
			sqp->sq_awaken = ddi_get_lbolt();
			cv_signal(&sqp->sq_worker_cv);
			mutex_exit(&sqp->sq_lock);
		}
	} else {
		/*
		 * The caller doesn't own the squeue, clear the SQS_PAUSE flag,
		 * and wake up the squeue owner, such that owner can continue
		 * processing.
		 */
		ASSERT(sqp->sq_state & SQS_PAUSE);
		sqp->sq_state &= ~SQS_PAUSE;

		/* There should be only one thread blocking on sq_synch_cv. */
		cv_signal(&sqp->sq_synch_cv);
		mutex_exit(&sqp->sq_lock);
	}
}