5042 stop using deprecated atomic functions
[unleashed.git] / usr / src / uts / common / io / stream.c
blobe9af19ca182c879ecdea46fa8aadba619ddabe8b
1 /*
2 * CDDL HEADER START
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
19 * CDDL HEADER END
21 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
22 /* All Rights Reserved */
25 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
26 * Use is subject to license terms.
29 #include <sys/types.h>
30 #include <sys/param.h>
31 #include <sys/thread.h>
32 #include <sys/sysmacros.h>
33 #include <sys/stropts.h>
34 #include <sys/stream.h>
35 #include <sys/strsubr.h>
36 #include <sys/strsun.h>
37 #include <sys/conf.h>
38 #include <sys/debug.h>
39 #include <sys/cmn_err.h>
40 #include <sys/kmem.h>
41 #include <sys/atomic.h>
42 #include <sys/errno.h>
43 #include <sys/vtrace.h>
44 #include <sys/ftrace.h>
45 #include <sys/ontrap.h>
46 #include <sys/multidata.h>
47 #include <sys/multidata_impl.h>
48 #include <sys/sdt.h>
49 #include <sys/strft.h>
51 #ifdef DEBUG
52 #include <sys/kmem_impl.h>
53 #endif
56 * This file contains all the STREAMS utility routines that may
57 * be used by modules and drivers.
61 * STREAMS message allocator: principles of operation
63 * The streams message allocator consists of all the routines that
64 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
65 * dupb(), freeb() and freemsg(). What follows is a high-level view
66 * of how the allocator works.
68 * Every streams message consists of one or more mblks, a dblk, and data.
69 * All mblks for all types of messages come from a common mblk_cache.
70 * The dblk and data come in several flavors, depending on how the
71 * message is allocated:
73 * (1) Messages up to DBLK_MAX_CACHE bytes are allocated from a collection of
74 * fixed-size dblk/data caches. For message sizes that are multiples of
75 * PAGESIZE, dblks are allocated separately from the buffer.
76 * The associated buffer is allocated by the constructor using kmem_alloc().
77 * For all other message sizes, the dblk and its associated data are allocated
78 * as a single contiguous chunk of memory.
79 * Objects in these caches consist of a dblk plus its associated data.
80 * allocb() determines the nearest-size cache by table lookup:
81 * the dblk_cache[] array provides the mapping from size to dblk cache.
83 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
84 * kmem_alloc()'ing a buffer for the data and supplying that
85 * buffer to gesballoc(), described below.
87 * (3) The four flavors of [d]esballoc[a] are all implemented by a
88 * common routine, gesballoc() ("generic esballoc"). gesballoc()
89 * allocates a dblk from the global dblk_esb_cache and sets db_base,
90 * db_lim and db_frtnp to describe the caller-supplied buffer.
92 * While there are several routines to allocate messages, there is only
93 * one routine to free messages: freeb(). freeb() simply invokes the
94 * dblk's free method, dbp->db_free(), which is set at allocation time.
96 * dupb() creates a new reference to a message by allocating a new mblk,
97 * incrementing the dblk reference count and setting the dblk's free
98 * method to dblk_decref(). The dblk's original free method is retained
99 * in db_lastfree. dblk_decref() decrements the reference count on each
100 * freeb(). If this is not the last reference it just frees the mblk;
101 * if this *is* the last reference, it restores db_free to db_lastfree,
102 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
104 * The implementation makes aggressive use of kmem object caching for
105 * maximum performance. This makes the code simple and compact, but
106 * also a bit abstruse in some places. The invariants that constitute a
107 * message's constructed state, described below, are more subtle than usual.
109 * Every dblk has an "attached mblk" as part of its constructed state.
110 * The mblk is allocated by the dblk's constructor and remains attached
111 * until the message is either dup'ed or pulled up. In the dupb() case
112 * the mblk association doesn't matter until the last free, at which time
113 * dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects
114 * the mblk association because it swaps the leading mblks of two messages,
115 * so it is responsible for swapping their db_mblk pointers accordingly.
116 * From a constructed-state viewpoint it doesn't matter that a dblk's
117 * attached mblk can change while the message is allocated; all that
118 * matters is that the dblk has *some* attached mblk when it's freed.
120 * The sizes of the allocb() small-message caches are not magical.
121 * They represent a good trade-off between internal and external
122 * fragmentation for current workloads. They should be reevaluated
123 * periodically, especially if allocations larger than DBLK_MAX_CACHE
124 * become common. We use 64-byte alignment so that dblks don't
125 * straddle cache lines unnecessarily.
/*
 * Fixed-size dblk cache parameters.  allocb() serves requests of up to
 * DBLK_MAX_CACHE bytes from the caches built by streams_msg_init() and
 * indexes dblk_cache[] by (size - 1) >> DBLK_SIZE_SHIFT, i.e. in steps of
 * DBLK_MIN_SIZE (1 << DBLK_SIZE_SHIFT) bytes.  Larger requests are handed
 * to allocb_oversize().
 */
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

/*
 * db_ref, db_type, db_flags and db_struioflag are accessed together as one
 * 32-bit "RTFU" word starting at db_ref (see DBLK_RTFU_WORD).  This lets
 * allocation set them with a single store, and lets dupb() and
 * dblk_decref() update them with 32-bit atomics.
 *
 * DBLK_RTFU_SHIFT() yields the bit position of 'field' within that word
 * for the host byte order.  It assumes each of these fields is one byte.
 */
#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

/*
 * Build an RTFU word from its four components.
 *
 * Besides 'flags', the db_flags byte also records ref - 1.  dblk_decref()
 * compares the decremented reference count against
 * (db_flags & DBLK_REFMIN) to recognize the last reference.
 *
 * Every use of 'ref' is parenthesized so that expression arguments expand
 * correctly.  Note that 'ref' is evaluated twice.
 */
#define	DBLK_RTFU(ref, type, flags, uioflag)			\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) |			\
	((type) << DBLK_RTFU_SHIFT(db_type)) |			\
	(((flags) | ((ref) - 1)) << DBLK_RTFU_SHIFT(db_flags)) |	\
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))

/* DBLK_REFMAX positioned at db_ref; dupb() uses it to detect a maxed-out db_ref. */
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))

/* The dblk RTFU word, as a uint32_t lvalue. */
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))

/* The 32-bit word starting at an mblk's b_band, as a uint32_t lvalue. */
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
149 static size_t dblk_sizes[] = {
150 #ifdef _LP64
151 16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
152 8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
153 40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
154 #else
155 64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
156 8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
157 40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
158 #endif
159 DBLK_MAX_CACHE, 0
162 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
163 static struct kmem_cache *mblk_cache;
164 static struct kmem_cache *dblk_esb_cache;
165 static struct kmem_cache *fthdr_cache;
166 static struct kmem_cache *ftblk_cache;
168 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
169 static mblk_t *allocb_oversize(size_t size, int flags);
170 static int allocb_tryhard_fails;
171 static void frnop_func(void *arg);
172 frtn_t frnop = { frnop_func };
173 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
175 static boolean_t rwnext_enter(queue_t *qp);
176 static void rwnext_exit(queue_t *qp);
179 * Patchable mblk/dblk kmem_cache flags.
181 int dblk_kmem_flags = 0;
182 int mblk_kmem_flags = 0;
184 static int
185 dblk_constructor(void *buf, void *cdrarg, int kmflags)
187 dblk_t *dbp = buf;
188 ssize_t msg_size = (ssize_t)cdrarg;
189 size_t index;
191 ASSERT(msg_size != 0);
193 index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
195 ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
197 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
198 return (-1);
199 if ((msg_size & PAGEOFFSET) == 0) {
200 dbp->db_base = kmem_alloc(msg_size, kmflags);
201 if (dbp->db_base == NULL) {
202 kmem_cache_free(mblk_cache, dbp->db_mblk);
203 return (-1);
205 } else {
206 dbp->db_base = (unsigned char *)&dbp[1];
209 dbp->db_mblk->b_datap = dbp;
210 dbp->db_cache = dblk_cache[index];
211 dbp->db_lim = dbp->db_base + msg_size;
212 dbp->db_free = dbp->db_lastfree = dblk_lastfree;
213 dbp->db_frtnp = NULL;
214 dbp->db_fthdr = NULL;
215 dbp->db_credp = NULL;
216 dbp->db_cpid = -1;
217 dbp->db_struioflag = 0;
218 dbp->db_struioun.cksum.flags = 0;
219 return (0);
222 /*ARGSUSED*/
223 static int
224 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
226 dblk_t *dbp = buf;
228 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
229 return (-1);
230 dbp->db_mblk->b_datap = dbp;
231 dbp->db_cache = dblk_esb_cache;
232 dbp->db_fthdr = NULL;
233 dbp->db_credp = NULL;
234 dbp->db_cpid = -1;
235 dbp->db_struioflag = 0;
236 dbp->db_struioun.cksum.flags = 0;
237 return (0);
240 static int
241 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
243 dblk_t *dbp = buf;
244 bcache_t *bcp = cdrarg;
246 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
247 return (-1);
249 dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
250 if (dbp->db_base == NULL) {
251 kmem_cache_free(mblk_cache, dbp->db_mblk);
252 return (-1);
255 dbp->db_mblk->b_datap = dbp;
256 dbp->db_cache = (void *)bcp;
257 dbp->db_lim = dbp->db_base + bcp->size;
258 dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
259 dbp->db_frtnp = NULL;
260 dbp->db_fthdr = NULL;
261 dbp->db_credp = NULL;
262 dbp->db_cpid = -1;
263 dbp->db_struioflag = 0;
264 dbp->db_struioun.cksum.flags = 0;
265 return (0);
268 /*ARGSUSED*/
269 static void
270 dblk_destructor(void *buf, void *cdrarg)
272 dblk_t *dbp = buf;
273 ssize_t msg_size = (ssize_t)cdrarg;
275 ASSERT(dbp->db_mblk->b_datap == dbp);
276 ASSERT(msg_size != 0);
277 ASSERT(dbp->db_struioflag == 0);
278 ASSERT(dbp->db_struioun.cksum.flags == 0);
280 if ((msg_size & PAGEOFFSET) == 0) {
281 kmem_free(dbp->db_base, msg_size);
284 kmem_cache_free(mblk_cache, dbp->db_mblk);
287 static void
288 bcache_dblk_destructor(void *buf, void *cdrarg)
290 dblk_t *dbp = buf;
291 bcache_t *bcp = cdrarg;
293 kmem_cache_free(bcp->buffer_cache, dbp->db_base);
295 ASSERT(dbp->db_mblk->b_datap == dbp);
296 ASSERT(dbp->db_struioflag == 0);
297 ASSERT(dbp->db_struioun.cksum.flags == 0);
299 kmem_cache_free(mblk_cache, dbp->db_mblk);
302 /* ARGSUSED */
303 static int
304 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
306 ftblk_t *fbp = buf;
307 int i;
309 bzero(fbp, sizeof (ftblk_t));
310 if (str_ftstack != 0) {
311 for (i = 0; i < FTBLK_EVNTS; i++)
312 fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
315 return (0);
318 /* ARGSUSED */
319 static void
320 ftblk_destructor(void *buf, void *cdrarg)
322 ftblk_t *fbp = buf;
323 int i;
325 if (str_ftstack != 0) {
326 for (i = 0; i < FTBLK_EVNTS; i++) {
327 if (fbp->ev[i].stk != NULL) {
328 kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
329 fbp->ev[i].stk = NULL;
335 static int
336 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
338 fthdr_t *fhp = buf;
340 return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
343 static void
344 fthdr_destructor(void *buf, void *cdrarg)
346 fthdr_t *fhp = buf;
348 ftblk_destructor(&fhp->first, cdrarg);
351 void
352 streams_msg_init(void)
354 char name[40];
355 size_t size;
356 size_t lastsize = DBLK_MIN_SIZE;
357 size_t *sizep;
358 struct kmem_cache *cp;
359 size_t tot_size;
360 int offset;
362 mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
363 NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
365 for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
367 if ((offset = (size & PAGEOFFSET)) != 0) {
369 * We are in the middle of a page, dblk should
370 * be allocated on the same page
372 tot_size = size + sizeof (dblk_t);
373 ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
374 < PAGESIZE);
375 ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
377 } else {
380 * buf size is multiple of page size, dblk and
381 * buffer are allocated separately.
384 ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
385 tot_size = sizeof (dblk_t);
388 (void) sprintf(name, "streams_dblk_%ld", size);
389 cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
390 dblk_constructor, dblk_destructor, NULL, (void *)(size),
391 NULL, dblk_kmem_flags);
393 while (lastsize <= size) {
394 dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
395 lastsize += DBLK_MIN_SIZE;
399 dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
400 DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
401 (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
402 fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
403 fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
404 ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
405 ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
407 /* Initialize Multidata caches */
408 mmd_init();
410 /* initialize throttling queue for esballoc */
411 esballoc_queue_init();
414 /*ARGSUSED*/
415 mblk_t *
416 allocb(size_t size, uint_t pri)
418 dblk_t *dbp;
419 mblk_t *mp;
420 size_t index;
422 index = (size - 1) >> DBLK_SIZE_SHIFT;
424 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
425 if (size != 0) {
426 mp = allocb_oversize(size, KM_NOSLEEP);
427 goto out;
429 index = 0;
432 if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
433 mp = NULL;
434 goto out;
437 mp = dbp->db_mblk;
438 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
439 mp->b_next = mp->b_prev = mp->b_cont = NULL;
440 mp->b_rptr = mp->b_wptr = dbp->db_base;
441 mp->b_queue = NULL;
442 MBLK_BAND_FLAG_WORD(mp) = 0;
443 STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
444 out:
445 FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
447 return (mp);
451 * Allocate an mblk taking db_credp and db_cpid from the template.
452 * Allow the cred to be NULL.
454 mblk_t *
455 allocb_tmpl(size_t size, const mblk_t *tmpl)
457 mblk_t *mp = allocb(size, 0);
459 if (mp != NULL) {
460 dblk_t *src = tmpl->b_datap;
461 dblk_t *dst = mp->b_datap;
462 cred_t *cr;
463 pid_t cpid;
465 cr = msg_getcred(tmpl, &cpid);
466 if (cr != NULL)
467 crhold(dst->db_credp = cr);
468 dst->db_cpid = cpid;
469 dst->db_type = src->db_type;
471 return (mp);
474 mblk_t *
475 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
477 mblk_t *mp = allocb(size, 0);
479 ASSERT(cr != NULL);
480 if (mp != NULL) {
481 dblk_t *dbp = mp->b_datap;
483 crhold(dbp->db_credp = cr);
484 dbp->db_cpid = cpid;
486 return (mp);
489 mblk_t *
490 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
492 mblk_t *mp = allocb_wait(size, 0, flags, error);
494 ASSERT(cr != NULL);
495 if (mp != NULL) {
496 dblk_t *dbp = mp->b_datap;
498 crhold(dbp->db_credp = cr);
499 dbp->db_cpid = cpid;
502 return (mp);
506 * Extract the db_cred (and optionally db_cpid) from a message.
507 * We find the first mblk which has a non-NULL db_cred and use that.
508 * If none found we return NULL.
509 * Does NOT get a hold on the cred.
511 cred_t *
512 msg_getcred(const mblk_t *mp, pid_t *cpidp)
514 cred_t *cr = NULL;
515 cred_t *cr2;
516 mblk_t *mp2;
518 while (mp != NULL) {
519 dblk_t *dbp = mp->b_datap;
521 cr = dbp->db_credp;
522 if (cr == NULL) {
523 mp = mp->b_cont;
524 continue;
526 if (cpidp != NULL)
527 *cpidp = dbp->db_cpid;
529 #ifdef DEBUG
531 * Normally there should at most one db_credp in a message.
532 * But if there are multiple (as in the case of some M_IOC*
533 * and some internal messages in TCP/IP bind logic) then
534 * they must be identical in the normal case.
535 * However, a socket can be shared between different uids
536 * in which case data queued in TCP would be from different
537 * creds. Thus we can only assert for the zoneid being the
538 * same. Due to Multi-level Level Ports for TX, some
539 * cred_t can have a NULL cr_zone, and we skip the comparison
540 * in that case.
542 mp2 = mp->b_cont;
543 while (mp2 != NULL) {
544 cr2 = DB_CRED(mp2);
545 if (cr2 != NULL) {
546 DTRACE_PROBE2(msg__getcred,
547 cred_t *, cr, cred_t *, cr2);
548 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
549 crgetzone(cr) == NULL ||
550 crgetzone(cr2) == NULL);
552 mp2 = mp2->b_cont;
554 #endif
555 return (cr);
557 if (cpidp != NULL)
558 *cpidp = NOPID;
559 return (NULL);
563 * Variant of msg_getcred which, when a cred is found
564 * 1. Returns with a hold on the cred
565 * 2. Clears the first cred in the mblk.
566 * This is more efficient to use than a msg_getcred() + crhold() when
567 * the message is freed after the cred has been extracted.
569 * The caller is responsible for ensuring that there is no other reference
570 * on the message since db_credp can not be cleared when there are other
571 * references.
573 cred_t *
574 msg_extractcred(mblk_t *mp, pid_t *cpidp)
576 cred_t *cr = NULL;
577 cred_t *cr2;
578 mblk_t *mp2;
580 while (mp != NULL) {
581 dblk_t *dbp = mp->b_datap;
583 cr = dbp->db_credp;
584 if (cr == NULL) {
585 mp = mp->b_cont;
586 continue;
588 ASSERT(dbp->db_ref == 1);
589 dbp->db_credp = NULL;
590 if (cpidp != NULL)
591 *cpidp = dbp->db_cpid;
592 #ifdef DEBUG
594 * Normally there should at most one db_credp in a message.
595 * But if there are multiple (as in the case of some M_IOC*
596 * and some internal messages in TCP/IP bind logic) then
597 * they must be identical in the normal case.
598 * However, a socket can be shared between different uids
599 * in which case data queued in TCP would be from different
600 * creds. Thus we can only assert for the zoneid being the
601 * same. Due to Multi-level Level Ports for TX, some
602 * cred_t can have a NULL cr_zone, and we skip the comparison
603 * in that case.
605 mp2 = mp->b_cont;
606 while (mp2 != NULL) {
607 cr2 = DB_CRED(mp2);
608 if (cr2 != NULL) {
609 DTRACE_PROBE2(msg__extractcred,
610 cred_t *, cr, cred_t *, cr2);
611 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
612 crgetzone(cr) == NULL ||
613 crgetzone(cr2) == NULL);
615 mp2 = mp2->b_cont;
617 #endif
618 return (cr);
620 return (NULL);
623 * Get the label for a message. Uses the first mblk in the message
624 * which has a non-NULL db_credp.
625 * Returns NULL if there is no credp.
627 extern struct ts_label_s *
628 msg_getlabel(const mblk_t *mp)
630 cred_t *cr = msg_getcred(mp, NULL);
632 if (cr == NULL)
633 return (NULL);
635 return (crgetlabel(cr));
638 void
639 freeb(mblk_t *mp)
641 dblk_t *dbp = mp->b_datap;
643 ASSERT(dbp->db_ref > 0);
644 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
645 FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
647 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
649 dbp->db_free(mp, dbp);
652 void
653 freemsg(mblk_t *mp)
655 FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
656 while (mp) {
657 dblk_t *dbp = mp->b_datap;
658 mblk_t *mp_cont = mp->b_cont;
660 ASSERT(dbp->db_ref > 0);
661 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
663 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
665 dbp->db_free(mp, dbp);
666 mp = mp_cont;
671 * Reallocate a block for another use. Try hard to use the old block.
672 * If the old data is wanted (copy), leave b_wptr at the end of the data,
673 * otherwise return b_wptr = b_rptr.
675 * This routine is private and unstable.
677 mblk_t *
678 reallocb(mblk_t *mp, size_t size, uint_t copy)
680 mblk_t *mp1;
681 unsigned char *old_rptr;
682 ptrdiff_t cur_size;
684 if (mp == NULL)
685 return (allocb(size, BPRI_HI));
687 cur_size = mp->b_wptr - mp->b_rptr;
688 old_rptr = mp->b_rptr;
690 ASSERT(mp->b_datap->db_ref != 0);
692 if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
694 * If the data is wanted and it will fit where it is, no
695 * work is required.
697 if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
698 return (mp);
700 mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
701 mp1 = mp;
702 } else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
703 /* XXX other mp state could be copied too, db_flags ... ? */
704 mp1->b_cont = mp->b_cont;
705 } else {
706 return (NULL);
709 if (copy) {
710 bcopy(old_rptr, mp1->b_rptr, cur_size);
711 mp1->b_wptr = mp1->b_rptr + cur_size;
714 if (mp != mp1)
715 freeb(mp);
717 return (mp1);
720 static void
721 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
723 ASSERT(dbp->db_mblk == mp);
724 if (dbp->db_fthdr != NULL)
725 str_ftfree(dbp);
727 /* set credp and projid to be 'unspecified' before returning to cache */
728 if (dbp->db_credp != NULL) {
729 crfree(dbp->db_credp);
730 dbp->db_credp = NULL;
732 dbp->db_cpid = -1;
734 /* Reset the struioflag and the checksum flag fields */
735 dbp->db_struioflag = 0;
736 dbp->db_struioun.cksum.flags = 0;
738 /* and the COOKED and/or UIOA flag(s) */
739 dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
741 kmem_cache_free(dbp->db_cache, dbp);
744 static void
745 dblk_decref(mblk_t *mp, dblk_t *dbp)
747 if (dbp->db_ref != 1) {
748 uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
749 -(1 << DBLK_RTFU_SHIFT(db_ref)));
751 * atomic_add_32_nv() just decremented db_ref, so we no longer
752 * have a reference to the dblk, which means another thread
753 * could free it. Therefore we cannot examine the dblk to
754 * determine whether ours was the last reference. Instead,
755 * we extract the new and minimum reference counts from rtfu.
756 * Note that all we're really saying is "if (ref != refmin)".
758 if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
759 ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
760 kmem_cache_free(mblk_cache, mp);
761 return;
764 dbp->db_mblk = mp;
765 dbp->db_free = dbp->db_lastfree;
766 dbp->db_lastfree(mp, dbp);
769 mblk_t *
770 dupb(mblk_t *mp)
772 dblk_t *dbp = mp->b_datap;
773 mblk_t *new_mp;
774 uint32_t oldrtfu, newrtfu;
776 if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
777 goto out;
779 new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
780 new_mp->b_rptr = mp->b_rptr;
781 new_mp->b_wptr = mp->b_wptr;
782 new_mp->b_datap = dbp;
783 new_mp->b_queue = NULL;
784 MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
786 STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
788 dbp->db_free = dblk_decref;
789 do {
790 ASSERT(dbp->db_ref > 0);
791 oldrtfu = DBLK_RTFU_WORD(dbp);
792 newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
794 * If db_ref is maxed out we can't dup this message anymore.
796 if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
797 kmem_cache_free(mblk_cache, new_mp);
798 new_mp = NULL;
799 goto out;
801 } while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
802 oldrtfu);
804 out:
805 FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
806 return (new_mp);
809 static void
810 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
812 frtn_t *frp = dbp->db_frtnp;
814 ASSERT(dbp->db_mblk == mp);
815 frp->free_func(frp->free_arg);
816 if (dbp->db_fthdr != NULL)
817 str_ftfree(dbp);
819 /* set credp and projid to be 'unspecified' before returning to cache */
820 if (dbp->db_credp != NULL) {
821 crfree(dbp->db_credp);
822 dbp->db_credp = NULL;
824 dbp->db_cpid = -1;
825 dbp->db_struioflag = 0;
826 dbp->db_struioun.cksum.flags = 0;
828 kmem_cache_free(dbp->db_cache, dbp);
/*
 * No-op free routine, reached through frnop.  allocb_oversize() passes
 * frnop to gesballoc() because dblk_lastfree_oversize() frees the buffer
 * itself.
 */
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}
838 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
840 static mblk_t *
841 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
842 void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
844 dblk_t *dbp;
845 mblk_t *mp;
847 ASSERT(base != NULL && frp != NULL);
849 if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
850 mp = NULL;
851 goto out;
854 mp = dbp->db_mblk;
855 dbp->db_base = base;
856 dbp->db_lim = base + size;
857 dbp->db_free = dbp->db_lastfree = lastfree;
858 dbp->db_frtnp = frp;
859 DBLK_RTFU_WORD(dbp) = db_rtfu;
860 mp->b_next = mp->b_prev = mp->b_cont = NULL;
861 mp->b_rptr = mp->b_wptr = base;
862 mp->b_queue = NULL;
863 MBLK_BAND_FLAG_WORD(mp) = 0;
865 out:
866 FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
867 return (mp);
870 /*ARGSUSED*/
871 mblk_t *
872 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
874 mblk_t *mp;
877 * Note that this is structured to allow the common case (i.e.
878 * STREAMS flowtracing disabled) to call gesballoc() with tail
879 * call optimization.
881 if (!str_ftnever) {
882 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
883 frp, freebs_enqueue, KM_NOSLEEP);
885 if (mp != NULL)
886 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
887 return (mp);
890 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
891 frp, freebs_enqueue, KM_NOSLEEP));
895 * Same as esballoc() but sleeps waiting for memory.
897 /*ARGSUSED*/
898 mblk_t *
899 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
901 mblk_t *mp;
904 * Note that this is structured to allow the common case (i.e.
905 * STREAMS flowtracing disabled) to call gesballoc() with tail
906 * call optimization.
908 if (!str_ftnever) {
909 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
910 frp, freebs_enqueue, KM_SLEEP);
912 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
913 return (mp);
916 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
917 frp, freebs_enqueue, KM_SLEEP));
920 /*ARGSUSED*/
921 mblk_t *
922 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
924 mblk_t *mp;
927 * Note that this is structured to allow the common case (i.e.
928 * STREAMS flowtracing disabled) to call gesballoc() with tail
929 * call optimization.
931 if (!str_ftnever) {
932 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
933 frp, dblk_lastfree_desb, KM_NOSLEEP);
935 if (mp != NULL)
936 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
937 return (mp);
940 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
941 frp, dblk_lastfree_desb, KM_NOSLEEP));
944 /*ARGSUSED*/
945 mblk_t *
946 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
948 mblk_t *mp;
951 * Note that this is structured to allow the common case (i.e.
952 * STREAMS flowtracing disabled) to call gesballoc() with tail
953 * call optimization.
955 if (!str_ftnever) {
956 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
957 frp, freebs_enqueue, KM_NOSLEEP);
959 if (mp != NULL)
960 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
961 return (mp);
964 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
965 frp, freebs_enqueue, KM_NOSLEEP));
968 /*ARGSUSED*/
969 mblk_t *
970 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
972 mblk_t *mp;
975 * Note that this is structured to allow the common case (i.e.
976 * STREAMS flowtracing disabled) to call gesballoc() with tail
977 * call optimization.
979 if (!str_ftnever) {
980 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
981 frp, dblk_lastfree_desb, KM_NOSLEEP);
983 if (mp != NULL)
984 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
985 return (mp);
988 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
989 frp, dblk_lastfree_desb, KM_NOSLEEP));
992 static void
993 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
995 bcache_t *bcp = dbp->db_cache;
997 ASSERT(dbp->db_mblk == mp);
998 if (dbp->db_fthdr != NULL)
999 str_ftfree(dbp);
1001 /* set credp and projid to be 'unspecified' before returning to cache */
1002 if (dbp->db_credp != NULL) {
1003 crfree(dbp->db_credp);
1004 dbp->db_credp = NULL;
1006 dbp->db_cpid = -1;
1007 dbp->db_struioflag = 0;
1008 dbp->db_struioun.cksum.flags = 0;
1010 mutex_enter(&bcp->mutex);
1011 kmem_cache_free(bcp->dblk_cache, dbp);
1012 bcp->alloc--;
1014 if (bcp->alloc == 0 && bcp->destroy != 0) {
1015 kmem_cache_destroy(bcp->dblk_cache);
1016 kmem_cache_destroy(bcp->buffer_cache);
1017 mutex_exit(&bcp->mutex);
1018 mutex_destroy(&bcp->mutex);
1019 kmem_free(bcp, sizeof (bcache_t));
1020 } else {
1021 mutex_exit(&bcp->mutex);
1025 bcache_t *
1026 bcache_create(char *name, size_t size, uint_t align)
1028 bcache_t *bcp;
1029 char buffer[255];
1031 ASSERT((align & (align - 1)) == 0);
1033 if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1034 return (NULL);
1036 bcp->size = size;
1037 bcp->align = align;
1038 bcp->alloc = 0;
1039 bcp->destroy = 0;
1041 mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1043 (void) sprintf(buffer, "%s_buffer_cache", name);
1044 bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1045 NULL, NULL, NULL, 0);
1046 (void) sprintf(buffer, "%s_dblk_cache", name);
1047 bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1048 DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1049 NULL, (void *)bcp, NULL, 0);
1051 return (bcp);
1054 void
1055 bcache_destroy(bcache_t *bcp)
1057 ASSERT(bcp != NULL);
1059 mutex_enter(&bcp->mutex);
1060 if (bcp->alloc == 0) {
1061 kmem_cache_destroy(bcp->dblk_cache);
1062 kmem_cache_destroy(bcp->buffer_cache);
1063 mutex_exit(&bcp->mutex);
1064 mutex_destroy(&bcp->mutex);
1065 kmem_free(bcp, sizeof (bcache_t));
1066 } else {
1067 bcp->destroy++;
1068 mutex_exit(&bcp->mutex);
1072 /*ARGSUSED*/
1073 mblk_t *
1074 bcache_allocb(bcache_t *bcp, uint_t pri)
1076 dblk_t *dbp;
1077 mblk_t *mp = NULL;
1079 ASSERT(bcp != NULL);
1081 mutex_enter(&bcp->mutex);
1082 if (bcp->destroy != 0) {
1083 mutex_exit(&bcp->mutex);
1084 goto out;
1087 if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1088 mutex_exit(&bcp->mutex);
1089 goto out;
1091 bcp->alloc++;
1092 mutex_exit(&bcp->mutex);
1094 ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1096 mp = dbp->db_mblk;
1097 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1098 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1099 mp->b_rptr = mp->b_wptr = dbp->db_base;
1100 mp->b_queue = NULL;
1101 MBLK_BAND_FLAG_WORD(mp) = 0;
1102 STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1103 out:
1104 FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1106 return (mp);
1109 static void
1110 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1112 ASSERT(dbp->db_mblk == mp);
1113 if (dbp->db_fthdr != NULL)
1114 str_ftfree(dbp);
1116 /* set credp and projid to be 'unspecified' before returning to cache */
1117 if (dbp->db_credp != NULL) {
1118 crfree(dbp->db_credp);
1119 dbp->db_credp = NULL;
1121 dbp->db_cpid = -1;
1122 dbp->db_struioflag = 0;
1123 dbp->db_struioun.cksum.flags = 0;
1125 kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1126 kmem_cache_free(dbp->db_cache, dbp);
1129 static mblk_t *
1130 allocb_oversize(size_t size, int kmflags)
1132 mblk_t *mp;
1133 void *buf;
1135 size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1136 if ((buf = kmem_alloc(size, kmflags)) == NULL)
1137 return (NULL);
1138 if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1139 &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1140 kmem_free(buf, size);
1142 if (mp != NULL)
1143 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1145 return (mp);
1148 mblk_t *
1149 allocb_tryhard(size_t target_size)
1151 size_t size;
1152 mblk_t *bp;
1154 for (size = target_size; size < target_size + 512;
1155 size += DBLK_CACHE_ALIGN)
1156 if ((bp = allocb(size, BPRI_HI)) != NULL)
1157 return (bp);
1158 allocb_tryhard_fails++;
1159 return (NULL);
1163 * This routine is consolidation private for STREAMS internal use
1164 * This routine may only be called from sync routines (i.e., not
1165 * from put or service procedures). It is located here (rather
1166 * than strsubr.c) so that we don't have to expose all of the
1167 * allocb() implementation details in header files.
1169 mblk_t *
1170 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1172 dblk_t *dbp;
1173 mblk_t *mp;
1174 size_t index;
1176 index = (size -1) >> DBLK_SIZE_SHIFT;
1178 if (flags & STR_NOSIG) {
1179 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1180 if (size != 0) {
1181 mp = allocb_oversize(size, KM_SLEEP);
1182 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1183 (uintptr_t)mp);
1184 return (mp);
1186 index = 0;
1189 dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1190 mp = dbp->db_mblk;
1191 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1192 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1193 mp->b_rptr = mp->b_wptr = dbp->db_base;
1194 mp->b_queue = NULL;
1195 MBLK_BAND_FLAG_WORD(mp) = 0;
1196 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1198 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1200 } else {
1201 while ((mp = allocb(size, pri)) == NULL) {
1202 if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1203 return (NULL);
1207 return (mp);
1211 * Call function 'func' with 'arg' when a class zero block can
1212 * be allocated with priority 'pri'.
1214 bufcall_id_t
1215 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1217 return (bufcall(1, pri, func, arg));
1221 * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1222 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1223 * This provides consistency for all internal allocators of ioctl.
1225 mblk_t *
1226 mkiocb(uint_t cmd)
1228 struct iocblk *ioc;
1229 mblk_t *mp;
1232 * Allocate enough space for any of the ioctl related messages.
1234 if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1235 return (NULL);
1237 bzero(mp->b_rptr, sizeof (union ioctypes));
1240 * Set the mblk_t information and ptrs correctly.
1242 mp->b_wptr += sizeof (struct iocblk);
1243 mp->b_datap->db_type = M_IOCTL;
1246 * Fill in the fields.
1248 ioc = (struct iocblk *)mp->b_rptr;
1249 ioc->ioc_cmd = cmd;
1250 ioc->ioc_cr = kcred;
1251 ioc->ioc_id = getiocseqno();
1252 ioc->ioc_flag = IOC_NATIVE;
1253 return (mp);
1257 * test if block of given size can be allocated with a request of
1258 * the given priority.
1259 * 'pri' is no longer used, but is retained for compatibility.
1261 /* ARGSUSED */
1263 testb(size_t size, uint_t pri)
1265 return ((size + sizeof (dblk_t)) <= kmem_avail());
1269 * Call function 'func' with argument 'arg' when there is a reasonably
1270 * good chance that a block of size 'size' can be allocated.
1271 * 'pri' is no longer used, but is retained for compatibility.
1273 /* ARGSUSED */
1274 bufcall_id_t
1275 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1277 static long bid = 1; /* always odd to save checking for zero */
1278 bufcall_id_t bc_id;
1279 struct strbufcall *bcp;
1281 if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1282 return (0);
1284 bcp->bc_func = func;
1285 bcp->bc_arg = arg;
1286 bcp->bc_size = size;
1287 bcp->bc_next = NULL;
1288 bcp->bc_executor = NULL;
1290 mutex_enter(&strbcall_lock);
1292 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1293 * should be no references to bcp since it may be freed by
1294 * runbufcalls(). Since bcp_id field is returned, we save its value in
1295 * the local var.
1297 bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2); /* keep it odd */
1300 * add newly allocated stream event to existing
1301 * linked list of events.
1303 if (strbcalls.bc_head == NULL) {
1304 strbcalls.bc_head = strbcalls.bc_tail = bcp;
1305 } else {
1306 strbcalls.bc_tail->bc_next = bcp;
1307 strbcalls.bc_tail = bcp;
1310 cv_signal(&strbcall_cv);
1311 mutex_exit(&strbcall_lock);
1312 return (bc_id);
1316 * Cancel a bufcall request.
1318 void
1319 unbufcall(bufcall_id_t id)
1321 strbufcall_t *bcp, *pbcp;
1323 mutex_enter(&strbcall_lock);
1324 again:
1325 pbcp = NULL;
1326 for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1327 if (id == bcp->bc_id)
1328 break;
1329 pbcp = bcp;
1331 if (bcp) {
1332 if (bcp->bc_executor != NULL) {
1333 if (bcp->bc_executor != curthread) {
1334 cv_wait(&bcall_cv, &strbcall_lock);
1335 goto again;
1337 } else {
1338 if (pbcp)
1339 pbcp->bc_next = bcp->bc_next;
1340 else
1341 strbcalls.bc_head = bcp->bc_next;
1342 if (bcp == strbcalls.bc_tail)
1343 strbcalls.bc_tail = pbcp;
1344 kmem_free(bcp, sizeof (strbufcall_t));
1347 mutex_exit(&strbcall_lock);
1351 * Duplicate a message block by block (uses dupb), returning
1352 * a pointer to the duplicate message.
1353 * Returns a non-NULL value only if the entire message
1354 * was dup'd.
1356 mblk_t *
1357 dupmsg(mblk_t *bp)
1359 mblk_t *head, *nbp;
1361 if (!bp || !(nbp = head = dupb(bp)))
1362 return (NULL);
1364 while (bp->b_cont) {
1365 if (!(nbp->b_cont = dupb(bp->b_cont))) {
1366 freemsg(head);
1367 return (NULL);
1369 nbp = nbp->b_cont;
1370 bp = bp->b_cont;
1372 return (head);
/*
 * Duplicate one block: blocks whose dblk has STRUIO_ZC set in
 * db_struioflag are deep-copied with copyb(); all others are dupb()'d.
 */
#define	DUPB_NOLOAN(bp)							\
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) == 0) ?		\
	dupb((bp)) : copyb((bp)))
1379 mblk_t *
1380 dupmsg_noloan(mblk_t *bp)
1382 mblk_t *head, *nbp;
1384 if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1385 ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1386 return (NULL);
1388 while (bp->b_cont) {
1389 if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1390 freemsg(head);
1391 return (NULL);
1393 nbp = nbp->b_cont;
1394 bp = bp->b_cont;
1396 return (head);
1400 * Copy data from message and data block to newly allocated message and
1401 * data block. Returns new message block pointer, or NULL if error.
1402 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1403 * as in the original even when db_base is not word aligned. (bug 1052877)
1405 mblk_t *
1406 copyb(mblk_t *bp)
1408 mblk_t *nbp;
1409 dblk_t *dp, *ndp;
1410 uchar_t *base;
1411 size_t size;
1412 size_t unaligned;
1414 ASSERT(bp->b_wptr >= bp->b_rptr);
1416 dp = bp->b_datap;
1417 if (dp->db_fthdr != NULL)
1418 STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1421 * Special handling for Multidata message; this should be
1422 * removed once a copy-callback routine is made available.
1424 if (dp->db_type == M_MULTIDATA) {
1425 cred_t *cr;
1427 if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1428 return (NULL);
1430 nbp->b_flag = bp->b_flag;
1431 nbp->b_band = bp->b_band;
1432 ndp = nbp->b_datap;
1434 /* See comments below on potential issues. */
1435 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1437 ASSERT(ndp->db_type == dp->db_type);
1438 cr = dp->db_credp;
1439 if (cr != NULL)
1440 crhold(ndp->db_credp = cr);
1441 ndp->db_cpid = dp->db_cpid;
1442 return (nbp);
1445 size = dp->db_lim - dp->db_base;
1446 unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1447 if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1448 return (NULL);
1449 nbp->b_flag = bp->b_flag;
1450 nbp->b_band = bp->b_band;
1451 ndp = nbp->b_datap;
1454 * Well, here is a potential issue. If we are trying to
1455 * trace a flow, and we copy the message, we might lose
1456 * information about where this message might have been.
1457 * So we should inherit the FT data. On the other hand,
1458 * a user might be interested only in alloc to free data.
1459 * So I guess the real answer is to provide a tunable.
1461 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1463 base = ndp->db_base + unaligned;
1464 bcopy(dp->db_base, ndp->db_base + unaligned, size);
1466 nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1467 nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1469 return (nbp);
1473 * Copy data from message to newly allocated message using new
1474 * data blocks. Returns a pointer to the new message, or NULL if error.
1476 mblk_t *
1477 copymsg(mblk_t *bp)
1479 mblk_t *head, *nbp;
1481 if (!bp || !(nbp = head = copyb(bp)))
1482 return (NULL);
1484 while (bp->b_cont) {
1485 if (!(nbp->b_cont = copyb(bp->b_cont))) {
1486 freemsg(head);
1487 return (NULL);
1489 nbp = nbp->b_cont;
1490 bp = bp->b_cont;
1492 return (head);
1496 * link a message block to tail of message
1498 void
1499 linkb(mblk_t *mp, mblk_t *bp)
1501 ASSERT(mp && bp);
1503 for (; mp->b_cont; mp = mp->b_cont)
1505 mp->b_cont = bp;
1509 * unlink a message block from head of message
1510 * return pointer to new message.
1511 * NULL if message becomes empty.
1513 mblk_t *
1514 unlinkb(mblk_t *bp)
1516 mblk_t *bp1;
1518 bp1 = bp->b_cont;
1519 bp->b_cont = NULL;
1520 return (bp1);
1524 * remove a message block "bp" from message "mp"
1526 * Return pointer to new message or NULL if no message remains.
1527 * Return -1 if bp is not found in message.
1529 mblk_t *
1530 rmvb(mblk_t *mp, mblk_t *bp)
1532 mblk_t *tmp;
1533 mblk_t *lastp = NULL;
1535 ASSERT(mp && bp);
1536 for (tmp = mp; tmp; tmp = tmp->b_cont) {
1537 if (tmp == bp) {
1538 if (lastp)
1539 lastp->b_cont = tmp->b_cont;
1540 else
1541 mp = tmp->b_cont;
1542 tmp->b_cont = NULL;
1543 return (mp);
1545 lastp = tmp;
1547 return ((mblk_t *)-1);
1551 * Concatenate and align first len bytes of common
1552 * message type. Len == -1, means concat everything.
1553 * Returns 1 on success, 0 on failure
1554 * After the pullup, mp points to the pulled up data.
1557 pullupmsg(mblk_t *mp, ssize_t len)
1559 mblk_t *bp, *b_cont;
1560 dblk_t *dbp;
1561 ssize_t n;
1563 ASSERT(mp->b_datap->db_ref > 0);
1564 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1567 * We won't handle Multidata message, since it contains
1568 * metadata which this function has no knowledge of; we
1569 * assert on DEBUG, and return failure otherwise.
1571 ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1572 if (mp->b_datap->db_type == M_MULTIDATA)
1573 return (0);
1575 if (len == -1) {
1576 if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1577 return (1);
1578 len = xmsgsize(mp);
1579 } else {
1580 ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1581 ASSERT(first_mblk_len >= 0);
1583 * If the length is less than that of the first mblk,
1584 * we want to pull up the message into an aligned mblk.
1585 * Though not part of the spec, some callers assume it.
1587 if (len <= first_mblk_len) {
1588 if (str_aligned(mp->b_rptr))
1589 return (1);
1590 len = first_mblk_len;
1591 } else if (xmsgsize(mp) < len)
1592 return (0);
1595 if ((bp = allocb_tmpl(len, mp)) == NULL)
1596 return (0);
1598 dbp = bp->b_datap;
1599 *bp = *mp; /* swap mblks so bp heads the old msg... */
1600 mp->b_datap = dbp; /* ... and mp heads the new message */
1601 mp->b_datap->db_mblk = mp;
1602 bp->b_datap->db_mblk = bp;
1603 mp->b_rptr = mp->b_wptr = dbp->db_base;
1605 do {
1606 ASSERT(bp->b_datap->db_ref > 0);
1607 ASSERT(bp->b_wptr >= bp->b_rptr);
1608 n = MIN(bp->b_wptr - bp->b_rptr, len);
1609 ASSERT(n >= 0); /* allow zero-length mblk_t's */
1610 if (n > 0)
1611 bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1612 mp->b_wptr += n;
1613 bp->b_rptr += n;
1614 len -= n;
1615 if (bp->b_rptr != bp->b_wptr)
1616 break;
1617 b_cont = bp->b_cont;
1618 freeb(bp);
1619 bp = b_cont;
1620 } while (len && bp);
1622 mp->b_cont = bp; /* tack on whatever wasn't pulled up */
1624 return (1);
1628 * Concatenate and align at least the first len bytes of common message
1629 * type. Len == -1 means concatenate everything. The original message is
1630 * unaltered. Returns a pointer to a new message on success, otherwise
1631 * returns NULL.
1633 mblk_t *
1634 msgpullup(mblk_t *mp, ssize_t len)
1636 mblk_t *newmp;
1637 ssize_t totlen;
1638 ssize_t n;
1641 * We won't handle Multidata message, since it contains
1642 * metadata which this function has no knowledge of; we
1643 * assert on DEBUG, and return failure otherwise.
1645 ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1646 if (mp->b_datap->db_type == M_MULTIDATA)
1647 return (NULL);
1649 totlen = xmsgsize(mp);
1651 if ((len > 0) && (len > totlen))
1652 return (NULL);
1655 * Copy all of the first msg type into one new mblk, then dupmsg
1656 * and link the rest onto this.
1659 len = totlen;
1661 if ((newmp = allocb_tmpl(len, mp)) == NULL)
1662 return (NULL);
1664 newmp->b_flag = mp->b_flag;
1665 newmp->b_band = mp->b_band;
1667 while (len > 0) {
1668 n = mp->b_wptr - mp->b_rptr;
1669 ASSERT(n >= 0); /* allow zero-length mblk_t's */
1670 if (n > 0)
1671 bcopy(mp->b_rptr, newmp->b_wptr, n);
1672 newmp->b_wptr += n;
1673 len -= n;
1674 mp = mp->b_cont;
1677 if (mp != NULL) {
1678 newmp->b_cont = dupmsg(mp);
1679 if (newmp->b_cont == NULL) {
1680 freemsg(newmp);
1681 return (NULL);
1685 return (newmp);
1689 * Trim bytes from message
1690 * len > 0, trim from head
1691 * len < 0, trim from tail
1692 * Returns 1 on success, 0 on failure.
1695 adjmsg(mblk_t *mp, ssize_t len)
1697 mblk_t *bp;
1698 mblk_t *save_bp = NULL;
1699 mblk_t *prev_bp;
1700 mblk_t *bcont;
1701 unsigned char type;
1702 ssize_t n;
1703 int fromhead;
1704 int first;
1706 ASSERT(mp != NULL);
1708 * We won't handle Multidata message, since it contains
1709 * metadata which this function has no knowledge of; we
1710 * assert on DEBUG, and return failure otherwise.
1712 ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1713 if (mp->b_datap->db_type == M_MULTIDATA)
1714 return (0);
1716 if (len < 0) {
1717 fromhead = 0;
1718 len = -len;
1719 } else {
1720 fromhead = 1;
1723 if (xmsgsize(mp) < len)
1724 return (0);
1726 if (fromhead) {
1727 first = 1;
1728 while (len) {
1729 ASSERT(mp->b_wptr >= mp->b_rptr);
1730 n = MIN(mp->b_wptr - mp->b_rptr, len);
1731 mp->b_rptr += n;
1732 len -= n;
1735 * If this is not the first zero length
1736 * message remove it
1738 if (!first && (mp->b_wptr == mp->b_rptr)) {
1739 bcont = mp->b_cont;
1740 freeb(mp);
1741 mp = save_bp->b_cont = bcont;
1742 } else {
1743 save_bp = mp;
1744 mp = mp->b_cont;
1746 first = 0;
1748 } else {
1749 type = mp->b_datap->db_type;
1750 while (len) {
1751 bp = mp;
1752 save_bp = NULL;
1755 * Find the last message of same type
1757 while (bp && bp->b_datap->db_type == type) {
1758 ASSERT(bp->b_wptr >= bp->b_rptr);
1759 prev_bp = save_bp;
1760 save_bp = bp;
1761 bp = bp->b_cont;
1763 if (save_bp == NULL)
1764 break;
1765 n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1766 save_bp->b_wptr -= n;
1767 len -= n;
1770 * If this is not the first message
1771 * and we have taken away everything
1772 * from this message, remove it
1775 if ((save_bp != mp) &&
1776 (save_bp->b_wptr == save_bp->b_rptr)) {
1777 bcont = save_bp->b_cont;
1778 freeb(save_bp);
1779 prev_bp->b_cont = bcont;
1783 return (1);
1787 * get number of data bytes in message
1789 size_t
1790 msgdsize(mblk_t *bp)
1792 size_t count = 0;
1794 for (; bp; bp = bp->b_cont)
1795 if (bp->b_datap->db_type == M_DATA) {
1796 ASSERT(bp->b_wptr >= bp->b_rptr);
1797 count += bp->b_wptr - bp->b_rptr;
1799 return (count);
1803 * Get a message off head of queue
1805 * If queue has no buffers then mark queue
1806 * with QWANTR. (queue wants to be read by
1807 * someone when data becomes available)
1809 * If there is something to take off then do so.
1810 * If queue falls below hi water mark turn off QFULL
1811 * flag. Decrement weighted count of queue.
1812 * Also turn off QWANTR because queue is being read.
1814 * The queue count is maintained on a per-band basis.
1815 * Priority band 0 (normal messages) uses q_count,
1816 * q_lowat, etc. Non-zero priority bands use the
1817 * fields in their respective qband structures
1818 * (qb_count, qb_lowat, etc.) All messages appear
1819 * on the same list, linked via their b_next pointers.
1820 * q_first is the head of the list. q_count does
1821 * not reflect the size of all the messages on the
1822 * queue. It only reflects those messages in the
1823 * normal band of flow. The one exception to this
1824 * deals with high priority messages. They are in
1825 * their own conceptual "band", but are accounted
1826 * against q_count.
1828 * If queue count is below the lo water mark and QWANTW
1829 * is set, enable the closest backq which has a service
1830 * procedure and turn off the QWANTW flag.
1832 * getq could be built on top of rmvq, but isn't because
1833 * of performance considerations.
1835 * A note on the use of q_count and q_mblkcnt:
1836 * q_count is the traditional byte count for messages that
1837 * have been put on a queue. Documentation tells us that
1838 * we shouldn't rely on that count, but some drivers/modules
1839 * do. What was needed, however, is a mechanism to prevent
1840 * runaway streams from consuming all of the resources,
1841 * and particularly be able to flow control zero-length
1842 * messages. q_mblkcnt is used for this purpose. It
1843 * counts the number of mblk's that are being put on
1844 * the queue. The intention here, is that each mblk should
1845 * contain one byte of data and, for the purpose of
1846 * flow-control, logically does. A queue will become
1847 * full when EITHER of these values (q_count and q_mblkcnt)
1848 * reach the highwater mark. It will clear when BOTH
1849 * of them drop below the highwater mark. And it will
1850 * backenable when BOTH of them drop below the lowwater
1851 * mark.
1852 * With this algorithm, a driver/module might be able
1853 * to find a reasonably accurate q_count, and the
1854 * framework can still try and limit resource usage.
1856 mblk_t *
1857 getq(queue_t *q)
1859 mblk_t *bp;
1860 uchar_t band = 0;
1862 bp = getq_noenab(q, 0);
1863 if (bp != NULL)
1864 band = bp->b_band;
1867 * Inlined from qbackenable().
1868 * Quick check without holding the lock.
1870 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1871 return (bp);
1873 qbackenable(q, band);
1874 return (bp);
/*
 * Add the number of data bytes in the single message block 'mp' to
 * 'size'.  For ordinary blocks this is MBLKL(mp).  For M_MULTIDATA
 * blocks it is the value mmd_getsize() returns through its third
 * argument (pinuse).
 *
 * The body is wrapped in do { } while (0) so the macro expands to exactly
 * one statement.  The former bare if/else form could bind to, or
 * swallow, a following 'else' when used as the body of an unbraced
 * if/else.
 */
#define	ADD_MBLK_SIZE(mp, size)						\
	do {								\
		if (DB_TYPE(mp) != M_MULTIDATA) {			\
			(size) += MBLKL(mp);				\
		} else {						\
			uint_t	pinuse;					\
									\
			mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
			(size) += pinuse;				\
		}							\
	} while (0)
1893 * Returns the number of bytes in a message (a message is defined as a
1894 * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1895 * also return the number of distinct mblks in the message.
1898 mp_cont_len(mblk_t *bp, int *mblkcnt)
1900 mblk_t *mp;
1901 int mblks = 0;
1902 int bytes = 0;
1904 for (mp = bp; mp != NULL; mp = mp->b_cont) {
1905 ADD_MBLK_SIZE(mp, bytes);
1906 mblks++;
1909 if (mblkcnt != NULL)
1910 *mblkcnt = mblks;
1912 return (bytes);
1916 * Like getq() but does not backenable. This is used by the stream
1917 * head when a putback() is likely. The caller must call qbackenable()
1918 * after it is done with accessing the queue.
1919 * The rbytes arguments to getq_noneab() allows callers to specify a
1920 * the maximum number of bytes to return. If the current amount on the
1921 * queue is less than this then the entire message will be returned.
1922 * A value of 0 returns the entire message and is equivalent to the old
1923 * default behaviour prior to the addition of the rbytes argument.
1925 mblk_t *
1926 getq_noenab(queue_t *q, ssize_t rbytes)
1928 mblk_t *bp, *mp1;
1929 mblk_t *mp2 = NULL;
1930 qband_t *qbp;
1931 kthread_id_t freezer;
1932 int bytecnt = 0, mblkcnt = 0;
1934 /* freezestr should allow its caller to call getq/putq */
1935 freezer = STREAM(q)->sd_freezer;
1936 if (freezer == curthread) {
1937 ASSERT(frozenstr(q));
1938 ASSERT(MUTEX_HELD(QLOCK(q)));
1939 } else
1940 mutex_enter(QLOCK(q));
1942 if ((bp = q->q_first) == 0) {
1943 q->q_flag |= QWANTR;
1944 } else {
1946 * If the caller supplied a byte threshold and there is
1947 * more than this amount on the queue then break up the
1948 * the message appropriately. We can only safely do
1949 * this for M_DATA messages.
1951 if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1952 (q->q_count > rbytes)) {
1954 * Inline version of mp_cont_len() which terminates
1955 * when we meet or exceed rbytes.
1957 for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1958 mblkcnt++;
1959 ADD_MBLK_SIZE(mp1, bytecnt);
1960 if (bytecnt >= rbytes)
1961 break;
1964 * We need to account for the following scenarios:
1966 * 1) Too much data in the first message:
1967 * mp1 will be the mblk which puts us over our
1968 * byte limit.
1969 * 2) Not enough data in the first message:
1970 * mp1 will be NULL.
1971 * 3) Exactly the right amount of data contained within
1972 * whole mblks:
1973 * mp1->b_cont will be where we break the message.
1975 if (bytecnt > rbytes) {
1977 * Dup/copy mp1 and put what we don't need
1978 * back onto the queue. Adjust the read/write
1979 * and continuation pointers appropriately
1980 * and decrement the current mblk count to
1981 * reflect we are putting an mblk back onto
1982 * the queue.
1983 * When adjusting the message pointers, it's
1984 * OK to use the existing bytecnt and the
1985 * requested amount (rbytes) to calculate the
1986 * the new write offset (b_wptr) of what we
1987 * are taking. However, we cannot use these
1988 * values when calculating the read offset of
1989 * the mblk we are putting back on the queue.
1990 * This is because the begining (b_rptr) of the
1991 * mblk represents some arbitrary point within
1992 * the message.
1993 * It's simplest to do this by advancing b_rptr
1994 * by the new length of mp1 as we don't have to
1995 * remember any intermediate state.
1997 ASSERT(mp1 != NULL);
1998 mblkcnt--;
1999 if ((mp2 = dupb(mp1)) == NULL &&
2000 (mp2 = copyb(mp1)) == NULL) {
2001 bytecnt = mblkcnt = 0;
2002 goto dup_failed;
2004 mp2->b_cont = mp1->b_cont;
2005 mp1->b_wptr -= bytecnt - rbytes;
2006 mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
2007 mp1->b_cont = NULL;
2008 bytecnt = rbytes;
2009 } else {
2011 * Either there is not enough data in the first
2012 * message or there is no excess data to deal
2013 * with. If mp1 is NULL, we are taking the
2014 * whole message. No need to do anything.
2015 * Otherwise we assign mp1->b_cont to mp2 as
2016 * we will be putting this back onto the head of
2017 * the queue.
2019 if (mp1 != NULL) {
2020 mp2 = mp1->b_cont;
2021 mp1->b_cont = NULL;
2025 * If mp2 is not NULL then we have part of the message
2026 * to put back onto the queue.
2028 if (mp2 != NULL) {
2029 if ((mp2->b_next = bp->b_next) == NULL)
2030 q->q_last = mp2;
2031 else
2032 bp->b_next->b_prev = mp2;
2033 q->q_first = mp2;
2034 } else {
2035 if ((q->q_first = bp->b_next) == NULL)
2036 q->q_last = NULL;
2037 else
2038 q->q_first->b_prev = NULL;
2040 } else {
2042 * Either no byte threshold was supplied, there is
2043 * not enough on the queue or we failed to
2044 * duplicate/copy a data block. In these cases we
2045 * just take the entire first message.
2047 dup_failed:
2048 bytecnt = mp_cont_len(bp, &mblkcnt);
2049 if ((q->q_first = bp->b_next) == NULL)
2050 q->q_last = NULL;
2051 else
2052 q->q_first->b_prev = NULL;
2054 if (bp->b_band == 0) {
2055 q->q_count -= bytecnt;
2056 q->q_mblkcnt -= mblkcnt;
2057 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2058 (q->q_mblkcnt < q->q_hiwat))) {
2059 q->q_flag &= ~QFULL;
2061 } else {
2062 int i;
2064 ASSERT(bp->b_band <= q->q_nband);
2065 ASSERT(q->q_bandp != NULL);
2066 ASSERT(MUTEX_HELD(QLOCK(q)));
2067 qbp = q->q_bandp;
2068 i = bp->b_band;
2069 while (--i > 0)
2070 qbp = qbp->qb_next;
2071 if (qbp->qb_first == qbp->qb_last) {
2072 qbp->qb_first = NULL;
2073 qbp->qb_last = NULL;
2074 } else {
2075 qbp->qb_first = bp->b_next;
2077 qbp->qb_count -= bytecnt;
2078 qbp->qb_mblkcnt -= mblkcnt;
2079 if (qbp->qb_mblkcnt == 0 ||
2080 ((qbp->qb_count < qbp->qb_hiwat) &&
2081 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2082 qbp->qb_flag &= ~QB_FULL;
2085 q->q_flag &= ~QWANTR;
2086 bp->b_next = NULL;
2087 bp->b_prev = NULL;
2089 if (freezer != curthread)
2090 mutex_exit(QLOCK(q));
2092 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
2094 return (bp);
/*
 * Determine if a backenable is needed after removing a message in the
 * specified band, and perform it if so.
 *
 * - Band 0: if q_lowat is 0, or both q_count and q_mblkcnt are below
 *   q_lowat, the QWANTW/QWANTWSYNC bits set in q_flag are acted on.
 * - Non-zero band: the band's qb_lowat/qb_count/qb_mblkcnt are checked
 *   in the same way, and its QB_WANTW bit is acted on.
 *
 * The wanted bits are cleared under QLOCK.  The lock is then dropped
 * (unless the caller froze the stream) before calling strwakeq() and/or
 * backenable().
 *
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */
void
qbackenable(queue_t *q, uchar_t band)
{
	/*
	 * Bits to act on.  For band 0 these are q_flag bits (QWANTW,
	 * QWANTWSYNC); for other bands, the qb_flag bit QB_WANTW.
	 */
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		/* Band n is the n-th entry of the q_bandp list. */
		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/*
	 * Have to drop the lock across strwakeq and backenable.
	 * NOTE(review): the tests below mix q_flag and qb_flag bit
	 * namespaces.  They rely on QB_WANTW not overlapping QWANTWSYNC;
	 * confirm against the flag definitions in sys/stream.h.
	 */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}
2185 * Remove a message from a queue. The queue count and other
2186 * flow control parameters are adjusted and the back queue
2187 * enabled if necessary.
2189 * rmvq can be called with the stream frozen, but other utility functions
2190 * holding QLOCK, and by streams modules without any locks/frozen.
2192 void
2193 rmvq(queue_t *q, mblk_t *mp)
2195 ASSERT(mp != NULL);
2197 rmvq_noenab(q, mp);
2198 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2200 * qbackenable can handle a frozen stream but not a "random"
2201 * qlock being held. Drop lock across qbackenable.
2203 mutex_exit(QLOCK(q));
2204 qbackenable(q, mp->b_band);
2205 mutex_enter(QLOCK(q));
2206 } else {
2207 qbackenable(q, mp->b_band);
2212 * Like rmvq() but without any backenabling.
2213 * This exists to handle SR_CONSOL_DATA in strrput().
2215 void
2216 rmvq_noenab(queue_t *q, mblk_t *mp)
2218 int i;
2219 qband_t *qbp = NULL;
2220 kthread_id_t freezer;
2221 int bytecnt = 0, mblkcnt = 0;
2223 freezer = STREAM(q)->sd_freezer;
2224 if (freezer == curthread) {
2225 ASSERT(frozenstr(q));
2226 ASSERT(MUTEX_HELD(QLOCK(q)));
2227 } else if (MUTEX_HELD(QLOCK(q))) {
2228 /* Don't drop lock on exit */
2229 freezer = curthread;
2230 } else
2231 mutex_enter(QLOCK(q));
2233 ASSERT(mp->b_band <= q->q_nband);
2234 if (mp->b_band != 0) { /* Adjust band pointers */
2235 ASSERT(q->q_bandp != NULL);
2236 qbp = q->q_bandp;
2237 i = mp->b_band;
2238 while (--i > 0)
2239 qbp = qbp->qb_next;
2240 if (mp == qbp->qb_first) {
2241 if (mp->b_next && mp->b_band == mp->b_next->b_band)
2242 qbp->qb_first = mp->b_next;
2243 else
2244 qbp->qb_first = NULL;
2246 if (mp == qbp->qb_last) {
2247 if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2248 qbp->qb_last = mp->b_prev;
2249 else
2250 qbp->qb_last = NULL;
2255 * Remove the message from the list.
2257 if (mp->b_prev)
2258 mp->b_prev->b_next = mp->b_next;
2259 else
2260 q->q_first = mp->b_next;
2261 if (mp->b_next)
2262 mp->b_next->b_prev = mp->b_prev;
2263 else
2264 q->q_last = mp->b_prev;
2265 mp->b_next = NULL;
2266 mp->b_prev = NULL;
2268 /* Get the size of the message for q_count accounting */
2269 bytecnt = mp_cont_len(mp, &mblkcnt);
2271 if (mp->b_band == 0) { /* Perform q_count accounting */
2272 q->q_count -= bytecnt;
2273 q->q_mblkcnt -= mblkcnt;
2274 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2275 (q->q_mblkcnt < q->q_hiwat))) {
2276 q->q_flag &= ~QFULL;
2278 } else { /* Perform qb_count accounting */
2279 qbp->qb_count -= bytecnt;
2280 qbp->qb_mblkcnt -= mblkcnt;
2281 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2282 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2283 qbp->qb_flag &= ~QB_FULL;
2286 if (freezer != curthread)
2287 mutex_exit(QLOCK(q));
2289 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
/*
 * Empty a queue.
 * If flag is set, remove all messages.  Otherwise, remove only
 * non-control messages (those for which datamsg() is true).  If the
 * queue falls below its low water mark and QWANTW is set, enable the
 * nearest upstream service procedure; the same is done per band for
 * QB_WANTW.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered. flushq did not have a check
 * for q_lowat == 0 in the backenabling test.
 *
 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
 * if one exists on the queue.  When pcproto_flag is nonzero, M_PCPROTO
 * messages are put back on the queue rather than freed.
 */
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
	mblk_t *mp, *nmp;
	qband_t *qbp;
	int backenab = 0;
	unsigned char bpri;
	unsigned char qbf[NBAND];	/* band flushing backenable flags */

	/* Unlocked early exit for an already empty queue. */
	if (q->q_first == NULL)
		return;

	/*
	 * Detach the whole message list and zero all counts under QLOCK.
	 * Messages that survive the flush are re-added with putq() below,
	 * which rebuilds the counts.
	 */
	mutex_enter(QLOCK(q));
	mp = q->q_first;
	q->q_first = NULL;
	q->q_last = NULL;
	q->q_count = 0;
	q->q_mblkcnt = 0;
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		qbp->qb_first = NULL;
		qbp->qb_last = NULL;
		qbp->qb_count = 0;
		qbp->qb_mblkcnt = 0;
		qbp->qb_flag &= ~QB_FULL;
	}
	q->q_flag &= ~QFULL;
	mutex_exit(QLOCK(q));
	while (mp) {
		nmp = mp->b_next;
		mp->b_next = mp->b_prev = NULL;

		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);

		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
			(void) putq(q, mp);
		else if (flag || datamsg(mp->b_datap->db_type))
			freemsg(mp);
		else
			(void) putq(q, mp);
		mp = nmp;
	}
	/*
	 * Decide, per band (qbf[1..q_nband]) and for band 0 (qbf[0]),
	 * whether a writer is waiting and the band is now below its low
	 * water mark.  The wanted flags are cleared under QLOCK.
	 */
	bpri = 1;
	mutex_enter(QLOCK(q));
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		if ((qbp->qb_flag & QB_WANTW) &&
		    (((qbp->qb_count < qbp->qb_lowat) &&
		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
		    qbp->qb_lowat == 0)) {
			qbp->qb_flag &= ~QB_WANTW;
			backenab = 1;
			qbf[bpri] = 1;
		} else
			qbf[bpri] = 0;
		bpri++;
	}
	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
	if ((q->q_flag & QWANTW) &&
	    (((q->q_count < q->q_lowat) &&
	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
		q->q_flag &= ~QWANTW;
		backenab = 1;
		qbf[0] = 1;
	} else
		qbf[0] = 0;

	/*
	 * If any band can now be written to, and there is a writer
	 * for that band, then backenable the closest service procedure.
	 * QLOCK is dropped before calling backenable(); bands are visited
	 * from highest to lowest, with band 0 last.
	 */
	if (backenab) {
		mutex_exit(QLOCK(q));
		for (bpri = q->q_nband; bpri != 0; bpri--)
			if (qbf[bpri])
				backenable(q, bpri);
		if (qbf[0])
			backenable(q, 0);
	} else
		mutex_exit(QLOCK(q));
}
2387 * The real flushing takes place in flushq_common. This is done so that
2388 * a flag which specifies whether or not M_PCPROTO messages should be flushed
2389 * or not. Currently the only place that uses this flag is the stream head.
2391 void
2392 flushq(queue_t *q, int flag)
2394 flushq_common(q, flag, 0);
/*
 * Flush the queue of messages of the given priority band.
 * flag is FLUSHDATA (only messages for which datamsg() is true) or
 * FLUSHALL (all messages of the band).  Requests for a band above
 * q_nband are ignored.
 *
 * There is some duplication of code between flushq and flushband.
 * This is because we want to optimize the code as much as possible.
 * The assumption is that there will be more messages in the normal
 * (priority 0) band than in any other.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered.  flushband had an extra check
 * for (mp->b_datap->db_type < QPCTL) in the band 0 case.  That check
 * does not match the man page for flushband and was not in the
 * strrput flush code, hence it was removed.
 */
void
flushband(queue_t *q, unsigned char pri, int flag)
{
	mblk_t *mp;
	mblk_t *nmp;
	mblk_t *last;
	qband_t *qbp;
	int band;

	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
	if (pri > q->q_nband) {
		return;
	}
	mutex_enter(QLOCK(q));
	if (pri == 0) {
		/*
		 * Detach the entire queue and zero all counts.  Band 0
		 * messages that match are freed; every other message
		 * (including all non-zero band messages) is re-queued
		 * with putq().
		 */
		mp = q->q_first;
		q->q_first = NULL;
		q->q_last = NULL;
		q->q_count = 0;
		q->q_mblkcnt = 0;
		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
			qbp->qb_first = NULL;
			qbp->qb_last = NULL;
			qbp->qb_count = 0;
			qbp->qb_mblkcnt = 0;
			qbp->qb_flag &= ~QB_FULL;
		}
		q->q_flag &= ~QFULL;
		mutex_exit(QLOCK(q));
		while (mp) {
			nmp = mp->b_next;
			mp->b_next = mp->b_prev = NULL;
			if ((mp->b_band == 0) &&
			    ((flag == FLUSHALL) ||
			    datamsg(mp->b_datap->db_type)))
				freemsg(mp);
			else
				(void) putq(q, mp);
			mp = nmp;
		}
		mutex_enter(QLOCK(q));
		/* Backenable band 0 if a writer waits and we are below lowat. */
		if ((q->q_flag & QWANTW) &&
		    (((q->q_count < q->q_lowat) &&
		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
			q->q_flag &= ~QWANTW;
			mutex_exit(QLOCK(q));

			backenable(q, pri);
		} else
			mutex_exit(QLOCK(q));
	} else {	/* pri != 0 */
		boolean_t flushed = B_FALSE;
		band = pri;

		ASSERT(MUTEX_HELD(QLOCK(q)));
		/* Band n is the n-th entry of the q_bandp list. */
		qbp = q->q_bandp;
		while (--band > 0)
			qbp = qbp->qb_next;
		mp = qbp->qb_first;
		if (mp == NULL) {
			mutex_exit(QLOCK(q));
			return;
		}
		/* Sentinel: the message just past the band's last one. */
		last = qbp->qb_last->b_next;
		/*
		 * rmvq_noenab() and freemsg() are called for each mblk that
		 * meets the criteria.  The loop is executed until the last
		 * mblk has been processed.  QLOCK is still held here, so
		 * rmvq_noenab() leaves it held.
		 */
		while (mp != last) {
			ASSERT(mp->b_band == pri);
			nmp = mp->b_next;
			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
				rmvq_noenab(q, mp);
				freemsg(mp);
				flushed = B_TRUE;
			}
			mp = nmp;
		}
		mutex_exit(QLOCK(q));

		/*
		 * If any mblk(s) has been freed, we know that qbackenable()
		 * will need to be called.
		 */
		if (flushed)
			qbackenable(q, pri);
	}
}
2501 * Return 1 if the queue is not full. If the queue is full, return
2502 * 0 (may not put message) and set QWANTW flag (caller wants to write
2503 * to the queue).
2506 canput(queue_t *q)
2508 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2510 /* this is for loopback transports, they should not do a canput */
2511 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2513 /* Find next forward module that has a service procedure */
2514 q = q->q_nfsrv;
2516 if (!(q->q_flag & QFULL)) {
2517 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2518 return (1);
2520 mutex_enter(QLOCK(q));
2521 if (q->q_flag & QFULL) {
2522 q->q_flag |= QWANTW;
2523 mutex_exit(QLOCK(q));
2524 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2525 return (0);
2527 mutex_exit(QLOCK(q));
2528 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2529 return (1);
2533 * This is the new canput for use with priority bands. Return 1 if the
2534 * band is not full. If the band is full, return 0 (may not put message)
2535 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2536 * write to the queue).
2539 bcanput(queue_t *q, unsigned char pri)
2541 qband_t *qbp;
2543 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2544 if (!q)
2545 return (0);
2547 /* Find next forward module that has a service procedure */
2548 q = q->q_nfsrv;
2550 mutex_enter(QLOCK(q));
2551 if (pri == 0) {
2552 if (q->q_flag & QFULL) {
2553 q->q_flag |= QWANTW;
2554 mutex_exit(QLOCK(q));
2555 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2556 "bcanput:%p %X %d", q, pri, 0);
2557 return (0);
2559 } else { /* pri != 0 */
2560 if (pri > q->q_nband) {
2562 * No band exists yet, so return success.
2564 mutex_exit(QLOCK(q));
2565 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2566 "bcanput:%p %X %d", q, pri, 1);
2567 return (1);
2569 qbp = q->q_bandp;
2570 while (--pri)
2571 qbp = qbp->qb_next;
2572 if (qbp->qb_flag & QB_FULL) {
2573 qbp->qb_flag |= QB_WANTW;
2574 mutex_exit(QLOCK(q));
2575 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2576 "bcanput:%p %X %d", q, pri, 0);
2577 return (0);
2580 mutex_exit(QLOCK(q));
2581 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2582 "bcanput:%p %X %d", q, pri, 1);
2583 return (1);
/*
 * Put a message on a queue.
 *
 * Messages are enqueued on a priority basis.  The priority classes
 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
 * and B_NORMAL (type < QPCTL && band == 0).
 *
 * Add appropriate weighted data block sizes to queue count.
 * If queue hits high water mark then set QFULL flag (QB_FULL for a
 * non-zero band).
 *
 * A message whose queue class is above QNORM (high priority) always
 * enables the queue.  Otherwise the queue is enabled only if canenable(q)
 * allows it (NOTE(review): presumably "QNOENB not set"; see noenable())
 * and either the message is PRIORITY (b_band != 0) or the QWANTR flag is
 * set (indicating that the service procedure is ready to read the queue).
 * This implies that a service procedure must NEVER put a high priority
 * message back on its own queue, as this would result in an infinite
 * loop (!).
 *
 * Locking: if curthread has frozen the stream, QLOCK is already held and
 * is left held; otherwise QLOCK is acquired and released here.
 *
 * Returns 1 on success.  Returns 0 if a qband structure needed for
 * bp->b_band could not be allocated; bp is then neither queued nor freed.
 */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			/* Append bands, inheriting the queue's watermarks. */
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		/* Walk to the qband for b_band; bands are numbered from 1. */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/* Skip past all messages of equal or higher class. */
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			/* Band is empty: find where band b_band belongs. */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip high priority messages, then all
				 * messages in bands >= b_band.
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	/* Either the byte count or the mblk count may trip the hiwat. */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);

	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
/*
 * Put stuff back at beginning of Q according to priority order.
 * See comment on putq above for details.
 *
 * A message is placed ahead of other messages of its own class/band:
 * high priority messages (or any message on an empty queue) go to the
 * very front, and banded messages go before the first message of their
 * band.  bp must not already be linked (b_next == NULL).
 *
 * Unlike putq(), a low-priority message enables the queue only when
 * QWANTR is set (and canenable(q) allows it); a non-zero band alone does
 * not enable.
 *
 * Locking: if curthread has frozen the stream, QLOCK is already held and
 * is left held; otherwise QLOCK is acquired and released here.
 *
 * Returns 1 on success, or 0 if a needed qband could not be allocated
 * (bp is then neither queued nor freed).
 */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {
			/* Append bands, inheriting the queue's watermarks. */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Walk to the qband for b_band; bands are numbered from 1. */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			/* Band is empty: find where band b_band belongs. */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip high priority messages, then all
				 * messages in strictly higher bands.
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/* Skip high priority, then all banded messages. */
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	/* Either the byte count or the mblk count may trip the hiwat. */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);

	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
/*
 * Insert a message before an existing message on the queue.  If the
 * existing message is NULL, the new message is placed on the end of
 * the queue.  The queue class of the new message is ignored.  However,
 * the priority band of the new message must adhere to the following
 * ordering:
 *
 *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
 *
 * All flow control parameters are updated.
 *
 * insq can be called with the stream frozen, by other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.  The
 * lock is released on return only if this function acquired it.
 *
 * Returns 1 on success.  Returns 0, without linking mp, if the ordering
 * rule would be violated (a CE_WARN is logged) or if a needed qband could
 * not be allocated.
 */
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(mp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	/* A high priority message may not follow a normal-class one. */
	if (mcls == QPCTL) {
		if (mp->b_band != 0)
			mp->b_band = 0;		/* force to be correct */
		if (emp && emp->b_prev &&
		    (emp->b_prev->b_datap->db_type < QPCTL))
			goto badord;
	}
	if (emp) {
		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
		    (emp->b_prev->b_band < mp->b_band))) {
			goto badord;
		}
	} else {
		/* Appending: mp's band may not exceed the last message's. */
		tmp = q->q_last;
		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
			cmn_err(CE_WARN,
			    "insq: attempt to insert message out of order "
			    "on q %p", (void *)q);
			if (freezer != curthread)
				mutex_exit(QLOCK(q));
			return (0);
		}
	}

	if (mp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (mp->b_band > q->q_nband) {
			/* Append bands, inheriting the queue's watermarks. */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (mp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Walk to the qband for b_band; bands are numbered from 1. */
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/* Link mp in before emp, or at the tail when emp is NULL. */
	if ((mp->b_next = emp) != NULL) {
		if ((mp->b_prev = emp->b_prev) != NULL)
			emp->b_prev->b_next = mp;
		else
			q->q_first = mp;
		emp->b_prev = mp;
	} else {
		if ((mp->b_prev = q->q_last) != NULL)
			q->q_last->b_next = mp;
		else
			q->q_first = mp;
		q->q_last = mp;
	}

	/* Get mblk and byte count for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (qbp) {	/* adjust qband pointers and count */
		if (!qbp->qb_first) {
			qbp->qb_first = mp;
			qbp->qb_last = mp;
		} else {
			/*
			 * mp becomes the band's first message if its
			 * predecessor is in a different band (or absent),
			 * otherwise its last if its successor is.
			 */
			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
			    (mp->b_prev->b_band != mp->b_band)))
				qbp->qb_first = mp;
			else if (mp->b_next == NULL || (mp->b_next != NULL &&
			    (mp->b_next->b_band != mp->b_band)))
				qbp->qb_last = mp;
		}
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);

	if (canenable(q) && (q->q_flag & QWANTR))
		qenable_locked(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
3111 * Create and put a control message on queue.
3114 putctl(queue_t *q, int type)
3116 mblk_t *bp;
3118 if ((datamsg(type) && (type != M_DELAY)) ||
3119 (bp = allocb_tryhard(0)) == NULL)
3120 return (0);
3121 bp->b_datap->db_type = (unsigned char) type;
3123 put(q, bp);
3125 return (1);
3129 * Control message with a single-byte parameter
3132 putctl1(queue_t *q, int type, int param)
3134 mblk_t *bp;
3136 if ((datamsg(type) && (type != M_DELAY)) ||
3137 (bp = allocb_tryhard(1)) == NULL)
3138 return (0);
3139 bp->b_datap->db_type = (unsigned char)type;
3140 *bp->b_wptr++ = (unsigned char)param;
3142 put(q, bp);
3144 return (1);
3148 putnextctl1(queue_t *q, int type, int param)
3150 mblk_t *bp;
3152 if ((datamsg(type) && (type != M_DELAY)) ||
3153 ((bp = allocb_tryhard(1)) == NULL))
3154 return (0);
3156 bp->b_datap->db_type = (unsigned char)type;
3157 *bp->b_wptr++ = (unsigned char)param;
3159 putnext(q, bp);
3161 return (1);
3165 putnextctl(queue_t *q, int type)
3167 mblk_t *bp;
3169 if ((datamsg(type) && (type != M_DELAY)) ||
3170 ((bp = allocb_tryhard(0)) == NULL))
3171 return (0);
3172 bp->b_datap->db_type = (unsigned char)type;
3174 putnext(q, bp);
3176 return (1);
3180 * Return the queue upstream from this one
3182 queue_t *
3183 backq(queue_t *q)
3185 q = _OTHERQ(q);
3186 if (q->q_next) {
3187 q = q->q_next;
3188 return (_OTHERQ(q));
3190 return (NULL);
3194 * Send a block back up the queue in reverse from this
3195 * one (e.g. to respond to ioctls)
3197 void
3198 qreply(queue_t *q, mblk_t *bp)
3200 ASSERT(q && bp);
3202 putnext(_OTHERQ(q), bp);
3206 * Streams Queue Scheduling
3208 * Queues are enabled through qenable() when they have messages to
3209 * process. They are serviced by queuerun(), which runs each enabled
3210 * queue's service procedure. The call to queuerun() is processor
3211 * dependent - the general principle is that it be run whenever a queue
3212 * is enabled but before returning to user level. For system calls,
3213 * the function runqueues() is called if their action causes a queue
3214 * to be enabled. For device interrupts, queuerun() should be
3215 * called before returning from the last level of interrupt. Beyond
3216 * this, no timing assumptions should be made about queue scheduling.
3220 * Enable a queue: put it on list of those whose service procedures are
3221 * ready to run and set up the scheduling mechanism.
3222 * The broadcast is done outside the mutex -> to avoid the woken thread
3223 * from contending with the mutex. This is OK 'cos the queue has been
3224 * enqueued on the runlist and flagged safely at this point.
3226 void
3227 qenable(queue_t *q)
3229 mutex_enter(QLOCK(q));
3230 qenable_locked(q);
3231 mutex_exit(QLOCK(q));
3234 * Return number of messages on queue
3237 qsize(queue_t *qp)
3239 int count = 0;
3240 mblk_t *mp;
3242 mutex_enter(QLOCK(qp));
3243 for (mp = qp->q_first; mp; mp = mp->b_next)
3244 count++;
3245 mutex_exit(QLOCK(qp));
3246 return (count);
3250 * noenable - set queue so that putq() will not enable it.
3251 * enableok - set queue so that putq() can enable it.
3253 void
3254 noenable(queue_t *q)
3256 mutex_enter(QLOCK(q));
3257 q->q_flag |= QNOENB;
3258 mutex_exit(QLOCK(q));
3261 void
3262 enableok(queue_t *q)
3264 mutex_enter(QLOCK(q));
3265 q->q_flag &= ~QNOENB;
3266 mutex_exit(QLOCK(q));
3270 * Set queue fields.
3273 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3275 qband_t *qbp = NULL;
3276 queue_t *wrq;
3277 int error = 0;
3278 kthread_id_t freezer;
3280 freezer = STREAM(q)->sd_freezer;
3281 if (freezer == curthread) {
3282 ASSERT(frozenstr(q));
3283 ASSERT(MUTEX_HELD(QLOCK(q)));
3284 } else
3285 mutex_enter(QLOCK(q));
3287 if (what >= QBAD) {
3288 error = EINVAL;
3289 goto done;
3291 if (pri != 0) {
3292 int i;
3293 qband_t **qbpp;
3295 if (pri > q->q_nband) {
3296 qbpp = &q->q_bandp;
3297 while (*qbpp)
3298 qbpp = &(*qbpp)->qb_next;
3299 while (pri > q->q_nband) {
3300 if ((*qbpp = allocband()) == NULL) {
3301 error = EAGAIN;
3302 goto done;
3304 (*qbpp)->qb_hiwat = q->q_hiwat;
3305 (*qbpp)->qb_lowat = q->q_lowat;
3306 q->q_nband++;
3307 qbpp = &(*qbpp)->qb_next;
3310 qbp = q->q_bandp;
3311 i = pri;
3312 while (--i)
3313 qbp = qbp->qb_next;
3315 switch (what) {
3317 case QHIWAT:
3318 if (qbp)
3319 qbp->qb_hiwat = (size_t)val;
3320 else
3321 q->q_hiwat = (size_t)val;
3322 break;
3324 case QLOWAT:
3325 if (qbp)
3326 qbp->qb_lowat = (size_t)val;
3327 else
3328 q->q_lowat = (size_t)val;
3329 break;
3331 case QMAXPSZ:
3332 if (qbp)
3333 error = EINVAL;
3334 else
3335 q->q_maxpsz = (ssize_t)val;
3338 * Performance concern, strwrite looks at the module below
3339 * the stream head for the maxpsz each time it does a write
3340 * we now cache it at the stream head. Check to see if this
3341 * queue is sitting directly below the stream head.
3343 wrq = STREAM(q)->sd_wrq;
3344 if (q != wrq->q_next)
3345 break;
3348 * If the stream is not frozen drop the current QLOCK and
3349 * acquire the sd_wrq QLOCK which protects sd_qn_*
3351 if (freezer != curthread) {
3352 mutex_exit(QLOCK(q));
3353 mutex_enter(QLOCK(wrq));
3355 ASSERT(MUTEX_HELD(QLOCK(wrq)));
3357 if (strmsgsz != 0) {
3358 if (val == INFPSZ)
3359 val = strmsgsz;
3360 else {
3361 if (STREAM(q)->sd_vnode->v_type == VFIFO)
3362 val = MIN(PIPE_BUF, val);
3363 else
3364 val = MIN(strmsgsz, val);
3367 STREAM(q)->sd_qn_maxpsz = val;
3368 if (freezer != curthread) {
3369 mutex_exit(QLOCK(wrq));
3370 mutex_enter(QLOCK(q));
3372 break;
3374 case QMINPSZ:
3375 if (qbp)
3376 error = EINVAL;
3377 else
3378 q->q_minpsz = (ssize_t)val;
3381 * Performance concern, strwrite looks at the module below
3382 * the stream head for the maxpsz each time it does a write
3383 * we now cache it at the stream head. Check to see if this
3384 * queue is sitting directly below the stream head.
3386 wrq = STREAM(q)->sd_wrq;
3387 if (q != wrq->q_next)
3388 break;
3391 * If the stream is not frozen drop the current QLOCK and
3392 * acquire the sd_wrq QLOCK which protects sd_qn_*
3394 if (freezer != curthread) {
3395 mutex_exit(QLOCK(q));
3396 mutex_enter(QLOCK(wrq));
3398 STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3400 if (freezer != curthread) {
3401 mutex_exit(QLOCK(wrq));
3402 mutex_enter(QLOCK(q));
3404 break;
3406 case QSTRUIOT:
3407 if (qbp)
3408 error = EINVAL;
3409 else
3410 q->q_struiot = (ushort_t)val;
3411 break;
3413 case QCOUNT:
3414 case QFIRST:
3415 case QLAST:
3416 case QFLAG:
3417 error = EPERM;
3418 break;
3420 default:
3421 error = EINVAL;
3422 break;
3424 done:
3425 if (freezer != curthread)
3426 mutex_exit(QLOCK(q));
3427 return (error);
3431 * Get queue fields.
3434 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3436 qband_t *qbp = NULL;
3437 int error = 0;
3438 kthread_id_t freezer;
3440 freezer = STREAM(q)->sd_freezer;
3441 if (freezer == curthread) {
3442 ASSERT(frozenstr(q));
3443 ASSERT(MUTEX_HELD(QLOCK(q)));
3444 } else
3445 mutex_enter(QLOCK(q));
3446 if (what >= QBAD) {
3447 error = EINVAL;
3448 goto done;
3450 if (pri != 0) {
3451 int i;
3452 qband_t **qbpp;
3454 if (pri > q->q_nband) {
3455 qbpp = &q->q_bandp;
3456 while (*qbpp)
3457 qbpp = &(*qbpp)->qb_next;
3458 while (pri > q->q_nband) {
3459 if ((*qbpp = allocband()) == NULL) {
3460 error = EAGAIN;
3461 goto done;
3463 (*qbpp)->qb_hiwat = q->q_hiwat;
3464 (*qbpp)->qb_lowat = q->q_lowat;
3465 q->q_nband++;
3466 qbpp = &(*qbpp)->qb_next;
3469 qbp = q->q_bandp;
3470 i = pri;
3471 while (--i)
3472 qbp = qbp->qb_next;
3474 switch (what) {
3475 case QHIWAT:
3476 if (qbp)
3477 *(size_t *)valp = qbp->qb_hiwat;
3478 else
3479 *(size_t *)valp = q->q_hiwat;
3480 break;
3482 case QLOWAT:
3483 if (qbp)
3484 *(size_t *)valp = qbp->qb_lowat;
3485 else
3486 *(size_t *)valp = q->q_lowat;
3487 break;
3489 case QMAXPSZ:
3490 if (qbp)
3491 error = EINVAL;
3492 else
3493 *(ssize_t *)valp = q->q_maxpsz;
3494 break;
3496 case QMINPSZ:
3497 if (qbp)
3498 error = EINVAL;
3499 else
3500 *(ssize_t *)valp = q->q_minpsz;
3501 break;
3503 case QCOUNT:
3504 if (qbp)
3505 *(size_t *)valp = qbp->qb_count;
3506 else
3507 *(size_t *)valp = q->q_count;
3508 break;
3510 case QFIRST:
3511 if (qbp)
3512 *(mblk_t **)valp = qbp->qb_first;
3513 else
3514 *(mblk_t **)valp = q->q_first;
3515 break;
3517 case QLAST:
3518 if (qbp)
3519 *(mblk_t **)valp = qbp->qb_last;
3520 else
3521 *(mblk_t **)valp = q->q_last;
3522 break;
3524 case QFLAG:
3525 if (qbp)
3526 *(uint_t *)valp = qbp->qb_flag;
3527 else
3528 *(uint_t *)valp = q->q_flag;
3529 break;
3531 case QSTRUIOT:
3532 if (qbp)
3533 error = EINVAL;
3534 else
3535 *(short *)valp = q->q_struiot;
3536 break;
3538 default:
3539 error = EINVAL;
3540 break;
3542 done:
3543 if (freezer != curthread)
3544 mutex_exit(QLOCK(q));
3545 return (error);
/*
 * Wake up everything waiting at the stream head (cv_wait, signal and
 * poll waiters) for the condition named by flag, which is one of
 * QWANTWSYNC, QWANTR or QWANTW (anything other than QWANTWSYNC or
 * QWANTR is handled as the write-side QWANTW case).
 *
 * For QWANTWSYNC and QWANTR, if there is no WSLEEPer or RSLEEPer then a
 * deferred wakeup is recorded in sd_wakeq.  The plain QWANTW case only
 * broadcasts when WSLEEP is set and records no deferred wakeup.
 *
 * sd_lock is dropped around each pollwakeup() call and re-acquired
 * before signals are sent.  The earlier note that "if strpoll() is in
 * progress a deferred pollwakeup will be done" is not implemented in this
 * function: pollwakeup() is called unconditionally here.
 * NOTE(review): confirm whether that deferral happens elsewhere.
 */
void
strwakeq(queue_t *q, int flag)
{
	stdata_t	*stp = STREAM(q);
	pollhead_t	*pl;

	mutex_enter(&stp->sd_lock);
	pl = &stp->sd_pollist;
	if (flag & QWANTWSYNC) {
		ASSERT(!(q->q_flag & QREADR));
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		} else {
			/* No sleeper yet: remember the wakeup. */
			stp->sd_wakeq |= WSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	} else if (flag & QWANTR) {
		if (stp->sd_flag & RSLEEP) {
			stp->sd_flag &= ~RSLEEP;
			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
		} else {
			/* No sleeper yet: remember the wakeup. */
			stp->sd_wakeq |= RSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLIN | POLLRDNORM);
		mutex_enter(&stp->sd_lock);

		{
			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);

			if (events)
				strsendsig(stp->sd_siglist, events, 0, 0);
		}
	} else {
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	}
	mutex_exit(&stp->sd_lock);
}
/*
 * Move data from the uio in dp into the mblk chain mp, for mblks that
 * are marked STRUIO_SPEC and not yet STRUIO_DONE.
 *
 * For each such mblk, up to (db_cksumend - db_cksumstuff) bytes are moved
 * with uiomove(..., UIO_WRITE, ...) to b_rptr + db_cksumstuff.  The mblk
 * is then marked STRUIO_DONE and db_cksumstuff is advanced by the amount
 * moved.  Processing stops when the uio is exhausted or the chain ends.
 *
 * The uio type comes from the stream head's sd_struiowrq queue
 * (q_struiot); only STRUIOT_STANDARD is supported.
 *
 * If noblock is set, the copy runs under on_trap(OT_DATA_ACCESS), so a
 * data-access fault returns EWOULDBLOCK instead of faulting.
 *
 * Returns 0 on success, EWOULDBLOCK on a trapped fault (noblock only),
 * EIO for an unsupported struio type, or the error from uiomove().
 */
int
struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
{
	stdata_t *stp = STREAM(q);
	int typ  = STRUIOT_STANDARD;
	uio_t	 *uiop = &dp->d_uio;
	dblk_t	 *dbp;
	ssize_t	 uiocnt;
	ssize_t	 cnt;
	unsigned char *ptr;
	ssize_t	 resid;
	int	 error = 0;
	on_trap_data_t otd;
	queue_t	*stwrq;

	/*
	 * Plumbing may change while taking the type so store the
	 * queue in a temporary variable. It doesn't matter even
	 * if we take the type from the previous plumbing,
	 * that's because if the plumbing has changed when we were
	 * holding the queue in a temporary variable, we can continue
	 * processing the message the way it would have been processed
	 * in the old plumbing, without any side effects but a bit
	 * extra processing for partial ip header checksum.
	 *
	 * This has been done to avoid holding the sd_lock which is
	 * very hot.
	 */

	stwrq = stp->sd_struiowrq;
	if (stwrq)
		typ = stwrq->q_struiot;

	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
		dbp = mp->b_datap;
		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		cnt = MIN(uiocnt, uiop->uio_resid);
		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
			/*
			 * Either this mblk has already been processed
			 * or there is no more room in this mblk (?).
			 */
			continue;
		}
		switch (typ) {
		case STRUIOT_STANDARD:
			if (noblock) {
				/* Non-zero return: a fault was trapped. */
				if (on_trap(&otd, OT_DATA_ACCESS)) {
					no_trap();
					error = EWOULDBLOCK;
					goto out;
				}
			}
			/* Every on_trap() above is paired with no_trap(). */
			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
				if (noblock)
					no_trap();
				goto out;
			}
			if (noblock)
				no_trap();
			break;

		default:
			error = EIO;
			goto out;
		}
		dbp->db_struioflag |= STRUIO_DONE;
		dbp->db_cksumstuff += cnt;
	}
out:
	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
		/*
		 * A fault has occurred and some bytes were moved to the
		 * current mblk, the uio_t has already been updated by
		 * the appropriate uio routine, so also update the mblk
		 * to reflect this in case this same mblk chain is used
		 * again (after the fault has been handled).
		 */
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		if (uiocnt >= resid)
			dbp->db_cksumstuff += resid;
	}
	return (error);
}
3701 * Try to enter queue synchronously. Any attempt to enter a closing queue will
3702 * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3703 * that removeq() will not try to close the queue while a thread is inside the
3704 * queue.
3706 static boolean_t
3707 rwnext_enter(queue_t *qp)
3709 mutex_enter(QLOCK(qp));
3710 if (qp->q_flag & QWCLOSE) {
3711 mutex_exit(QLOCK(qp));
3712 return (B_FALSE);
3714 qp->q_rwcnt++;
3715 ASSERT(qp->q_rwcnt != 0);
3716 mutex_exit(QLOCK(qp));
3717 return (B_TRUE);
3721 * Decrease the count of threads running in sync stream queue and wake up any
3722 * threads blocked in removeq().
3724 static void
3725 rwnext_exit(queue_t *qp)
3727 mutex_enter(QLOCK(qp));
3728 qp->q_rwcnt--;
3729 if (qp->q_flag & QWANTRMQSYNC) {
3730 qp->q_flag &= ~QWANTRMQSYNC;
3731 cv_broadcast(&qp->q_wait);
3733 mutex_exit(QLOCK(qp));
3737 * The purpose of rwnext() is to call the rw procedure of the next
3738 * (downstream) modules queue.
3740 * treated as put entrypoint for perimeter syncronization.
3742 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3743 * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3744 * not matter if any regular put entrypoints have been already entered. We
3745 * can't increment one of the sq_putcounts (instead of sq_count) because
3746 * qwait_rw won't know which counter to decrement.
3748 * It would be reasonable to add the lockless FASTPUT logic.
3751 rwnext(queue_t *qp, struiod_t *dp)
3753 queue_t *nqp;
3754 syncq_t *sq;
3755 uint16_t count;
3756 uint16_t flags;
3757 struct qinit *qi;
3758 int (*proc)();
3759 struct stdata *stp;
3760 int isread;
3761 int rval;
3763 stp = STREAM(qp);
3765 * Prevent q_next from changing by holding sd_lock until acquiring
3766 * SQLOCK. Note that a read-side rwnext from the streamhead will
3767 * already have sd_lock acquired. In either case sd_lock is always
3768 * released after acquiring SQLOCK.
3770 * The streamhead read-side holding sd_lock when calling rwnext is
3771 * required to prevent a race condition were M_DATA mblks flowing
3772 * up the read-side of the stream could be bypassed by a rwnext()
3773 * down-call. In this case sd_lock acts as the streamhead perimeter.
3775 if ((nqp = _WR(qp)) == qp) {
3776 isread = 0;
3777 mutex_enter(&stp->sd_lock);
3778 qp = nqp->q_next;
3779 } else {
3780 isread = 1;
3781 if (nqp != stp->sd_wrq)
3782 /* Not streamhead */
3783 mutex_enter(&stp->sd_lock);
3784 qp = _RD(nqp->q_next);
3786 qi = qp->q_qinfo;
3787 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3789 * Not a synchronous module or no r/w procedure for this
3790 * queue, so just return EINVAL and let the caller handle it.
3792 mutex_exit(&stp->sd_lock);
3793 return (EINVAL);
3796 if (rwnext_enter(qp) == B_FALSE) {
3797 mutex_exit(&stp->sd_lock);
3798 return (EINVAL);
3801 sq = qp->q_syncq;
3802 mutex_enter(SQLOCK(sq));
3803 mutex_exit(&stp->sd_lock);
3804 count = sq->sq_count;
3805 flags = sq->sq_flags;
3806 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3808 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3810 * if this queue is being closed, return.
3812 if (qp->q_flag & QWCLOSE) {
3813 mutex_exit(SQLOCK(sq));
3814 rwnext_exit(qp);
3815 return (EINVAL);
3819 * Wait until we can enter the inner perimeter.
3821 sq->sq_flags = flags | SQ_WANTWAKEUP;
3822 cv_wait(&sq->sq_wait, SQLOCK(sq));
3823 count = sq->sq_count;
3824 flags = sq->sq_flags;
3827 if (isread == 0 && stp->sd_struiowrq == NULL ||
3828 isread == 1 && stp->sd_struiordq == NULL) {
3830 * Stream plumbing changed while waiting for inner perimeter
3831 * so just return EINVAL and let the caller handle it.
3833 mutex_exit(SQLOCK(sq));
3834 rwnext_exit(qp);
3835 return (EINVAL);
3837 if (!(flags & SQ_CIPUT))
3838 sq->sq_flags = flags | SQ_EXCL;
3839 sq->sq_count = count + 1;
3840 ASSERT(sq->sq_count != 0); /* Wraparound */
3842 * Note: The only message ordering guarantee that rwnext() makes is
3843 * for the write queue flow-control case. All others (r/w queue
3844 * with q_count > 0 (or q_first != 0)) are the resposibilty of
3845 * the queue's rw procedure. This could be genralized here buy
3846 * running the queue's service procedure, but that wouldn't be
3847 * the most efficent for all cases.
3849 mutex_exit(SQLOCK(sq));
3850 if (! isread && (qp->q_flag & QFULL)) {
3852 * Write queue may be flow controlled. If so,
3853 * mark the queue for wakeup when it's not.
3855 mutex_enter(QLOCK(qp));
3856 if (qp->q_flag & QFULL) {
3857 qp->q_flag |= QWANTWSYNC;
3858 mutex_exit(QLOCK(qp));
3859 rval = EWOULDBLOCK;
3860 goto out;
3862 mutex_exit(QLOCK(qp));
3865 if (! isread && dp->d_mp)
3866 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3867 dp->d_mp->b_datap->db_base);
3869 rval = (*proc)(qp, dp);
3871 if (isread && dp->d_mp)
3872 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3873 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3874 out:
3876 * The queue is protected from being freed by sq_count, so it is
3877 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3879 rwnext_exit(qp);
3881 mutex_enter(SQLOCK(sq));
3882 flags = sq->sq_flags;
3883 ASSERT(sq->sq_count != 0);
3884 sq->sq_count--;
3885 if (flags & SQ_TAIL) {
3886 putnext_tail(sq, qp, flags);
3888 * The only purpose of this ASSERT is to preserve calling stack
3889 * in DEBUG kernel.
3891 ASSERT(flags & SQ_TAIL);
3892 return (rval);
3894 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3896 * Safe to always drop SQ_EXCL:
3897 * Not SQ_CIPUT means we set SQ_EXCL above
3898 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3899 * did a qwriter(INNER) in which case nobody else
3900 * is in the inner perimeter and we are exiting.
3902 * I would like to make the following assertion:
3904 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3905 * sq->sq_count == 0);
3907 * which indicates that if we are both putshared and exclusive,
3908 * we became exclusive while executing the putproc, and the only
3909 * claim on the syncq was the one we dropped a few lines above.
3910 * But other threads that enter putnext while the syncq is exclusive
3911 * need to make a claim as they may need to drop SQLOCK in the
3912 * has_writers case to avoid deadlocks. If these threads are
3913 * delayed or preempted, it is possible that the writer thread can
3914 * find out that there are other claims making the (sq_count == 0)
3915 * test invalid.
3918 sq->sq_flags = flags & ~SQ_EXCL;
3919 if (sq->sq_flags & SQ_WANTWAKEUP) {
3920 sq->sq_flags &= ~SQ_WANTWAKEUP;
3921 cv_broadcast(&sq->sq_wait);
3923 mutex_exit(SQLOCK(sq));
3924 return (rval);
3928 * The purpose of infonext() is to call the info procedure of the next
3929 * (downstream) modules queue.
3931 * treated as put entrypoint for perimeter syncronization.
3933 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3934 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3935 * it does not matter if any regular put entrypoints have been already
3936 * entered.
3939 infonext(queue_t *qp, infod_t *idp)
3941 queue_t *nqp;
3942 syncq_t *sq;
3943 uint16_t count;
3944 uint16_t flags;
3945 struct qinit *qi;
3946 int (*proc)();
3947 struct stdata *stp;
3948 int rval;
3950 stp = STREAM(qp);
3952 * Prevent q_next from changing by holding sd_lock until
3953 * acquiring SQLOCK.
3955 mutex_enter(&stp->sd_lock);
3956 if ((nqp = _WR(qp)) == qp) {
3957 qp = nqp->q_next;
3958 } else {
3959 qp = _RD(nqp->q_next);
3961 qi = qp->q_qinfo;
3962 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3963 mutex_exit(&stp->sd_lock);
3964 return (EINVAL);
3966 sq = qp->q_syncq;
3967 mutex_enter(SQLOCK(sq));
3968 mutex_exit(&stp->sd_lock);
3969 count = sq->sq_count;
3970 flags = sq->sq_flags;
3971 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3973 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3975 * Wait until we can enter the inner perimeter.
3977 sq->sq_flags = flags | SQ_WANTWAKEUP;
3978 cv_wait(&sq->sq_wait, SQLOCK(sq));
3979 count = sq->sq_count;
3980 flags = sq->sq_flags;
3983 if (! (flags & SQ_CIPUT))
3984 sq->sq_flags = flags | SQ_EXCL;
3985 sq->sq_count = count + 1;
3986 ASSERT(sq->sq_count != 0); /* Wraparound */
3987 mutex_exit(SQLOCK(sq));
3989 rval = (*proc)(qp, idp);
3991 mutex_enter(SQLOCK(sq));
3992 flags = sq->sq_flags;
3993 ASSERT(sq->sq_count != 0);
3994 sq->sq_count--;
3995 if (flags & SQ_TAIL) {
3996 putnext_tail(sq, qp, flags);
3998 * The only purpose of this ASSERT is to preserve calling stack
3999 * in DEBUG kernel.
4001 ASSERT(flags & SQ_TAIL);
4002 return (rval);
4004 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
4006 * XXXX
4007 * I am not certain the next comment is correct here. I need to consider
4008 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
4009 * might cause other problems. It just might be safer to drop it if
4010 * !SQ_CIPUT because that is when we set it.
4013 * Safe to always drop SQ_EXCL:
4014 * Not SQ_CIPUT means we set SQ_EXCL above
4015 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
4016 * did a qwriter(INNER) in which case nobody else
4017 * is in the inner perimeter and we are exiting.
4019 * I would like to make the following assertion:
4021 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4022 * sq->sq_count == 0);
4024 * which indicates that if we are both putshared and exclusive,
4025 * we became exclusive while executing the putproc, and the only
4026 * claim on the syncq was the one we dropped a few lines above.
4027 * But other threads that enter putnext while the syncq is exclusive
4028 * need to make a claim as they may need to drop SQLOCK in the
4029 * has_writers case to avoid deadlocks. If these threads are
4030 * delayed or preempted, it is possible that the writer thread can
4031 * find out that there are other claims making the (sq_count == 0)
4032 * test invalid.
4035 sq->sq_flags = flags & ~SQ_EXCL;
4036 mutex_exit(SQLOCK(sq));
4037 return (rval);
4041 * Return nonzero if the queue is responsible for struio(), else return 0.
4044 isuioq(queue_t *q)
4046 if (q->q_flag & QREADR)
4047 return (STREAM(q)->sd_struiordq == q);
4048 else
4049 return (STREAM(q)->sd_struiowrq == q);
/*
 * When non-zero, create_putlocks() returns without creating any per-CPU
 * putlocks.  Putlocks are enabled by default on SPARC only.
 */
#if !defined(__sparc)
int disable_putlocks = 1;
#else
int disable_putlocks = 0;
#endif
4059 * called by create_putlock.
4061 static void
4062 create_syncq_putlocks(queue_t *q)
4064 syncq_t *sq = q->q_syncq;
4065 ciputctrl_t *cip;
4066 int i;
4068 ASSERT(sq != NULL);
4070 ASSERT(disable_putlocks == 0);
4071 ASSERT(n_ciputctrl >= min_n_ciputctrl);
4072 ASSERT(ciputctrl_cache != NULL);
4074 if (!(sq->sq_type & SQ_CIPUT))
4075 return;
4077 for (i = 0; i <= 1; i++) {
4078 if (sq->sq_ciputctrl == NULL) {
4079 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4080 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4081 mutex_enter(SQLOCK(sq));
4082 if (sq->sq_ciputctrl != NULL) {
4083 mutex_exit(SQLOCK(sq));
4084 kmem_cache_free(ciputctrl_cache, cip);
4085 } else {
4086 ASSERT(sq->sq_nciputctrl == 0);
4087 sq->sq_nciputctrl = n_ciputctrl - 1;
4089 * putnext checks sq_ciputctrl without holding
4090 * SQLOCK. if it is not NULL putnext assumes
4091 * sq_nciputctrl is initialized. membar below
4092 * insures that.
4094 membar_producer();
4095 sq->sq_ciputctrl = cip;
4096 mutex_exit(SQLOCK(sq));
4099 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4100 if (i == 1)
4101 break;
4102 q = _OTHERQ(q);
4103 if (!(q->q_flag & QPERQ)) {
4104 ASSERT(sq == q->q_syncq);
4105 break;
4107 ASSERT(q->q_syncq != NULL);
4108 ASSERT(sq != q->q_syncq);
4109 sq = q->q_syncq;
4110 ASSERT(sq->sq_type & SQ_CIPUT);
4115 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
4116 * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
4117 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4118 * starting from q and down to the driver.
4120 * This should be called after the affected queues are part of stream
4121 * geometry. It should be called from driver/module open routine after
4122 * qprocson() call. It is also called from nfs syscall where it is known that
4123 * stream is configured and won't change its geometry during create_putlock
4124 * call.
4126 * caller normally uses 0 value for the stream argument to speed up MT putnext
4127 * into the perimeter of q for example because its perimeter is per module
4128 * (e.g. IP).
4130 * caller normally uses non 0 value for the stream argument to hint the system
4131 * that the stream of q is a very contended global system stream
4132 * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4133 * particularly MT hot.
4135 * Caller insures stream plumbing won't happen while we are here and therefore
4136 * q_next can be safely used.
4139 void
4140 create_putlocks(queue_t *q, int stream)
4142 ciputctrl_t *cip;
4143 struct stdata *stp = STREAM(q);
4145 q = _WR(q);
4146 ASSERT(stp != NULL);
4148 if (disable_putlocks != 0)
4149 return;
4151 if (n_ciputctrl < min_n_ciputctrl)
4152 return;
4154 ASSERT(ciputctrl_cache != NULL);
4156 if (stream != 0 && stp->sd_ciputctrl == NULL) {
4157 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4158 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4159 mutex_enter(&stp->sd_lock);
4160 if (stp->sd_ciputctrl != NULL) {
4161 mutex_exit(&stp->sd_lock);
4162 kmem_cache_free(ciputctrl_cache, cip);
4163 } else {
4164 ASSERT(stp->sd_nciputctrl == 0);
4165 stp->sd_nciputctrl = n_ciputctrl - 1;
4167 * putnext checks sd_ciputctrl without holding
4168 * sd_lock. if it is not NULL putnext assumes
4169 * sd_nciputctrl is initialized. membar below
4170 * insures that.
4172 membar_producer();
4173 stp->sd_ciputctrl = cip;
4174 mutex_exit(&stp->sd_lock);
4178 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4180 while (_SAMESTR(q)) {
4181 create_syncq_putlocks(q);
4182 if (stream == 0)
4183 return;
4184 q = q->q_next;
4186 ASSERT(q != NULL);
4187 create_syncq_putlocks(q);
/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * The data currently recorded per event is a timestamp, the module/driver
 * name, the downstream module/driver name, an optional call stack, the
 * event type and a per-type datum.  Much of the STREAMS framework is
 * instrumented for automatic flow tracing (when enabled).  Events can also
 * be defined and used by STREAMS modules and drivers.
 *
 * Global objects:
 *
 *	str_ftevent() - Add a flow-trace event to a dblk's trace header.
 *	str_ftfree() - Free flow-trace data.
 *
 * Local objects:
 *
 *	fthdr_cache - pointer to the kmem cache for trace headers.
 *	ftblk_cache - pointer to the kmem cache for trace data blocks.
 */

/*
 * Non-zero means "don't do STREAMS flow tracing".  str_ftevent() also
 * increments it when it cannot allocate a trace block.
 */
int str_ftnever = 1;

/* Non-zero requests recording of event call stacks; off by default. */
int str_ftstack = 0;
4214 void
4215 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4217 ftblk_t *bp = hp->tail;
4218 ftblk_t *nbp;
4219 ftevnt_t *ep;
4220 int ix, nix;
4222 ASSERT(hp != NULL);
4224 for (;;) {
4225 if ((ix = bp->ix) == FTBLK_EVNTS) {
4227 * Tail doesn't have room, so need a new tail.
4229 * To make this MT safe, first, allocate a new
4230 * ftblk, and initialize it. To make life a
4231 * little easier, reserve the first slot (mostly
4232 * by making ix = 1). When we are finished with
4233 * the initialization, CAS this pointer to the
4234 * tail. If this succeeds, this is the new
4235 * "next" block. Otherwise, another thread
4236 * got here first, so free the block and start
4237 * again.
4239 nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4240 if (nbp == NULL) {
4241 /* no mem, so punt */
4242 str_ftnever++;
4243 /* free up all flow data? */
4244 return;
4246 nbp->nxt = NULL;
4247 nbp->ix = 1;
4249 * Just in case there is another thread about
4250 * to get the next index, we need to make sure
4251 * the value is there for it.
4253 membar_producer();
4254 if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
4255 /* CAS was successful */
4256 bp->nxt = nbp;
4257 membar_producer();
4258 bp = nbp;
4259 ix = 0;
4260 goto cas_good;
4261 } else {
4262 kmem_cache_free(ftblk_cache, nbp);
4263 bp = hp->tail;
4264 continue;
4267 nix = ix + 1;
4268 if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
4269 cas_good:
4270 if (curthread != hp->thread) {
4271 hp->thread = curthread;
4272 evnt |= FTEV_CS;
4274 if (CPU->cpu_seqid != hp->cpu_seqid) {
4275 hp->cpu_seqid = CPU->cpu_seqid;
4276 evnt |= FTEV_PS;
4278 ep = &bp->ev[ix];
4279 break;
4283 if (evnt & FTEV_QMASK) {
4284 queue_t *qp = p;
4286 if (!(qp->q_flag & QREADR))
4287 evnt |= FTEV_ISWR;
4289 ep->mid = Q2NAME(qp);
4292 * We only record the next queue name for FTEV_PUTNEXT since
4293 * that's the only time we *really* need it, and the putnext()
4294 * code ensures that qp->q_next won't vanish. (We could use
4295 * claimstr()/releasestr() but at a performance cost.)
4297 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4298 ep->midnext = Q2NAME(qp->q_next);
4299 else
4300 ep->midnext = NULL;
4301 } else {
4302 ep->mid = p;
4303 ep->midnext = NULL;
4306 if (ep->stk != NULL)
4307 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4309 ep->ts = gethrtime();
4310 ep->evnt = evnt;
4311 ep->data = data;
4312 hp->hash = (hp->hash << 9) + hp->hash;
4313 hp->hash += (evnt << 16) | data;
4314 hp->hash += (uintptr_t)ep->mid;
4318 * Free flow-trace data.
4320 void
4321 str_ftfree(dblk_t *dbp)
4323 fthdr_t *hp = dbp->db_fthdr;
4324 ftblk_t *bp = &hp->first;
4325 ftblk_t *nbp;
4327 if (bp != hp->tail || bp->ix != 0) {
4329 * Clear out the hash, have the tail point to itself, and free
4330 * any continuation blocks.
4332 bp = hp->first.nxt;
4333 hp->tail = &hp->first;
4334 hp->hash = 0;
4335 hp->first.nxt = NULL;
4336 hp->first.ix = 0;
4337 while (bp != NULL) {
4338 nbp = bp->nxt;
4339 kmem_cache_free(ftblk_cache, bp);
4340 bp = nbp;
4343 kmem_cache_free(fthdr_cache, hp);
4344 dbp->db_fthdr = NULL;