4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
27 #pragma ident "%Z%%M% %I% %E% SMI"
30 * STREAMS Buffering module
32 * This streams module collects incoming messages from modules below
33 * it on the stream and buffers them up into a smaller number of
34 * aggregated messages. Its main purpose is to reduce overhead by
35 * cutting down on the number of read (or getmsg) calls its client
37 * - only M_DATA is buffered.
38 * - multithreading assumes configured as D_MTQPAIR
39 * - packets are lost only if flag SB_NO_HEADER is clear and buffer
41 * - in-order message transmission. This is enforced for messages other
42 * than high priority messages.
43 * - zero length messages on the read side are not passed up the
44 * stream but used internally for synchronization.
46 * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
47 * (conversion is the default for backwards compatibility
48 * hence the negative logic).
49 * - SB_NO_HEADER - no headers in buffered data.
50 * (adding headers is the default for backwards compatibility
51 * hence the negative logic).
52 * - SB_DEFER_CHUNK - provides improved response time in question-answer
53 * applications. Buffering is not enabled until the second message
54 * is received on the read side within the sb_ticks interval.
55 * This option will often be used in combination with flag SB_SEND_ON_WRITE.
56 * - SB_SEND_ON_WRITE - a write message results in any pending buffered read
57 * data being immediately sent upstream.
58 * - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates
59 * the blocked flow condition downstream. If this flag is clear (default)
60 * messages will be dropped if the upstream flow is blocked.
64 #include <sys/types.h>
65 #include <sys/errno.h>
66 #include <sys/debug.h>
67 #include <sys/stropts.h>
69 #include <sys/stream.h>
72 #include <sys/sunddi.h>
74 #include <sys/strsun.h>
75 #include <sys/bufmod.h>
76 #include <sys/modctl.h>
77 #include <sys/isa_defs.h>
80 * Per-Stream state information.
82 * If sb_ticks is negative, we don't deliver chunks until they're
83 * full. If it's zero, we deliver every packet as it arrives. (In
84 * this case we force sb_chunk to zero, to make the implementation
85 * easier.) Otherwise, sb_ticks gives the number of ticks in a
86 * buffering interval. The interval begins when a read side data
87 * message is received and a timeout is not active. If sb_snap is
88 * zero, no truncation of the msg is done.
91 queue_t
*sb_rq
; /* our rq */
92 mblk_t
*sb_mp
; /* partial chunk */
93 mblk_t
*sb_head
; /* pre-allocated space for the next header */
94 mblk_t
*sb_tail
; /* first mblk of last message appended */
95 uint_t sb_mlen
; /* sb_mp length */
96 uint_t sb_mcount
; /* input msg count in sb_mp */
97 uint_t sb_chunk
; /* max chunk size */
98 clock_t sb_ticks
; /* timeout interval */
99 timeout_id_t sb_timeoutid
; /* qtimeout() id */
100 uint_t sb_drops
; /* cumulative # discarded msgs */
101 uint_t sb_snap
; /* snapshot length */
102 uint_t sb_flags
; /* flags field */
103 uint_t sb_state
; /* state variable */
107 * Function prototypes.
109 static int sbopen(queue_t
*, dev_t
*, int, int, cred_t
*);
110 static int sbclose(queue_t
*, int, cred_t
*);
111 static void sbwput(queue_t
*, mblk_t
*);
112 static void sbrput(queue_t
*, mblk_t
*);
113 static void sbrsrv(queue_t
*);
114 static void sbioctl(queue_t
*, mblk_t
*);
115 static void sbaddmsg(queue_t
*, mblk_t
*);
116 static void sbtick(void *);
117 static void sbclosechunk(struct sb
*);
118 static void sbsendit(queue_t
*, mblk_t
*);
120 static struct module_info sb_minfo
= {
122 "bufmod", /* mi_idname */
124 INFPSZ
, /* mi_maxpsz */
129 static struct qinit sb_rinit
= {
130 (int (*)())sbrput
, /* qi_putp */
131 (int (*)())sbrsrv
, /* qi_srvp */
132 sbopen
, /* qi_qopen */
133 sbclose
, /* qi_qclose */
134 NULL
, /* qi_qadmin */
135 &sb_minfo
, /* qi_minfo */
139 static struct qinit sb_winit
= {
140 (int (*)())sbwput
, /* qi_putp */
143 NULL
, /* qi_qclose */
144 NULL
, /* qi_qadmin */
145 &sb_minfo
, /* qi_minfo */
149 static struct streamtab sb_info
= {
150 &sb_rinit
, /* st_rdinit */
151 &sb_winit
, /* st_wrinit */
152 NULL
, /* st_muxrinit */
153 NULL
/* st_muxwinit */
158 * This is the loadable module wrapper.
161 static struct fmodsw fsw
= {
168 * Module linkage information for the kernel.
171 static struct modlstrmod modlstrmod
= {
172 &mod_strmodops
, "streams buffer mod", &fsw
175 static struct modlinkage modlinkage
= {
176 MODREV_1
, &modlstrmod
, NULL
183 return (mod_install(&modlinkage
));
189 return (mod_remove(&modlinkage
));
193 _info(struct modinfo
*modinfop
)
195 return (mod_info(&modlinkage
, modinfop
));
201 sbopen(queue_t
*rq
, dev_t
*dev
, int oflag
, int sflag
, cred_t
*crp
)
206 if (sflag
!= MODOPEN
)
213 * Allocate and initialize per-Stream structure.
215 sbp
= kmem_alloc(sizeof (struct sb
), KM_SLEEP
);
218 sbp
->sb_chunk
= SB_DFLT_CHUNK
;
219 sbp
->sb_tail
= sbp
->sb_mp
= sbp
->sb_head
= NULL
;
222 sbp
->sb_timeoutid
= 0;
228 rq
->q_ptr
= WR(rq
)->q_ptr
= sbp
;
238 sbclose(queue_t
*rq
, int flag
, cred_t
*credp
)
240 struct sb
*sbp
= (struct sb
*)rq
->q_ptr
;
246 * Cancel an outstanding timeout
248 if (sbp
->sb_timeoutid
!= 0) {
249 (void) quntimeout(rq
, sbp
->sb_timeoutid
);
250 sbp
->sb_timeoutid
= 0;
253 * Free the current chunk.
257 sbp
->sb_tail
= sbp
->sb_mp
= sbp
->sb_head
= NULL
;
262 * Free the per-Stream structure.
264 kmem_free((caddr_t
)sbp
, sizeof (struct sb
));
265 rq
->q_ptr
= WR(rq
)->q_ptr
= NULL
;
/*
 * Stream-head high/low watermarks derived from the expected message
 * size.  The correction (fudge) factor is introduced to compensate
 * for whatever assumptions the modules below have made about how much
 * traffic is flowing through the stream and the fact that bufmod may
 * be snipping messages with the sb_snap length.
 *
 * Arguments are parenthesized so the macros stay correct when handed
 * expression arguments (CERT PRE01-C).
 */
#define SNIT_HIWAT(msgsize, fudge) ((4 * (msgsize) * (fudge)) + 512)
#define SNIT_LOWAT(msgsize, fudge) ((2 * (msgsize) * (fudge)) + 256)
281 sbioc(queue_t
*wq
, mblk_t
*mp
)
284 struct sb
*sbp
= (struct sb
*)wq
->q_ptr
;
288 iocp
= (struct iocblk
*)mp
->b_rptr
;
290 switch (iocp
->ioc_cmd
) {
295 miocack(wq
, mp
, 0, 0);
299 #ifdef _SYSCALL32_IMPL
300 if ((iocp
->ioc_flag
& IOC_MODELS
) != IOC_NATIVE
) {
301 struct timeval32
*t32
;
303 t32
= (struct timeval32
*)mp
->b_cont
->b_rptr
;
304 if (t32
->tv_sec
< 0 || t32
->tv_usec
< 0) {
305 miocnak(wq
, mp
, 0, EINVAL
);
308 ticks
= TIMEVAL_TO_TICK(t32
);
310 #endif /* _SYSCALL32_IMPL */
314 tb
= (struct timeval
*)mp
->b_cont
->b_rptr
;
316 if (tb
->tv_sec
< 0 || tb
->tv_usec
< 0) {
317 miocnak(wq
, mp
, 0, EINVAL
);
320 ticks
= TIMEVAL_TO_TICK(tb
);
322 sbp
->sb_ticks
= ticks
;
325 miocack(wq
, mp
, 0, 0);
331 * set up hi/lo water marks on stream head read queue.
332 * unlikely to run out of resources. Fix at later date.
334 if ((mop
= allocb(sizeof (struct stroptions
),
335 BPRI_MED
)) != NULL
) {
336 struct stroptions
*sop
;
339 chunk
= *(uint_t
*)mp
->b_cont
->b_rptr
;
340 mop
->b_datap
->db_type
= M_SETOPTS
;
341 mop
->b_wptr
+= sizeof (struct stroptions
);
342 sop
= (struct stroptions
*)mop
->b_rptr
;
343 sop
->so_flags
= SO_HIWAT
| SO_LOWAT
;
344 sop
->so_hiwat
= SNIT_HIWAT(chunk
, 1);
345 sop
->so_lowat
= SNIT_LOWAT(chunk
, 1);
349 sbp
->sb_chunk
= *(uint_t
*)mp
->b_cont
->b_rptr
;
350 miocack(wq
, mp
, 0, 0);
355 sbp
->sb_flags
= *(uint_t
*)mp
->b_cont
->b_rptr
;
356 miocack(wq
, mp
, 0, 0);
361 * if chunking don't worry about effects of
362 * snipping of message size on head flow control
363 * since it has a relatively small bearing on the
364 * data rate onto the stream head.
366 if (!sbp
->sb_chunk
) {
368 * set up hi/lo water marks on stream head read queue.
369 * unlikely to run out of resources. Fix at later date.
371 if ((mop
= allocb(sizeof (struct stroptions
),
372 BPRI_MED
)) != NULL
) {
373 struct stroptions
*sop
;
377 snap
= *(uint_t
*)mp
->b_cont
->b_rptr
;
378 mop
->b_datap
->db_type
= M_SETOPTS
;
379 mop
->b_wptr
+= sizeof (struct stroptions
);
380 sop
= (struct stroptions
*)mop
->b_rptr
;
381 sop
->so_flags
= SO_HIWAT
| SO_LOWAT
;
382 fudge
= snap
<= 100 ? 4 :
385 sop
->so_hiwat
= SNIT_HIWAT(snap
, fudge
);
386 sop
->so_lowat
= SNIT_LOWAT(snap
, fudge
);
391 sbp
->sb_snap
= *(uint_t
*)mp
->b_cont
->b_rptr
;
392 miocack(wq
, mp
, 0, 0);
402 * Write-side put procedure. Its main task is to detect ioctls
403 * for manipulating the buffering state and hand them to sbioctl.
404 * Other message types are passed on through.
407 sbwput(queue_t
*wq
, mblk_t
*mp
)
409 struct sb
*sbp
= (struct sb
*)wq
->q_ptr
;
410 struct copyresp
*resp
;
412 if (sbp
->sb_flags
& SB_SEND_ON_WRITE
)
414 switch (mp
->b_datap
->db_type
) {
420 resp
= (struct copyresp
*)mp
->b_rptr
;
423 * Just free message on failure.
429 switch (resp
->cp_cmd
) {
454 * Read-side put procedure. It's responsible for buffering up incoming
455 * messages and grouping them into aggregates according to the current
456 * buffering parameters.
459 sbrput(queue_t
*rq
, mblk_t
*mp
)
461 struct sb
*sbp
= (struct sb
*)rq
->q_ptr
;
465 switch (mp
->b_datap
->db_type
) {
467 if (sbp
->sb_flags
& SB_NO_PROTO_CVT
) {
473 * Convert M_PROTO to M_DATA.
475 mp
->b_datap
->db_type
= M_DATA
;
480 if ((sbp
->sb_flags
& SB_DEFER_CHUNK
) &&
481 !(sbp
->sb_state
& SB_FRCVD
)) {
484 sbp
->sb_state
|= SB_FRCVD
;
488 if ((sbp
->sb_ticks
> 0) && !(sbp
->sb_timeoutid
))
489 sbp
->sb_timeoutid
= qtimeout(sbp
->sb_rq
, sbtick
,
495 if (*mp
->b_rptr
& FLUSHR
) {
497 * Reset timeout, flush the chunk currently in
498 * progress, and start a new chunk.
500 if (sbp
->sb_timeoutid
) {
501 (void) quntimeout(sbp
->sb_rq
,
503 sbp
->sb_timeoutid
= 0;
507 sbp
->sb_tail
= sbp
->sb_mp
= sbp
->sb_head
= NULL
;
511 flushq(rq
, FLUSHALL
);
518 * Zero-length M_CTL means our timeout() popped.
520 if (MBLKL(mp
) == 0) {
530 if (mp
->b_datap
->db_type
<= QPCTL
) {
534 /* Note: out of band */
542 * read service procedure.
551 * High priority messages shouldn't get here but if
552 * one does, jam it through to avoid infinite loop.
554 while ((mp
= getq(rq
)) != NULL
) {
555 if (!canputnext(rq
) && (mp
->b_datap
->db_type
<= QPCTL
)) {
556 /* should only get here if SB_NO_SROPS */
557 (void) putbq(rq
, mp
);
565 * Handle write-side M_IOCTL messages.
568 sbioctl(queue_t
*wq
, mblk_t
*mp
)
570 struct sb
*sbp
= (struct sb
*)wq
->q_ptr
;
571 struct iocblk
*iocp
= (struct iocblk
*)mp
->b_rptr
;
575 int transparent
= iocp
->ioc_count
;
579 switch (iocp
->ioc_cmd
) {
581 if (iocp
->ioc_count
== TRANSPARENT
) {
582 #ifdef _SYSCALL32_IMPL
583 if ((iocp
->ioc_flag
& IOC_MODELS
) != IOC_NATIVE
) {
584 mcopyin(mp
, NULL
, sizeof (struct timeval32
),
587 #endif /* _SYSCALL32_IMPL */
589 mcopyin(mp
, NULL
, sizeof (*t
), NULL
);
594 * Verify argument length.
596 #ifdef _SYSCALL32_IMPL
597 if ((iocp
->ioc_flag
& IOC_MODELS
) != IOC_NATIVE
) {
598 struct timeval32
*t32
;
600 error
= miocpullup(mp
,
601 sizeof (struct timeval32
));
603 miocnak(wq
, mp
, 0, error
);
606 t32
= (struct timeval32
*)mp
->b_cont
->b_rptr
;
607 if (t32
->tv_sec
< 0 || t32
->tv_usec
< 0) {
608 miocnak(wq
, mp
, 0, EINVAL
);
611 ticks
= TIMEVAL_TO_TICK(t32
);
613 #endif /* _SYSCALL32_IMPL */
615 error
= miocpullup(mp
, sizeof (struct timeval
));
617 miocnak(wq
, mp
, 0, error
);
621 t
= (struct timeval
*)mp
->b_cont
->b_rptr
;
622 if (t
->tv_sec
< 0 || t
->tv_usec
< 0) {
623 miocnak(wq
, mp
, 0, EINVAL
);
626 ticks
= TIMEVAL_TO_TICK(t
);
628 sbp
->sb_ticks
= ticks
;
631 miocack(wq
, mp
, 0, 0);
640 * Verify argument length.
642 if (transparent
!= TRANSPARENT
) {
643 #ifdef _SYSCALL32_IMPL
644 if ((iocp
->ioc_flag
& IOC_MODELS
) != IOC_NATIVE
) {
645 error
= miocpullup(mp
,
646 sizeof (struct timeval32
));
648 miocnak(wq
, mp
, 0, error
);
652 #endif /* _SYSCALL32_IMPL */
653 error
= miocpullup(mp
, sizeof (struct timeval
));
655 miocnak(wq
, mp
, 0, error
);
661 * If infinite timeout, return range error
664 if (sbp
->sb_ticks
< 0) {
665 miocnak(wq
, mp
, 0, ERANGE
);
669 #ifdef _SYSCALL32_IMPL
670 if ((iocp
->ioc_flag
& IOC_MODELS
) != IOC_NATIVE
) {
671 struct timeval32
*t32
;
673 if (transparent
== TRANSPARENT
) {
674 datamp
= allocb(sizeof (*t32
), BPRI_MED
);
675 if (datamp
== NULL
) {
676 miocnak(wq
, mp
, 0, EAGAIN
);
679 mcopyout(mp
, NULL
, sizeof (*t32
), NULL
, datamp
);
682 t32
= (struct timeval32
*)mp
->b_cont
->b_rptr
;
683 TICK_TO_TIMEVAL32(sbp
->sb_ticks
, t32
);
685 if (transparent
== TRANSPARENT
)
688 miocack(wq
, mp
, sizeof (*t32
), 0);
690 #endif /* _SYSCALL32_IMPL */
692 if (transparent
== TRANSPARENT
) {
693 datamp
= allocb(sizeof (*t
), BPRI_MED
);
694 if (datamp
== NULL
) {
695 miocnak(wq
, mp
, 0, EAGAIN
);
698 mcopyout(mp
, NULL
, sizeof (*t
), NULL
, datamp
);
701 t
= (struct timeval
*)mp
->b_cont
->b_rptr
;
702 TICK_TO_TIMEVAL(sbp
->sb_ticks
, t
);
704 if (transparent
== TRANSPARENT
)
707 miocack(wq
, mp
, sizeof (*t
), 0);
714 miocack(wq
, mp
, 0, 0);
718 if (iocp
->ioc_count
== TRANSPARENT
) {
719 mcopyin(mp
, NULL
, sizeof (uint_t
), NULL
);
723 * Verify argument length.
725 error
= miocpullup(mp
, sizeof (uint_t
));
727 miocnak(wq
, mp
, 0, error
);
732 * set up hi/lo water marks on stream head read queue.
733 * unlikely to run out of resources. Fix at later date.
735 if ((mop
= allocb(sizeof (struct stroptions
),
736 BPRI_MED
)) != NULL
) {
737 struct stroptions
*sop
;
740 chunk
= *(uint_t
*)mp
->b_cont
->b_rptr
;
741 mop
->b_datap
->db_type
= M_SETOPTS
;
742 mop
->b_wptr
+= sizeof (struct stroptions
);
743 sop
= (struct stroptions
*)mop
->b_rptr
;
744 sop
->so_flags
= SO_HIWAT
| SO_LOWAT
;
745 sop
->so_hiwat
= SNIT_HIWAT(chunk
, 1);
746 sop
->so_lowat
= SNIT_LOWAT(chunk
, 1);
750 sbp
->sb_chunk
= *(uint_t
*)mp
->b_cont
->b_rptr
;
751 miocack(wq
, mp
, 0, 0);
758 * Verify argument length.
760 if (transparent
!= TRANSPARENT
) {
761 error
= miocpullup(mp
, sizeof (uint_t
));
763 miocnak(wq
, mp
, 0, error
);
768 if (transparent
== TRANSPARENT
) {
769 datamp
= allocb(sizeof (uint_t
), BPRI_MED
);
770 if (datamp
== NULL
) {
771 miocnak(wq
, mp
, 0, EAGAIN
);
774 mcopyout(mp
, NULL
, sizeof (uint_t
), NULL
, datamp
);
777 *(uint_t
*)mp
->b_cont
->b_rptr
= sbp
->sb_chunk
;
779 if (transparent
== TRANSPARENT
)
782 miocack(wq
, mp
, sizeof (uint_t
), 0);
786 if (iocp
->ioc_count
== TRANSPARENT
) {
787 mcopyin(mp
, NULL
, sizeof (uint_t
), NULL
);
791 * Verify argument length.
793 error
= miocpullup(mp
, sizeof (uint_t
));
795 miocnak(wq
, mp
, 0, error
);
800 * if chunking don't worry about effects of
801 * snipping of message size on head flow control
802 * since it has a relatively small bearing on the
803 * data rate onto the stream head.
805 if (!sbp
->sb_chunk
) {
807 * set up hi/lo water marks on stream
808 * head read queue. unlikely to run out
809 * of resources. Fix at later date.
811 if ((mop
= allocb(sizeof (struct stroptions
),
812 BPRI_MED
)) != NULL
) {
813 struct stroptions
*sop
;
817 snap
= *(uint_t
*)mp
->b_cont
->b_rptr
;
818 mop
->b_datap
->db_type
= M_SETOPTS
;
819 mop
->b_wptr
+= sizeof (*sop
);
820 sop
= (struct stroptions
*)mop
->b_rptr
;
821 sop
->so_flags
= SO_HIWAT
| SO_LOWAT
;
822 fudge
= (snap
<= 100) ? 4 :
823 (snap
<= 400) ? 2 : 1;
824 sop
->so_hiwat
= SNIT_HIWAT(snap
, fudge
);
825 sop
->so_lowat
= SNIT_LOWAT(snap
, fudge
);
830 sbp
->sb_snap
= *(uint_t
*)mp
->b_cont
->b_rptr
;
832 miocack(wq
, mp
, 0, 0);
838 * Verify argument length
840 if (transparent
!= TRANSPARENT
) {
841 error
= miocpullup(mp
, sizeof (uint_t
));
843 miocnak(wq
, mp
, 0, error
);
848 if (transparent
== TRANSPARENT
) {
849 datamp
= allocb(sizeof (uint_t
), BPRI_MED
);
850 if (datamp
== NULL
) {
851 miocnak(wq
, mp
, 0, EAGAIN
);
854 mcopyout(mp
, NULL
, sizeof (uint_t
), NULL
, datamp
);
857 *(uint_t
*)mp
->b_cont
->b_rptr
= sbp
->sb_snap
;
859 if (transparent
== TRANSPARENT
)
862 miocack(wq
, mp
, sizeof (uint_t
), 0);
869 if (iocp
->ioc_count
== TRANSPARENT
) {
870 mcopyin(mp
, NULL
, sizeof (uint_t
), NULL
);
873 error
= miocpullup(mp
, sizeof (uint_t
));
875 miocnak(wq
, mp
, 0, error
);
878 sbp
->sb_flags
= *(uint_t
*)mp
->b_cont
->b_rptr
;
879 miocack(wq
, mp
, 0, 0);
885 * Verify argument length
887 if (transparent
!= TRANSPARENT
) {
888 error
= miocpullup(mp
, sizeof (uint_t
));
890 miocnak(wq
, mp
, 0, error
);
895 if (transparent
== TRANSPARENT
) {
896 datamp
= allocb(sizeof (uint_t
), BPRI_MED
);
897 if (datamp
== NULL
) {
898 miocnak(wq
, mp
, 0, EAGAIN
);
901 mcopyout(mp
, NULL
, sizeof (uint_t
), NULL
, datamp
);
904 *(uint_t
*)mp
->b_cont
->b_rptr
= sbp
->sb_flags
;
906 if (transparent
== TRANSPARENT
)
909 miocack(wq
, mp
, sizeof (uint_t
), 0);
920 * Given a length l, calculate the amount of extra storage
921 * required to round it up to the next multiple of the alignment a.
923 #define RoundUpAmt(l, a) ((l) % (a) ? (a) - ((l) % (a)) : 0)
925 * Calculate additional amount of space required for alignment.
927 #define Align(l) RoundUpAmt(l, sizeof (ulong_t))
/*
 * Smallest possible message size when headers are enabled: one sb_hdr
 * plus worst-case alignment padding.  This is used to calculate
 * whether a chunk is nearly full.  The expansion is parenthesized so
 * the macro is safe inside larger expressions (CERT PRE01-C); the
 * current additive uses are unaffected.
 */
#define SMALLEST_MESSAGE (sizeof (struct sb_hdr) + _POINTER_ALIGNMENT)
935 * Process a read-side M_DATA message.
937 * If the currently accumulating chunk doesn't have enough room
938 * for the message, close off the chunk, pass it upward, and start
939 * a new one. Then add the message to the current chunk, taking
940 * account of the possibility that the message's size exceeds the
943 * If headers are enabled add an sb_hdr header and trailing alignment padding.
945 * To optimise performance the total number of msgbs should be kept
946 * to a minimum. This is achieved by using any remaining space in message N
947 * for both its own padding as well as the header of message N+1 if possible.
948 * If there's insufficient space we allocate one message to hold this 'wrapper'.
949 * (there's likely to be space beyond message N, since allocb would have
950 * rounded up the required size to one of the dblk_sizes).
954 sbaddmsg(queue_t
*rq
, mblk_t
*mp
)
959 mblk_t
*wrapper
; /* padding for msg N, header for msg N+1 */
960 mblk_t
*last
; /* last mblk of current message */
961 size_t wrapperlen
; /* length of header + padding */
962 size_t origlen
; /* data length before truncation */
963 size_t pad
; /* bytes required to align header */
965 sbp
= (struct sb
*)rq
->q_ptr
;
967 origlen
= msgdsize(mp
);
970 * Truncate the message.
972 if ((sbp
->sb_snap
> 0) && (origlen
> sbp
->sb_snap
) &&
973 (adjmsg(mp
, -(origlen
- sbp
->sb_snap
)) == 1))
974 hp
.sbh_totlen
= hp
.sbh_msglen
= sbp
->sb_snap
;
976 hp
.sbh_totlen
= hp
.sbh_msglen
= origlen
;
978 if (sbp
->sb_flags
& SB_NO_HEADER
) {
981 * Would the inclusion of this message overflow the current
982 * chunk? If so close the chunk off and start a new one.
984 if ((hp
.sbh_totlen
+ sbp
->sb_mlen
) > sbp
->sb_chunk
)
987 * First message too big for chunk - just send it up.
988 * This will always be true when we're not chunking.
990 if (hp
.sbh_totlen
> sbp
->sb_chunk
) {
996 * We now know that the msg will fit in the chunk.
997 * Link it onto the end of the chunk.
998 * Since linkb() walks the entire chain, we keep a pointer to
999 * the first mblk of the last msgb added and call linkb on that
1000 * that last message, rather than performing the
1001 * O(n) linkb() operation on the whole chain.
1002 * sb_head isn't needed in this SB_NO_HEADER mode.
1005 linkb(sbp
->sb_tail
, mp
);
1010 sbp
->sb_mlen
+= hp
.sbh_totlen
;
1013 /* Timestamp must be done immediately */
1015 TIMEVAL_TO_TIMEVAL32(&hp
.sbh_timestamp
, &t
);
1017 pad
= Align(hp
.sbh_totlen
);
1018 hp
.sbh_totlen
+= sizeof (hp
);
1019 hp
.sbh_totlen
+= pad
;
1022 * Would the inclusion of this message overflow the current
1023 * chunk? If so close the chunk off and start a new one.
1025 if ((hp
.sbh_totlen
+ sbp
->sb_mlen
) > sbp
->sb_chunk
)
1028 if (sbp
->sb_head
== NULL
) {
1029 /* Allocate leading header of new chunk */
1030 sbp
->sb_head
= allocb(sizeof (hp
), BPRI_MED
);
1031 if (sbp
->sb_head
== NULL
) {
1033 * Memory allocation failure.
1034 * This will need to be revisited
1035 * since using certain flag combinations
1036 * can result in messages being dropped
1043 sbp
->sb_mp
= sbp
->sb_head
;
1047 * Copy header into message
1049 hp
.sbh_drops
= sbp
->sb_drops
;
1050 hp
.sbh_origlen
= origlen
;
1051 (void) memcpy(sbp
->sb_head
->b_wptr
, (char *)&hp
, sizeof (hp
));
1052 sbp
->sb_head
->b_wptr
+= sizeof (hp
);
1054 ASSERT(sbp
->sb_head
->b_wptr
<= sbp
->sb_head
->b_datap
->db_lim
);
1057 * Join message to the chunk
1059 linkb(sbp
->sb_head
, mp
);
1062 sbp
->sb_mlen
+= hp
.sbh_totlen
;
1065 * If the first message alone is too big for the chunk close
1067 * If the next message would immediately cause the chunk to
1068 * overflow we may as well close the chunk now. The next
1069 * message is certain to be at least SMALLEST_MESSAGE size.
1071 if (hp
.sbh_totlen
+ SMALLEST_MESSAGE
> sbp
->sb_chunk
) {
1077 * Find space for the wrapper. The wrapper consists of:
1079 * 1) Padding for this message (this is to ensure each header
1080 * begins on an 8 byte boundary in the userland buffer).
1082 * 2) Space for the next message's header, in case the next
1083 * message will fit in this chunk.
1085 * It may be possible to append the wrapper to the last mblk
1086 * of the message, but only if we 'own' the data. If the dblk
1087 * has been shared through dupmsg() we mustn't alter it.
1090 wrapperlen
= (sizeof (hp
) + pad
);
1092 /* Is there space for the wrapper beyond the message's data ? */
1093 for (last
= mp
; last
->b_cont
; last
= last
->b_cont
)
1096 if ((wrapperlen
<= MBLKTAIL(last
)) &&
1097 (last
->b_datap
->db_ref
== 1)) {
1100 * Pad with zeroes to the next pointer boundary
1101 * (we don't want to disclose kernel data to
1102 * users), then advance wptr.
1104 (void) memset(last
->b_wptr
, 0, pad
);
1105 last
->b_wptr
+= pad
;
1107 /* Remember where to write the header information */
1108 sbp
->sb_head
= last
;
1110 /* Have to allocate additional space for the wrapper */
1111 wrapper
= allocb(wrapperlen
, BPRI_MED
);
1112 if (wrapper
== NULL
) {
1118 * Pad with zeroes (we don't want to disclose
1119 * kernel data to users).
1121 (void) memset(wrapper
->b_wptr
, 0, pad
);
1122 wrapper
->b_wptr
+= pad
;
1124 /* Link the wrapper msg onto the end of the chunk */
1126 /* Remember to write the next header in this wrapper */
1127 sbp
->sb_head
= wrapper
;
1133 * Called from timeout().
1134 * Signal a timeout by passing a zero-length M_CTL msg in the read-side
1135 * to synchronize with any active module threads (open, close, wput, rput).
1140 struct sb
*sbp
= arg
;
1146 sbp
->sb_timeoutid
= 0; /* timeout has fired */
1148 if (putctl(rq
, M_CTL
) == 0) /* failure */
1149 sbp
->sb_timeoutid
= qtimeout(rq
, sbtick
, sbp
, sbp
->sb_ticks
);
1153 * Close off the currently accumulating chunk and pass
1154 * it upward. Takes care of resetting timers as well.
1156 * This routine is called both directly and as a result
1157 * of the chunk timeout expiring.
1160 sbclosechunk(struct sb
*sbp
)
1167 if (sbp
->sb_timeoutid
) {
1168 (void) quntimeout(sbp
->sb_rq
, sbp
->sb_timeoutid
);
1169 sbp
->sb_timeoutid
= 0;
1176 * If there's currently a chunk in progress, close it off
1177 * and try to send it up.
1184 * Clear old chunk. Ready for new msgs.
1186 sbp
->sb_tail
= sbp
->sb_mp
= sbp
->sb_head
= NULL
;
1189 if (sbp
->sb_flags
& SB_DEFER_CHUNK
)
1190 sbp
->sb_state
&= ~SB_FRCVD
;
1195 sbsendit(queue_t
*rq
, mblk_t
*mp
)
1197 struct sb
*sbp
= (struct sb
*)rq
->q_ptr
;
1199 if (!canputnext(rq
)) {
1200 if (sbp
->sb_flags
& SB_NO_DROPS
)
1201 (void) putq(rq
, mp
);
1204 sbp
->sb_drops
+= sbp
->sb_mcount
;
1209 * If there are messages on the q already, keep
1210 * queueing them since they need to be processed in order.
1212 if (qsize(rq
) > 0) {
1213 /* should only get here if SB_NO_DROPS */
1214 (void) putq(rq
, mp
);