/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 * Copyright 2012 Milan Jurik. All rights reserved.
 */

/* Copyright (c) 1990 Mentat Inc. */
/* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */

/*
 * Kernel RPC filtering module
 */

#include <sys/param.h>
#include <sys/types.h>
#include <sys/stream.h>
#include <sys/stropts.h>
#include <sys/strsubr.h>
#include <sys/tihdr.h>
#include <sys/timod.h>
#include <sys/tiuser.h>
#include <sys/debug.h>
#include <sys/signal.h>
#include <sys/pcb.h>
#include <sys/user.h>
#include <sys/errno.h>
#include <sys/cred.h>
#include <sys/policy.h>
#include <sys/inline.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/file.h>
#include <sys/sysmacros.h>
#include <sys/systm.h>
#include <sys/t_lock.h>
#include <sys/ddi.h>
#include <sys/vtrace.h>
#include <sys/callb.h>
#include <sys/strsun.h>

#include <sys/strlog.h>
#include <rpc/rpc_com.h>
#include <inet/common.h>
#include <rpc/types.h>
#include <sys/time.h>
#include <rpc/xdr.h>
#include <rpc/auth.h>
#include <rpc/clnt.h>
#include <rpc/rpc_msg.h>
#include <rpc/clnt.h>
#include <rpc/svc.h>
#include <rpc/rpcsys.h>
#include <rpc/rpc_rdma.h>

/*
 * This is the loadable module wrapper.
 */
#include <sys/conf.h>
#include <sys/modctl.h>
#include <sys/syscall.h>

extern struct streamtab rpcinfo;

static struct fmodsw fsw = {
	"rpcmod",
	&rpcinfo,
	D_NEW|D_MP,
};

/*
 * Module linkage information for the kernel.
 */

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "rpc interface str mod", &fsw
};

/*
 * For the RPC system call.
 */
static struct sysent rpcsysent = {
	2,			/* number of arguments */
	SE_32RVAL1 | SE_ARGC | SE_NOUNLOAD,
	rpcsys
};

static struct modlsys modlsys = {
	&mod_syscallops,
	"RPC syscall",
	&rpcsysent
};

#ifdef _SYSCALL32_IMPL
static struct modlsys modlsys32 = {
	&mod_syscallops32,
	"32-bit RPC syscall",
	&rpcsysent
};
#endif /* _SYSCALL32_IMPL */

static struct modlinkage modlinkage = {
	MODREV_1,
	{
		&modlsys,
#ifdef _SYSCALL32_IMPL
		&modlsys32,
#endif
		&modlstrmod,
		NULL
	}
};

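/*
 * Note that the one modlinkage above advertises rpcmod both as the
 * rpcsys() system call (modlsys, plus its 32-bit flavor) and as a
 * STREAMS module (modlstrmod); a single mod_install() in _init()
 * registers all of them at once.
 */
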
int
_init(void)
{
	int error = 0;
	callb_id_t cid;
	int status;

	svc_init();
	clnt_init();
	cid = callb_add(connmgr_cpr_reset, 0, CB_CL_CPR_RPC, "rpc");

	if (error = mod_install(&modlinkage)) {
		/*
		 * Could not install module, cleanup previous
		 * initialization work.
		 */
		clnt_fini();
		if (cid != NULL)
			(void) callb_delete(cid);

		return (error);
	}

	/*
	 * Load up the RDMA plugins and initialize the stats.  Even if the
	 * plugin load-up fails but rpcmod was successfully installed, the
	 * counters still get initialized.
	 */
	rw_init(&rdma_lock, NULL, RW_DEFAULT, NULL);
	mutex_init(&rdma_modload_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&rdma_wait.svc_cv, NULL, CV_DEFAULT, NULL);
	mutex_init(&rdma_wait.svc_lock, NULL, MUTEX_DEFAULT, NULL);

	mt_kstat_init();

	/*
	 * Get our identification into ldi.  This is used for loading
	 * other modules, e.g. rpcib.
	 */
	status = ldi_ident_from_mod(&modlinkage, &rpcmod_li);
	if (status != 0) {
		cmn_err(CE_WARN, "ldi_ident_from_mod fails with %d", status);
		rpcmod_li = NULL;
	}

	return (error);
}

/*
 * The unload entry point fails, because we advertise entry points into
 * rpcmod from the rest of kRPC: rpcmod_release().
 */
int
_fini(void)
{
	return (EBUSY);
}

int
_info(struct modinfo *modinfop)
{
	return (mod_info(&modlinkage, modinfop));
}

extern int nulldev();

#define	RPCMOD_ID	2049

int rmm_open(queue_t *, dev_t *, int, int, cred_t *);
int rmm_close(queue_t *, int, cred_t *);

/*
 * To save instructions, since STREAMS ignores the return value
 * from these functions, they are defined as void here. Kind of icky, but...
 */
void rmm_rput(queue_t *, mblk_t *);
void rmm_wput(queue_t *, mblk_t *);
void rmm_rsrv(queue_t *);
void rmm_wsrv(queue_t *);

int rpcmodopen(queue_t *, dev_t *, int, int, cred_t *);
int rpcmodclose(queue_t *, int, cred_t *);
void rpcmodrput(queue_t *, mblk_t *);
void rpcmodwput(queue_t *, mblk_t *);
void rpcmodrsrv();
void rpcmodwsrv(queue_t *);

static	void	rpcmodwput_other(queue_t *, mblk_t *);
static	int	mir_close(queue_t *q);
static	int	mir_open(queue_t *q, dev_t *devp, int flag, int sflag,
		    cred_t *credp);
static	void	mir_rput(queue_t *q, mblk_t *mp);
static	void	mir_rsrv(queue_t *q);
static	void	mir_wput(queue_t *q, mblk_t *mp);
static	void	mir_wsrv(queue_t *q);

static struct module_info rpcmod_info =
	{RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};

/*
 * Read side has no service procedure.
 */
static struct qinit rpcmodrinit = {
	(int (*)())rmm_rput,
	(int (*)())rmm_rsrv,
	rmm_open,
	rmm_close,
	nulldev,
	&rpcmod_info,
	NULL
};

/*
 * The write put procedure is simply putnext to conserve stack space.
 * The write service procedure is not used to queue data, but instead to
 * synchronize with flow control.
 */
static struct qinit rpcmodwinit = {
	(int (*)())rmm_wput,
	(int (*)())rmm_wsrv,
	rmm_open,
	rmm_close,
	nulldev,
	&rpcmod_info,
	NULL
};

struct streamtab rpcinfo = { &rpcmodrinit, &rpcmodwinit, NULL, NULL };

struct xprt_style_ops {
	int (*xo_open)();
	int (*xo_close)();
	void (*xo_wput)();
	void (*xo_wsrv)();
	void (*xo_rput)();
	void (*xo_rsrv)();
};

static struct xprt_style_ops xprt_clts_ops = {
	rpcmodopen,
	rpcmodclose,
	rpcmodwput,
	rpcmodwsrv,
	rpcmodrput,
	NULL
};

static struct xprt_style_ops xprt_cots_ops = {
	mir_open,
	mir_close,
	mir_wput,
	mir_wsrv,
	mir_rput,
	mir_rsrv
};

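/*
 * The rmm_* STREAMS entry points below implement no behavior of their
 * own; they indirect through one of the two tables above.  xprt_clts_ops
 * serves datagram (CLTS) transports and xprt_cots_ops serves
 * connection-oriented (COTS) transports; rmm_open() picks the table
 * after asking the transport for its service type with T_INFO_REQ.
 */
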
/*
 * Per rpcmod "slot" data structure. q->q_ptr points to one of these.
 */
struct rpcm {
	void		*rm_krpc_cell;	/* Reserved for use by KRPC */
	struct		xprt_style_ops	*rm_ops;
	int		rm_type;	/* Client or server side stream */
#define	RM_CLOSING	0x1		/* somebody is trying to close slot */
	uint_t		rm_state;	/* state of the slot. see above */
	uint_t		rm_ref;		/* cnt of external references to slot */
	kmutex_t	rm_lock;	/* mutex protecting above fields */
	kcondvar_t	rm_cwait;	/* condition for closing */
	zoneid_t	rm_zoneid;	/* zone which pushed rpcmod */
};

struct temp_slot {
	void *cell;
	struct xprt_style_ops *ops;
	int type;
	mblk_t *info_ack;
	kmutex_t lock;
	kcondvar_t wait;
};

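/*
 * A temp_slot lives on rmm_open()'s stack and stands in for q_ptr only
 * while the T_INFO_ACK handshake is in flight.  Its leading members
 * (cell, ops) mirror the layout of struct rpcm above and mir_t below, so
 * the rmm_* dispatch routines keep working during that window.
 */
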
typedef struct mir_s {
	void	*mir_krpc_cell;	/* Reserved for KRPC use.  This field */
					/* must be first in the structure. */
	struct xprt_style_ops	*rm_ops;
	int	mir_type;		/* Client or server side stream */

	mblk_t	*mir_head_mp;		/* RPC msg in progress */
		/*
		 * mir_head_mp points to the first mblk being collected in
		 * the current RPC message.  Record headers are removed
		 * before data is linked into mir_head_mp.
		 */
	mblk_t	*mir_tail_mp;		/* Last mblk in mir_head_mp */
		/*
		 * mir_tail_mp points to the last mblk in the message
		 * chain starting at mir_head_mp.  It is only valid
		 * if mir_head_mp is non-NULL and is used to add new
		 * data blocks to the end of chain quickly.
		 */

	int32_t	mir_frag_len;		/* Bytes seen in the current frag */
		/*
		 * mir_frag_len starts at -4 for beginning of each fragment.
		 * When this length is negative, it indicates the number of
		 * bytes that rpcmod needs to complete the record marker
		 * header.  When it is positive or zero, it holds the number
		 * of bytes that have arrived for the current fragment and
		 * are held in mir_head_mp.
		 */

	int32_t	mir_frag_header;
		/*
		 * Fragment header as collected for the current fragment.
		 * It holds the last-fragment indicator and the number
		 * of bytes in the fragment.
		 */

	unsigned int
		mir_ordrel_pending : 1,	/* Sent T_ORDREL_REQ */
		mir_hold_inbound : 1,	/* Hold inbound messages on server */
					/* side until outbound flow control */
					/* is relieved. */
		mir_closing : 1,	/* The stream is being closed */
		mir_inrservice : 1,	/* data queued or rd srv proc running */
		mir_inwservice : 1,	/* data queued or wr srv proc running */
		mir_inwflushdata : 1,	/* flush M_DATAs when srv runs */
		/*
		 * On client streams, mir_clntreq is 0 or 1; it is set
		 * to 1 whenever a new request is sent out (mir_wput)
		 * and cleared when the timer fires (mir_timer).  If
		 * the timer fires with this value equal to 0, then the
		 * stream is considered idle and KRPC is notified.
		 */
		mir_clntreq : 1,
		/*
		 * On server streams, stop accepting messages
		 */
		mir_svc_no_more_msgs : 1,
		mir_listen_stream : 1,	/* listen end point */
		mir_unused : 1,		/* no longer used */
		mir_timer_call : 1,
		mir_junk_fill_thru_bit_31 : 21;

	int	mir_setup_complete;	/* server has initialized everything */
	timeout_id_t mir_timer_id;	/* Timer for idle checks */
	clock_t	mir_idle_timeout;	/* Allowed idle time before shutdown */
		/*
		 * This value is copied from clnt_idle_timeout or
		 * svc_idle_timeout during the appropriate ioctl.
		 * Kept in milliseconds.
		 */
	clock_t	mir_use_timestamp;	/* updated on client with each use */
		/*
		 * This value is set to lbolt
		 * every time a client stream sends or receives data.
		 * Even if the timer message arrives, we don't shutdown
		 * client unless:
		 *    lbolt >= MSEC_TO_TICK(mir_idle_timeout)+mir_use_timestamp.
		 * This value is kept in HZ.
		 */

	uint_t	*mir_max_msg_sizep;	/* Reference to sanity check size */
		/*
		 * This pointer is set to &clnt_max_msg_size or
		 * &svc_max_msg_size during the appropriate ioctl.
		 */
	zoneid_t mir_zoneid;	/* zone which pushed rpcmod */
	/* Server-side fields. */
	int	mir_ref_cnt;		/* Reference count: server side only */
					/* counts the number of references */
					/* that a kernel RPC server thread */
					/* (see svc_run()) has on this rpcmod */
					/* slot.  Effectively, it is the */
					/* number of unprocessed messages */
					/* that have been passed up to the */
					/* KRPC layer */

	mblk_t	*mir_svc_pend_mp;	/* Pending T_ORDREL_IND or */
					/* T_DISCON_IND */

	/*
	 * these fields are for both client and server, but for debugging,
	 * it is easier to have these last in the structure.
	 */
	kmutex_t	mir_mutex;	/* Mutex and condvar for close */
	kcondvar_t	mir_condvar;	/* synchronization. */
	kcondvar_t	mir_timer_cv;	/* Timer routine sync. */
} mir_t;

void tmp_rput(queue_t *q, mblk_t *mp);

struct xprt_style_ops tmpops = {
	NULL,
	NULL,
	putnext,
	NULL,
	tmp_rput,
	NULL
};

void
tmp_rput(queue_t *q, mblk_t *mp)
{
	struct temp_slot *t = (struct temp_slot *)(q->q_ptr);
	struct T_info_ack *pptr;

	switch (mp->b_datap->db_type) {
	case M_PCPROTO:
		pptr = (struct T_info_ack *)mp->b_rptr;
		switch (pptr->PRIM_type) {
		case T_INFO_ACK:
			mutex_enter(&t->lock);
			t->info_ack = mp;
			cv_signal(&t->wait);
			mutex_exit(&t->lock);
			return;
		default:
			break;
		}
	default:
		break;
	}

	/*
	 * Not an info-ack, so free it. This is ok because we should
	 * not be receiving data until the open finishes: rpcmod
	 * is pushed well before the end-point is bound to an address.
	 */
	freemsg(mp);
}

int
rmm_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
{
	mblk_t *bp;
	struct temp_slot ts, *t;
	struct T_info_ack *pptr;
	int error = 0;

	ASSERT(q != NULL);
	/*
	 * Check for re-opens.
	 */
	if (q->q_ptr) {
		TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END,
		    "rpcmodopen_end:(%s)", "q->qptr");
		return (0);
	}

	t = &ts;
	bzero(t, sizeof (*t));
	q->q_ptr = (void *)t;
	WR(q)->q_ptr = (void *)t;

	/*
	 * Allocate the required messages upfront.
	 */
	if ((bp = allocb_cred(sizeof (struct T_info_req) +
	    sizeof (struct T_info_ack), crp, curproc->p_pid)) == NULL) {
		return (ENOBUFS);
	}

	mutex_init(&t->lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&t->wait, NULL, CV_DEFAULT, NULL);

	t->ops = &tmpops;

	qprocson(q);
	bp->b_datap->db_type = M_PCPROTO;
	*(int32_t *)bp->b_wptr = (int32_t)T_INFO_REQ;
	bp->b_wptr += sizeof (struct T_info_req);
	putnext(WR(q), bp);

	mutex_enter(&t->lock);
	while (t->info_ack == NULL) {
		if (cv_wait_sig(&t->wait, &t->lock) == 0) {
			error = EINTR;
			break;
		}
	}
	mutex_exit(&t->lock);

	if (error)
		goto out;

	pptr = (struct T_info_ack *)t->info_ack->b_rptr;

	if (pptr->SERV_type == T_CLTS) {
		if ((error = rpcmodopen(q, devp, flag, sflag, crp)) == 0)
			((struct rpcm *)q->q_ptr)->rm_ops = &xprt_clts_ops;
	} else {
		if ((error = mir_open(q, devp, flag, sflag, crp)) == 0)
			((mir_t *)q->q_ptr)->rm_ops = &xprt_cots_ops;
	}

out:
	if (error)
		qprocsoff(q);

	freemsg(t->info_ack);
	mutex_destroy(&t->lock);
	cv_destroy(&t->wait);

	return (error);
}

void
rmm_rput(queue_t *q, mblk_t *mp)
{
	(*((struct temp_slot *)q->q_ptr)->ops->xo_rput)(q, mp);
}

void
rmm_rsrv(queue_t *q)
{
	(*((struct temp_slot *)q->q_ptr)->ops->xo_rsrv)(q);
}

void
rmm_wput(queue_t *q, mblk_t *mp)
{
	(*((struct temp_slot *)q->q_ptr)->ops->xo_wput)(q, mp);
}

void
rmm_wsrv(queue_t *q)
{
	(*((struct temp_slot *)q->q_ptr)->ops->xo_wsrv)(q);
}

int
rmm_close(queue_t *q, int flag, cred_t *crp)
{
	return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
}

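/*
 * Note that casting q_ptr to struct temp_slot in the routines above is
 * safe for all three slot types (temp_slot, struct rpcm, and mir_t):
 * each begins with a cell pointer followed by the xprt_style_ops
 * pointer, so the ops field is always found at the same offset.
 */
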
static void rpcmod_release(queue_t *, mblk_t *);

/*
 * rpcmodopen -	open routine gets called when the module gets pushed
 *		onto the stream.
 */
/*ARGSUSED*/
int
rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
{
	struct rpcm *rmp;

	extern void (*rpc_rele)(queue_t *, mblk_t *);

	TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");

	/*
	 * Initialize entry points to release a rpcmod slot (and an input
	 * message if supplied) and to send an output message to the module
	 * below rpcmod.
	 */
	if (rpc_rele == NULL)
		rpc_rele = rpcmod_release;

	/*
	 * Only sufficiently privileged users can use this module, and it
	 * is assumed that they will use this module properly, and NOT send
	 * bulk data from downstream.
	 */
	if (secpolicy_rpcmod_open(crp) != 0)
		return (EPERM);

	/*
	 * Allocate slot data structure.
	 */
	rmp = kmem_zalloc(sizeof (*rmp), KM_SLEEP);

	mutex_init(&rmp->rm_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&rmp->rm_cwait, NULL, CV_DEFAULT, NULL);
	rmp->rm_zoneid = rpc_zoneid();
	/*
	 * slot type will be set by kRPC client and server ioctl's
	 */
	rmp->rm_type = 0;

	q->q_ptr = (void *)rmp;
	WR(q)->q_ptr = (void *)rmp;

	TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "end");
	return (0);
}

/*
 * rpcmodclose - This routine gets called when the module gets popped
 * off of the stream.
 */
/*ARGSUSED*/
int
rpcmodclose(queue_t *q, int flag, cred_t *crp)
{
	struct rpcm *rmp;

	ASSERT(q != NULL);
	rmp = (struct rpcm *)q->q_ptr;

	/*
	 * Mark our state as closing.
	 */
	mutex_enter(&rmp->rm_lock);
	rmp->rm_state |= RM_CLOSING;

	/*
	 * Check and see if there are any messages on the queue.  If so, send
	 * the messages, regardless whether the downstream module is ready to
	 * accept data.
	 */
	if (rmp->rm_type == RPC_SERVER) {
		flushq(q, FLUSHDATA);

		qenable(WR(q));

		if (rmp->rm_ref) {
			mutex_exit(&rmp->rm_lock);
			/*
			 * call into SVC to clean the queue
			 */
			svc_queueclean(q);
			mutex_enter(&rmp->rm_lock);

			/*
			 * Block while there are kRPC threads with a reference
			 * to this message.
			 */
			while (rmp->rm_ref)
				cv_wait(&rmp->rm_cwait, &rmp->rm_lock);
		}

		mutex_exit(&rmp->rm_lock);

		/*
		 * It is now safe to remove this queue from the stream. No kRPC
		 * threads have a reference to the stream, and none ever will,
		 * because RM_CLOSING is set.
		 */
		qprocsoff(q);

		/* Notify kRPC that this stream is going away. */
		svc_queueclose(q);
	} else {
		mutex_exit(&rmp->rm_lock);
		qprocsoff(q);
	}

	q->q_ptr = NULL;
	WR(q)->q_ptr = NULL;
	mutex_destroy(&rmp->rm_lock);
	cv_destroy(&rmp->rm_cwait);
	kmem_free(rmp, sizeof (*rmp));
	return (0);
}

#ifdef DEBUG
int	rpcmod_send_msg_up = 0;
int	rpcmod_send_uderr = 0;
int	rpcmod_send_dup = 0;
int	rpcmod_send_dup_cnt = 0;
#endif

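/*
 * The counters above are fault-injection knobs for DEBUG kernels:
 * rpcmod_send_msg_up duplicates inbound messages upstream,
 * rpcmod_send_uderr converts unitdata indications into synthetic
 * T_UDERROR_IND messages, and rpcmod_send_dup periodically duplicates
 * requests to exercise the duplicate request cache (see rpcmodrput()).
 */
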
/*
 * rpcmodrput -	Module read put procedure.  This is called from
 *		the module, driver, or stream head downstream.
 */
void
rpcmodrput(queue_t *q, mblk_t *mp)
{
	struct rpcm *rmp;
	union T_primitives *pptr;
	int hdrsz;

	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_START, "rpcmodrput_start:");

	ASSERT(q != NULL);
	rmp = (struct rpcm *)q->q_ptr;

	if (rmp->rm_type == 0) {
		freemsg(mp);
		return;
	}

#ifdef DEBUG
	if (rpcmod_send_msg_up > 0) {
		mblk_t *nmp = copymsg(mp);
		if (nmp) {
			putnext(q, nmp);
			rpcmod_send_msg_up--;
		}
	}
	if ((rpcmod_send_uderr > 0) && mp->b_datap->db_type == M_PROTO) {
		mblk_t *nmp;
		struct T_unitdata_ind *data;
		struct T_uderror_ind *ud;
		int d;

		data = (struct T_unitdata_ind *)mp->b_rptr;
		if (data->PRIM_type == T_UNITDATA_IND) {
			d = sizeof (*ud) - sizeof (*data);
			nmp = allocb(mp->b_wptr - mp->b_rptr + d, BPRI_HI);
			if (nmp) {
				ud = (struct T_uderror_ind *)nmp->b_rptr;
				ud->PRIM_type = T_UDERROR_IND;
				ud->DEST_length = data->SRC_length;
				ud->DEST_offset = data->SRC_offset + d;
				ud->OPT_length = data->OPT_length;
				ud->OPT_offset = data->OPT_offset + d;
				ud->ERROR_type = ENETDOWN;
				if (data->SRC_length) {
					bcopy(mp->b_rptr +
					    data->SRC_offset,
					    nmp->b_rptr +
					    ud->DEST_offset,
					    data->SRC_length);
				}
				if (data->OPT_length) {
					bcopy(mp->b_rptr +
					    data->OPT_offset,
					    nmp->b_rptr +
					    ud->OPT_offset,
					    data->OPT_length);
				}
				nmp->b_wptr += d;
				nmp->b_wptr += (mp->b_wptr - mp->b_rptr);
				nmp->b_datap->db_type = M_PROTO;
				putnext(q, nmp);
				rpcmod_send_uderr--;
			}
		}
	}
#endif
	switch (mp->b_datap->db_type) {
	default:
		putnext(q, mp);
		break;

	case M_PROTO:
	case M_PCPROTO:
		ASSERT((mp->b_wptr - mp->b_rptr) >= sizeof (int32_t));
		pptr = (union T_primitives *)mp->b_rptr;

		/*
		 * Forward this message to krpc if it is data.
		 */
		if (pptr->type == T_UNITDATA_IND) {
			mblk_t *nmp;

			/*
			 * Check if the module is being popped.
			 */
			mutex_enter(&rmp->rm_lock);
			if (rmp->rm_state & RM_CLOSING) {
				mutex_exit(&rmp->rm_lock);
				putnext(q, mp);
				break;
			}

			switch (rmp->rm_type) {
			case RPC_CLIENT:
				mutex_exit(&rmp->rm_lock);
				hdrsz = mp->b_wptr - mp->b_rptr;

				/*
				 * Make sure the header is sane.
				 */
				if (hdrsz < TUNITDATAINDSZ ||
				    hdrsz < (pptr->unitdata_ind.OPT_length +
				    pptr->unitdata_ind.OPT_offset) ||
				    hdrsz < (pptr->unitdata_ind.SRC_length +
				    pptr->unitdata_ind.SRC_offset)) {
					freemsg(mp);
					return;
				}

				/*
				 * Call clnt_clts_dispatch_notify, so that it
				 * can pass the message to the proper caller.
				 * Don't discard the header just yet since the
				 * client may need the sender's address.
				 */
				clnt_clts_dispatch_notify(mp, hdrsz,
				    rmp->rm_zoneid);
				return;
			case RPC_SERVER:
				/*
				 * rm_krpc_cell is exclusively used by the kRPC
				 * CLTS server.
				 */
				if (rmp->rm_krpc_cell) {
#ifdef DEBUG
					/*
					 * Test duplicate request cache and
					 * rm_ref count handling by sending a
					 * duplicate every so often, if
					 * desired.
					 */
					if (rpcmod_send_dup &&
					    rpcmod_send_dup_cnt++ %
					    rpcmod_send_dup)
						nmp = copymsg(mp);
					else
						nmp = NULL;
#endif
					/*
					 * Raise the reference count on this
					 * module to prevent it from being
					 * popped before krpc generates the
					 * reply.
					 */
					rmp->rm_ref++;
					mutex_exit(&rmp->rm_lock);

					/*
					 * Submit the message to krpc.
					 */
					svc_queuereq(q, mp);
#ifdef DEBUG
					/*
					 * Send duplicate if we created one.
					 */
					if (nmp) {
						mutex_enter(&rmp->rm_lock);
						rmp->rm_ref++;
						mutex_exit(&rmp->rm_lock);
						svc_queuereq(q, nmp);
					}
#endif
				} else {
					mutex_exit(&rmp->rm_lock);
					freemsg(mp);
				}
				return;
			default:
				mutex_exit(&rmp->rm_lock);
				freemsg(mp);
				return;
			} /* end switch(rmp->rm_type) */
		} else if (pptr->type == T_UDERROR_IND) {
			mutex_enter(&rmp->rm_lock);
			hdrsz = mp->b_wptr - mp->b_rptr;

			/*
			 * Make sure the header is sane.
			 */
			if (hdrsz < TUDERRORINDSZ ||
			    hdrsz < (pptr->uderror_ind.OPT_length +
			    pptr->uderror_ind.OPT_offset) ||
			    hdrsz < (pptr->uderror_ind.DEST_length +
			    pptr->uderror_ind.DEST_offset)) {
				mutex_exit(&rmp->rm_lock);
				freemsg(mp);
				return;
			}

			/*
			 * In the case where a unit data error has been
			 * received, all we need to do is clear the message
			 * from the queue.
			 */
			mutex_exit(&rmp->rm_lock);
			freemsg(mp);
			RPCLOG(32, "rpcmodrput: unitdata error received at "
			    "%ld\n", gethrestime_sec());
			return;
		} /* end else if (pptr->type == T_UDERROR_IND) */

		putnext(q, mp);
		break;
	} /* end switch (mp->b_datap->db_type) */

	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END,
	    "rpcmodrput_end:");
	/*
	 * Return codes are not looked at by the STREAMS framework.
	 */
}

/*
 * write put procedure
 */
void
rpcmodwput(queue_t *q, mblk_t *mp)
{
	struct rpcm	*rmp;

	ASSERT(q != NULL);

	switch (mp->b_datap->db_type) {
	case M_PROTO:
	case M_PCPROTO:
		break;
	default:
		rpcmodwput_other(q, mp);
		return;
	}

	/*
	 * Check to see if we can send the message downstream.
	 */
	if (canputnext(q)) {
		putnext(q, mp);
		return;
	}

	rmp = (struct rpcm *)q->q_ptr;
	ASSERT(rmp != NULL);

	/*
	 * The first canputnext failed.  Try again except this time with the
	 * lock held, so that we can check the state of the stream to see if
	 * it is closing.  If either of these conditions evaluates to true
	 * then send the message.
	 */
	mutex_enter(&rmp->rm_lock);
	if (canputnext(q) || (rmp->rm_state & RM_CLOSING)) {
		mutex_exit(&rmp->rm_lock);
		putnext(q, mp);
	} else {
		/*
		 * canputnext failed again and the stream is not closing.
		 * Place the message on the queue and let the service
		 * procedure handle the message.
		 */
		mutex_exit(&rmp->rm_lock);
		(void) putq(q, mp);
	}
}

static void
rpcmodwput_other(queue_t *q, mblk_t *mp)
{
	struct rpcm	*rmp;
	struct iocblk	*iocp;

	rmp = (struct rpcm *)q->q_ptr;
	ASSERT(rmp != NULL);

	switch (mp->b_datap->db_type) {
	case M_IOCTL:
		iocp = (struct iocblk *)mp->b_rptr;
		ASSERT(iocp != NULL);
		switch (iocp->ioc_cmd) {
		case RPC_CLIENT:
		case RPC_SERVER:
			mutex_enter(&rmp->rm_lock);
			rmp->rm_type = iocp->ioc_cmd;
			mutex_exit(&rmp->rm_lock);
			mp->b_datap->db_type = M_IOCACK;
			qreply(q, mp);
			return;
		default:
			/*
			 * pass the ioctl downstream and hope someone
			 * down there knows how to handle it.
			 */
			putnext(q, mp);
			return;
		}
	default:
		break;
	}
	/*
	 * This is something we definitely do not know how to handle, just
	 * pass the message downstream
	 */
	putnext(q, mp);
}

/*
 * Module write service procedure. This is called by downstream modules
 * for back enabling during flow control.
 */
void
rpcmodwsrv(queue_t *q)
{
	struct rpcm	*rmp;
	mblk_t		*mp = NULL;

	rmp = (struct rpcm *)q->q_ptr;
	ASSERT(rmp != NULL);

	/*
	 * Get messages that may be queued and send them down stream
	 */
	while ((mp = getq(q)) != NULL) {
		/*
		 * Optimize the service procedure for the server-side, by
		 * avoiding a call to canputnext().
		 */
		if (rmp->rm_type == RPC_SERVER || canputnext(q)) {
			putnext(q, mp);
			continue;
		}
		(void) putbq(q, mp);
		return;
	}
}

static void
rpcmod_release(queue_t *q, mblk_t *bp)
{
	struct rpcm *rmp;

	/*
	 * For now, just free the message.
	 */
	if (bp)
		freemsg(bp);
	rmp = (struct rpcm *)q->q_ptr;

	mutex_enter(&rmp->rm_lock);
	rmp->rm_ref--;

	if (rmp->rm_ref == 0 && (rmp->rm_state & RM_CLOSING)) {
		cv_broadcast(&rmp->rm_cwait);
	}

	mutex_exit(&rmp->rm_lock);
}

/*
 * This part of rpcmod is pushed on a connection-oriented transport for use
 * by RPC.  It serves to bypass the Stream head, implements
 * the record marking protocol, and dispatches incoming RPC messages.
 */

/* Default idle timer values */
#define	MIR_CLNT_IDLE_TIMEOUT	(5 * (60 * 1000L))	/* 5 minutes */
#define	MIR_SVC_IDLE_TIMEOUT	(6 * (60 * 1000L))	/* 6 minutes */
#define	MIR_SVC_ORDREL_TIMEOUT	(10 * (60 * 1000L))	/* 10 minutes */
#define	MIR_LASTFRAG	0x80000000	/* Record marker */

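/*
 * A quick record marking refresher (RFC 1831): on a connection-oriented
 * transport each fragment is preceded by a 4-byte header whose high bit
 * (MIR_LASTFRAG) flags the final fragment of an RPC message and whose
 * low 31 bits give the fragment length.  For example, a header of
 * 0x80000064 introduces the last fragment of a record and says that
 * 0x64 (100) bytes of data follow.
 */
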
#define	MIR_SVC_QUIESCED(mir)	\
	(mir->mir_ref_cnt == 0 && mir->mir_inrservice == 0)

#define	MIR_CLEAR_INRSRV(mir_ptr)	{	\
	(mir_ptr)->mir_inrservice = 0;	\
	if ((mir_ptr)->mir_type == RPC_SERVER &&	\
	    (mir_ptr)->mir_closing)	\
		cv_signal(&(mir_ptr)->mir_condvar);	\
}

/*
 * Don't block service procedure (and mir_close) if
 * we are in the process of closing.
 */
#define	MIR_WCANPUTNEXT(mir_ptr, write_q)	\
	(canputnext(write_q) || ((mir_ptr)->mir_svc_no_more_msgs == 1))

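/*
 * MIR_SVC_QUIESCED() is true once no kRPC server thread holds a
 * reference to the slot and the read service procedure is not running.
 * Only then may a held T_DISCON_IND/T_ORDREL_IND be forwarded upstream
 * or a close make progress, which is why MIR_CLEAR_INRSRV() signals a
 * closing thread.
 */
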
static int	mir_clnt_dup_request(queue_t *q, mblk_t *mp);
static void	mir_rput_proto(queue_t *q, mblk_t *mp);
static int	mir_svc_policy_notify(queue_t *q, int event);
static void	mir_svc_release(queue_t *wq, mblk_t *mp);
static void	mir_svc_start(queue_t *wq);
static void	mir_svc_idle_start(queue_t *, mir_t *);
static void	mir_svc_idle_stop(queue_t *, mir_t *);
static void	mir_svc_start_close(queue_t *, mir_t *);
static void	mir_clnt_idle_do_stop(queue_t *);
static void	mir_clnt_idle_stop(queue_t *, mir_t *);
static void	mir_clnt_idle_start(queue_t *, mir_t *);
static void	mir_wput(queue_t *q, mblk_t *mp);
static void	mir_wput_other(queue_t *q, mblk_t *mp);
static void	mir_wsrv(queue_t *q);
static void	mir_disconnect(queue_t *, mir_t *ir);
static int	mir_check_len(queue_t *, int32_t, mblk_t *);
static void	mir_timer(void *);

extern void	(*mir_rele)(queue_t *, mblk_t *);
extern void	(*mir_start)(queue_t *);
extern void	(*clnt_stop_idle)(queue_t *);

clock_t	clnt_idle_timeout = MIR_CLNT_IDLE_TIMEOUT;
clock_t	svc_idle_timeout = MIR_SVC_IDLE_TIMEOUT;

/*
 * Timeout for subsequent notifications of idle connection.  This is
 * typically used to clean up after a wedged orderly release.
 */
clock_t	svc_ordrel_timeout = MIR_SVC_ORDREL_TIMEOUT;	/* milliseconds */

extern	uint_t	*clnt_max_msg_sizep;
extern	uint_t	*svc_max_msg_sizep;
uint_t	clnt_max_msg_size = RPC_MAXDATASIZE;
uint_t	svc_max_msg_size = RPC_MAXDATASIZE;
uint_t	mir_krpc_cell_null;

static void
mir_timer_stop(mir_t *mir)
{
	timeout_id_t	tid;

	ASSERT(MUTEX_HELD(&mir->mir_mutex));

	/*
	 * Since the mir_mutex lock needs to be released to call
	 * untimeout(), we need to make sure that no other thread
	 * can start/stop the timer (changing mir_timer_id) during
	 * that time.  The mir_timer_call bit and the mir_timer_cv
	 * condition variable are used to synchronize this.  Setting
	 * mir_timer_call also tells mir_timer() (refer to the comments
	 * in mir_timer()) that it does not need to do anything.
	 */
	while (mir->mir_timer_call)
		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
	mir->mir_timer_call = B_TRUE;

	if ((tid = mir->mir_timer_id) != 0) {
		mir->mir_timer_id = 0;
		mutex_exit(&mir->mir_mutex);
		(void) untimeout(tid);
		mutex_enter(&mir->mir_mutex);
	}
	mir->mir_timer_call = B_FALSE;
	cv_broadcast(&mir->mir_timer_cv);
}

static void
mir_timer_start(queue_t *q, mir_t *mir, clock_t intrvl)
{
	timeout_id_t	tid;

	ASSERT(MUTEX_HELD(&mir->mir_mutex));

	while (mir->mir_timer_call)
		cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
	mir->mir_timer_call = B_TRUE;

	if ((tid = mir->mir_timer_id) != 0) {
		mutex_exit(&mir->mir_mutex);
		(void) untimeout(tid);
		mutex_enter(&mir->mir_mutex);
	}
	/* Only start the timer when it is not closing. */
	if (!mir->mir_closing) {
		mir->mir_timer_id = timeout(mir_timer, q,
		    MSEC_TO_TICK(intrvl));
	}
	mir->mir_timer_call = B_FALSE;
	cv_broadcast(&mir->mir_timer_cv);
}

static int
mir_clnt_dup_request(queue_t *q, mblk_t *mp)
{
	mblk_t		*mp1;
	uint32_t	new_xid;
	uint32_t	old_xid;

	ASSERT(MUTEX_HELD(&((mir_t *)q->q_ptr)->mir_mutex));
	new_xid = BE32_TO_U32(&mp->b_rptr[4]);
	/*
	 * This loop is a bit tacky -- it walks the STREAMS list of
	 * flow-controlled messages.
	 */
	if ((mp1 = q->q_first) != NULL) {
		do {
			old_xid = BE32_TO_U32(&mp1->b_rptr[4]);
			if (new_xid == old_xid)
				return (1);
		} while ((mp1 = mp1->b_next) != NULL);
	}
	return (0);
}

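/*
 * The xid comparisons above look at bytes 4-7 of each message: every
 * request queued on the write side carries the 4-byte record marking
 * header first, so the RPC xid (the first word of the call itself)
 * starts at offset 4.
 */
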
static int
mir_close(queue_t *q)
{
	mir_t	*mir = q->q_ptr;
	mblk_t	*mp;
	bool_t queue_cleaned = FALSE;

	RPCLOG(32, "rpcmod: mir_close of q 0x%p\n", (void *)q);
	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
	mutex_enter(&mir->mir_mutex);
	if ((mp = mir->mir_head_mp) != NULL) {
		mir->mir_head_mp = NULL;
		mir->mir_tail_mp = NULL;
		freemsg(mp);
	}
	/*
	 * Set mir_closing so we get notified when MIR_SVC_QUIESCED()
	 * is TRUE.  And mir_timer_start() won't start the timer again.
	 */
	mir->mir_closing = B_TRUE;
	mir_timer_stop(mir);

	if (mir->mir_type == RPC_SERVER) {
		flushq(q, FLUSHDATA);	/* Ditch anything waiting on read q */

		/*
		 * This will prevent more requests from arriving and
		 * will force rpcmod to ignore flow control.
		 */
		mir_svc_start_close(WR(q), mir);

		while ((!MIR_SVC_QUIESCED(mir)) || mir->mir_inwservice == 1) {

			if (mir->mir_ref_cnt && !mir->mir_inrservice &&
			    (queue_cleaned == FALSE)) {
				/*
				 * call into SVC to clean the queue
				 */
				mutex_exit(&mir->mir_mutex);
				svc_queueclean(q);
				queue_cleaned = TRUE;
				mutex_enter(&mir->mir_mutex);
				continue;
			}

			/*
			 * Bugid 1253810 - Force the write service
			 * procedure to send its messages, regardless
			 * whether the downstream module is ready
			 * to accept data.
			 */
			if (mir->mir_inwservice == 1)
				qenable(WR(q));

			cv_wait(&mir->mir_condvar, &mir->mir_mutex);
		}

		mutex_exit(&mir->mir_mutex);
		qprocsoff(q);

		/* Notify KRPC that this stream is going away. */
		svc_queueclose(q);
	} else {
		mutex_exit(&mir->mir_mutex);
		qprocsoff(q);
	}

	mutex_destroy(&mir->mir_mutex);
	cv_destroy(&mir->mir_condvar);
	cv_destroy(&mir->mir_timer_cv);
	kmem_free(mir, sizeof (mir_t));
	return (0);
}

/*
 * This is server side only (RPC_SERVER).
 *
 * Exit idle mode.
 */
static void
mir_svc_idle_stop(queue_t *q, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));
	ASSERT((q->q_flag & QREADR) == 0);
	ASSERT(mir->mir_type == RPC_SERVER);
	RPCLOG(16, "rpcmod: mir_svc_idle_stop of q 0x%p\n", (void *)q);

	mir_timer_stop(mir);
}

/*
 * This is server side only (RPC_SERVER).
 *
 * Start idle processing, which will include setting idle timer if the
 * stream is not being closed.
 */
static void
mir_svc_idle_start(queue_t *q, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));
	ASSERT((q->q_flag & QREADR) == 0);
	ASSERT(mir->mir_type == RPC_SERVER);
	RPCLOG(16, "rpcmod: mir_svc_idle_start q 0x%p\n", (void *)q);

	/*
	 * Don't re-start idle timer if we are closing queues.
	 */
	if (mir->mir_closing) {
		RPCLOG(16, "mir_svc_idle_start - closing: 0x%p\n",
		    (void *)q);

		/*
		 * We will call mir_svc_idle_start() whenever MIR_SVC_QUIESCED()
		 * is true.  When it is true, and we are in the process of
		 * closing the stream, signal any thread waiting in
		 * mir_close().
		 */
		if (mir->mir_inwservice == 0)
			cv_signal(&mir->mir_condvar);

	} else {
		RPCLOG(16, "mir_svc_idle_start - reset %s timer\n",
		    mir->mir_ordrel_pending ? "ordrel" : "normal");
		/*
		 * Normal condition, start the idle timer.  If an orderly
		 * release has been sent, set the timeout to wait for the
		 * client to close its side of the connection.  Otherwise,
		 * use the normal idle timeout.
		 */
		mir_timer_start(q, mir, mir->mir_ordrel_pending ?
		    svc_ordrel_timeout : mir->mir_idle_timeout);
	}
}

/* ARGSUSED */
static int
mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
{
	mir_t	*mir;

	RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q);
	/* Set variables used directly by KRPC. */
	if (!mir_rele)
		mir_rele = mir_svc_release;
	if (!mir_start)
		mir_start = mir_svc_start;
	if (!clnt_stop_idle)
		clnt_stop_idle = mir_clnt_idle_do_stop;
	if (!clnt_max_msg_sizep)
		clnt_max_msg_sizep = &clnt_max_msg_size;
	if (!svc_max_msg_sizep)
		svc_max_msg_sizep = &svc_max_msg_size;

	/* Allocate a zero'ed out mir structure for this stream. */
	mir = kmem_zalloc(sizeof (mir_t), KM_SLEEP);

	/*
	 * We set hold inbound here so that incoming messages will
	 * be held on the read-side queue until the stream is completely
	 * initialized with a RPC_CLIENT or RPC_SERVER ioctl.  During
	 * the ioctl processing, the flag is cleared and any messages that
	 * arrived between the open and the ioctl are delivered to KRPC.
	 *
	 * Early data should never arrive on a client stream since
	 * servers only respond to our requests and we do not send any
	 * until after the stream is initialized.  Early data is
	 * very common on a server stream where the client will start
	 * sending data as soon as the connection is made (and this
	 * is especially true with TCP where the protocol accepts the
	 * connection before nfsd or KRPC is notified about it).
	 */

	mir->mir_hold_inbound = 1;

	/*
	 * Start the record marker looking for a 4-byte header.  When
	 * this length is negative, it indicates that rpcmod is looking
	 * for bytes to consume for the record marker header.  When it
	 * is positive, it holds the number of bytes that have arrived
	 * for the current fragment and are being held in mir_head_mp.
	 */

	mir->mir_frag_len = -(int32_t)sizeof (uint32_t);

	mir->mir_zoneid = rpc_zoneid();
	mutex_init(&mir->mir_mutex, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&mir->mir_condvar, NULL, CV_DRIVER, NULL);
	cv_init(&mir->mir_timer_cv, NULL, CV_DRIVER, NULL);

	q->q_ptr = (char *)mir;
	WR(q)->q_ptr = (char *)mir;

	/*
	 * We noenable the read-side queue because we don't want it
	 * automatically enabled by putq.  We enable it explicitly
	 * in mir_wsrv when appropriate. (See additional comments on
	 * flow control at the beginning of mir_rsrv.)
	 */
	noenable(q);

	qprocson(q);
	return (0);
}

/*
 * Read-side put routine for both the client and server side.  Does the
 * record marking for incoming RPC messages, and when complete, dispatches
 * the message to either the client or server.
 */
static void
mir_rput(queue_t *q, mblk_t *mp)
{
	int	excess;
	int32_t	frag_len, frag_header;
	mblk_t	*cont_mp, *head_mp, *tail_mp, *mp1;
	mir_t	*mir = q->q_ptr;
	boolean_t stop_timer = B_FALSE;

	ASSERT(mir != NULL);

	/*
	 * If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
	 * with the corresponding ioctl, then don't accept
	 * any inbound data.  This should never happen for streams
	 * created by nfsd or client-side KRPC because they are careful
	 * to set the mode of the stream before doing anything else.
	 */
	if (mir->mir_type == 0) {
		freemsg(mp);
		return;
	}

	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));

	switch (mp->b_datap->db_type) {
	case M_DATA:
		break;
	case M_PROTO:
	case M_PCPROTO:
		if (MBLKL(mp) < sizeof (t_scalar_t)) {
			RPCLOG(1, "mir_rput: runt TPI message (%d bytes)\n",
			    (int)MBLKL(mp));
			freemsg(mp);
			return;
		}
		if (((union T_primitives *)mp->b_rptr)->type != T_DATA_IND) {
			mir_rput_proto(q, mp);
			return;
		}

		/* Throw away the T_DATA_IND block and continue with data. */
		mp1 = mp;
		mp = mp->b_cont;
		freeb(mp1);
		break;
	case M_SETOPTS:
		/*
		 * If a module on the stream is trying to set the Stream
		 * head's high water mark, then set our hiwater to the
		 * requested value.  We are the "stream head" for all inbound
		 * data messages since messages are passed directly to KRPC.
		 */
		if (MBLKL(mp) >= sizeof (struct stroptions)) {
			struct stroptions	*stropts;

			stropts = (struct stroptions *)mp->b_rptr;
			if ((stropts->so_flags & SO_HIWAT) &&
			    !(stropts->so_flags & SO_BAND)) {
				(void) strqset(q, QHIWAT, 0, stropts->so_hiwat);
			}
		}
		putnext(q, mp);
		return;
	case M_FLUSH:
		RPCLOG(32, "mir_rput: ignoring M_FLUSH %x ", *mp->b_rptr);
		RPCLOG(32, "on q 0x%p\n", (void *)q);
		putnext(q, mp);
		return;
	default:
		putnext(q, mp);
		return;
	}

	mutex_enter(&mir->mir_mutex);

	/*
	 * If this connection is closing, don't accept any new messages.
	 */
	if (mir->mir_svc_no_more_msgs) {
		ASSERT(mir->mir_type == RPC_SERVER);
		mutex_exit(&mir->mir_mutex);
		freemsg(mp);
		return;
	}

	/* Get local copies for quicker access. */
	frag_len = mir->mir_frag_len;
	frag_header = mir->mir_frag_header;
	head_mp = mir->mir_head_mp;
	tail_mp = mir->mir_tail_mp;

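	/*
	 * To illustrate the record marking walk below: a peer sending a
	 * 1000-byte RPC message as a single fragment transmits the header
	 * 0x800003e8 followed by 1000 data bytes.  frag_len starts at -4
	 * and counts up to 0 as the header bytes are shifted into
	 * frag_header; it then accumulates data bytes until it reaches the
	 * length carried in frag_header, completing the fragment (and,
	 * since MIR_LASTFRAG is set, the whole message).
	 */
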
	/* Loop, processing each message block in the mp chain separately. */
	do {
		cont_mp = mp->b_cont;
		mp->b_cont = NULL;

		/*
		 * Drop zero-length mblks to prevent unbounded kernel memory
		 * consumption.
		 */
		if (MBLKL(mp) == 0) {
			freeb(mp);
			continue;
		}

		/*
		 * If frag_len is negative, we're still in the process of
		 * building frag_header -- try to complete it with this mblk.
		 */
		while (frag_len < 0 && mp->b_rptr < mp->b_wptr) {
			frag_len++;
			frag_header <<= 8;
			frag_header += *mp->b_rptr++;
		}

		if (MBLKL(mp) == 0 && frag_len < 0) {
			/*
			 * We consumed this mblk while trying to complete the
			 * fragment header.  Free it and move on.
			 */
			freeb(mp);
			continue;
		}

		ASSERT(frag_len >= 0);

		/*
		 * Now frag_header has the number of bytes in this fragment
		 * and we're just waiting to collect them all.  Chain our
		 * latest mblk onto the list and see if we now have enough
		 * bytes to complete the fragment.
		 */
		if (head_mp == NULL) {
			ASSERT(tail_mp == NULL);
			head_mp = tail_mp = mp;
		} else {
			tail_mp->b_cont = mp;
			tail_mp = mp;
		}

		frag_len += MBLKL(mp);
		excess = frag_len - (frag_header & ~MIR_LASTFRAG);
		if (excess < 0) {
			/*
			 * We still haven't received enough data to complete
			 * the fragment, so continue on to the next mblk.
			 */
			continue;
		}

		/*
		 * We've got a complete fragment.  If there are excess bytes,
		 * then they're part of the next fragment's header (of either
		 * this RPC message or the next RPC message).  Split that part
		 * into its own mblk so that we can safely freeb() it when
		 * building frag_header above.
		 */
		if (excess > 0) {
			if ((mp1 = dupb(mp)) == NULL &&
			    (mp1 = copyb(mp)) == NULL) {
				freemsg(head_mp);
				freemsg(cont_mp);
				RPCLOG0(1, "mir_rput: dupb/copyb failed\n");
				mir->mir_frag_header = 0;
				mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
				mir->mir_head_mp = NULL;
				mir->mir_tail_mp = NULL;
				mir_disconnect(q, mir);	/* drops mir_mutex */
				return;
			}

			/*
			 * Relink the message chain so that the next mblk is
			 * the next fragment header, followed by the rest of
			 * the message chain.
			 */
			mp1->b_cont = cont_mp;
			cont_mp = mp1;

			/*
			 * Data in the new mblk begins at the next fragment,
			 * and data in the old mblk ends at the next fragment.
			 */
			mp1->b_rptr = mp1->b_wptr - excess;
			mp->b_wptr -= excess;
		}

		/*
		 * Reset frag_len and frag_header for the next fragment.
		 */
		frag_len = -(int32_t)sizeof (uint32_t);
		if (!(frag_header & MIR_LASTFRAG)) {
			/*
			 * The current fragment is complete, but more
			 * fragments need to be processed before we can
			 * pass along the RPC message starting at head_mp.
			 */
			frag_header = 0;
			continue;
		}
		frag_header = 0;

		/*
		 * We've got a complete RPC message; pass it to the
		 * appropriate consumer.
		 */
		switch (mir->mir_type) {
		case RPC_CLIENT:
			if (clnt_dispatch_notify(head_mp, mir->mir_zoneid)) {
				/*
				 * Mark this stream as active.  This marker
				 * is used in mir_timer().
				 */
				mir->mir_clntreq = 1;
				mir->mir_use_timestamp = ddi_get_lbolt();
			} else {
				freemsg(head_mp);
			}
			break;

		case RPC_SERVER:
			/*
			 * Check for flow control before passing the
			 * message to KRPC.
			 */
			if (!mir->mir_hold_inbound) {
				if (mir->mir_krpc_cell) {
					/*
					 * If the reference count is 0
					 * (not including this request),
					 * then the stream is transitioning
					 * from idle to non-idle.  In this
					 * case, we cancel the idle timer.
					 */
					if (mir->mir_ref_cnt++ == 0)
						stop_timer = B_TRUE;
					if (mir_check_len(q,
					    (int32_t)msgdsize(mp), mp))
						return;
					svc_queuereq(q, head_mp); /* to KRPC */
				} else {
					/*
					 * Count the number of times this
					 * happens; it should never happen,
					 * but experience shows otherwise.
					 */
					mir_krpc_cell_null++;
					freemsg(head_mp);
				}
			} else {
				/*
				 * If the outbound side of the stream is
				 * flow controlled, then hold this message
				 * until client catches up. mir_hold_inbound
				 * is set in mir_wput and cleared in mir_wsrv.
				 */
				(void) putq(q, head_mp);
				mir->mir_inrservice = B_TRUE;
			}
			break;
		default:
			RPCLOG(1, "mir_rput: unknown mir_type %d\n",
			    mir->mir_type);
			freemsg(head_mp);
			break;
		}

		/*
		 * Reset the chain since we're starting on a new RPC message.
		 */
		head_mp = tail_mp = NULL;
	} while ((mp = cont_mp) != NULL);

	/*
	 * Sanity check the message length; if it's too large mir_check_len()
	 * will shutdown the connection, drop mir_mutex, and return non-zero.
	 */
	if (head_mp != NULL && mir->mir_setup_complete &&
	    mir_check_len(q, frag_len, head_mp))
		return;

	/* Save our local copies back in the mir structure. */
	mir->mir_frag_header = frag_header;
	mir->mir_frag_len = frag_len;
	mir->mir_head_mp = head_mp;
	mir->mir_tail_mp = tail_mp;

	/*
	 * The timer is stopped after the whole message chain is processed.
	 * The reason is that stopping the timer releases the mir_mutex
	 * lock temporarily.  This means that the request can be serviced
	 * while we are still processing the message chain.  This is not
	 * good.  So we stop the timer here instead.
	 *
	 * Note that if the timer fires before we stop it, it will not
	 * do any harm as MIR_SVC_QUIESCED() is false and mir_timer()
	 * will just return.
	 */
	if (stop_timer) {
		RPCLOG(16, "mir_rput: stopping idle timer on 0x%p because "
		    "ref cnt going to non zero\n", (void *)WR(q));
		mir_svc_idle_stop(WR(q), mir);
	}
	mutex_exit(&mir->mir_mutex);
}

static void
mir_rput_proto(queue_t *q, mblk_t *mp)
{
	mir_t	*mir = (mir_t *)q->q_ptr;
	uint32_t	type;
	uint32_t reason = 0;

	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));

	type = ((union T_primitives *)mp->b_rptr)->type;
	switch (mir->mir_type) {
	case RPC_CLIENT:
		switch (type) {
		case T_DISCON_IND:
			reason = ((struct T_discon_ind *)
			    (mp->b_rptr))->DISCON_reason;
			/*FALLTHROUGH*/
		case T_ORDREL_IND:
			mutex_enter(&mir->mir_mutex);
			if (mir->mir_head_mp) {
				freemsg(mir->mir_head_mp);
				mir->mir_head_mp = (mblk_t *)0;
				mir->mir_tail_mp = (mblk_t *)0;
			}
			/*
			 * We are disconnecting, but not necessarily
			 * closing. By not closing, we will fail to
			 * pick up a possibly changed global timeout value,
			 * unless we store it now.
			 */
			mir->mir_idle_timeout = clnt_idle_timeout;
			mir_clnt_idle_stop(WR(q), mir);

			/*
			 * Even though we are unconnected, we still
			 * leave the idle timer going on the client. The
			 * reason for this is that if we've disconnected due
			 * to a server-side disconnect, reset, or connection
			 * timeout, there is a possibility the client may
			 * retry the RPC request. This retry needs to be done
			 * on the same bound address for the server to
			 * interpret it as such. However, we don't want
			 * to wait forever for that possibility. If the
			 * end-point stays unconnected for mir_idle_timeout
			 * units of time, then that is a signal to the
			 * connection manager to give up waiting for the
			 * application (eg. NFS) to send a retry.
			 */
			mir_clnt_idle_start(WR(q), mir);
			mutex_exit(&mir->mir_mutex);
			clnt_dispatch_notifyall(WR(q), type, reason);
			freemsg(mp);
			return;
		case T_ERROR_ACK:
		{
			struct T_error_ack	*terror;

			terror = (struct T_error_ack *)mp->b_rptr;
			RPCLOG(1, "mir_rput_proto T_ERROR_ACK for queue 0x%p",
			    (void *)q);
			RPCLOG(1, " ERROR_prim: %s,",
			    rpc_tpiprim2name(terror->ERROR_prim));
			RPCLOG(1, " TLI_error: %s,",
			    rpc_tpierr2name(terror->TLI_error));
			RPCLOG(1, " UNIX_error: %d\n", terror->UNIX_error);
			if (terror->ERROR_prim == T_DISCON_REQ) {
				clnt_dispatch_notifyall(WR(q), type, reason);
				freemsg(mp);
				return;
			} else {
				if (clnt_dispatch_notifyconn(WR(q), mp))
					return;
			}
			break;
		}
		case T_OK_ACK:
		{
			struct T_ok_ack	*tok = (struct T_ok_ack *)mp->b_rptr;

			if (tok->CORRECT_prim == T_DISCON_REQ) {
				clnt_dispatch_notifyall(WR(q), type, reason);
				freemsg(mp);
				return;
			} else {
				if (clnt_dispatch_notifyconn(WR(q), mp))
					return;
			}
			break;
		}
		case T_CONN_CON:
		case T_INFO_ACK:
		case T_OPTMGMT_ACK:
			if (clnt_dispatch_notifyconn(WR(q), mp))
				return;
			break;
		case T_BIND_ACK:
			break;
		default:
			RPCLOG(1, "mir_rput: unexpected message %d "
			    "for KRPC client\n",
			    ((union T_primitives *)mp->b_rptr)->type);
			break;
		}
		break;

	case RPC_SERVER:
		switch (type) {
		case T_BIND_ACK:
		{
			struct T_bind_ack	*tbind;

			/*
			 * If this is a listening stream, then shut
			 * off the idle timer.
			 */
			tbind = (struct T_bind_ack *)mp->b_rptr;
			if (tbind->CONIND_number > 0) {
				mutex_enter(&mir->mir_mutex);
				mir_svc_idle_stop(WR(q), mir);

				/*
				 * mark this as a listen endpoint
				 * for special handling.
				 */

				mir->mir_listen_stream = 1;
				mutex_exit(&mir->mir_mutex);
			}
			break;
		}
		case T_DISCON_IND:
		case T_ORDREL_IND:
			RPCLOG(16, "mir_rput_proto: got %s indication\n",
			    type == T_DISCON_IND ? "disconnect"
			    : "orderly release");

			/*
			 * For listen endpoint just pass
			 * on the message.
			 */

			if (mir->mir_listen_stream)
				break;

			mutex_enter(&mir->mir_mutex);

			/*
			 * If client wants to break off connection, record
			 * that fact.
			 */
			mir_svc_start_close(WR(q), mir);

			/*
			 * If we are idle, then send the orderly release
			 * or disconnect indication to nfsd.
			 */
			if (MIR_SVC_QUIESCED(mir)) {
				mutex_exit(&mir->mir_mutex);
				break;
			}

			RPCLOG(16, "mir_rput_proto: not idle, so "
			    "disconnect/ord rel indication not passed "
			    "upstream on 0x%p\n", (void *)q);

			/*
			 * Hold the indication until we get idle.
			 * If there already is an indication stored,
			 * replace it if the new one is a disconnect. The
			 * reasoning is that disconnection takes less time
			 * to process, and once a client decides to
			 * disconnect, we should do that.
			 */
			if (mir->mir_svc_pend_mp) {
				if (type == T_DISCON_IND) {
					RPCLOG(16, "mir_rput_proto: replacing"
					    " held disconnect/ord rel"
					    " indication with disconnect on"
					    " 0x%p\n", (void *)q);

					freemsg(mir->mir_svc_pend_mp);
					mir->mir_svc_pend_mp = mp;
				} else {
					RPCLOG(16, "mir_rput_proto: already "
					    "held a disconnect/ord rel "
					    "indication. freeing ord rel "
					    "ind on 0x%p\n", (void *)q);
					freemsg(mp);
				}
			} else
				mir->mir_svc_pend_mp = mp;

			mutex_exit(&mir->mir_mutex);
			return;

		default:
			/* nfsd handles server-side non-data messages. */
			break;
		}
		break;

	default:
		break;
	}

	putnext(q, mp);
}

/*
 * The server-side read queues are used to hold inbound messages while
 * outbound flow control is exerted.  When outbound flow control is
 * relieved, mir_wsrv qenables the read-side queue.  Read-side queues
 * are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
 *
 * For the server side, we have two types of messages queued. The first type
 * are messages that are ready to be XDR decoded and then sent to the
 * RPC program's dispatch routine. The second type are "raw" messages that
 * haven't been processed, i.e. assembled from rpc record fragments into
 * full requests. The only time we will see the second type of message
 * queued is if we have a memory allocation failure while processing
 * a raw message. The field mir_first_non_processed_mblk will mark the
 * first such raw message. So the flow for server side is:
 *
 *	- send processed queued messages to kRPC until we run out or find
 *	  one that needs additional processing because we were short on memory
 *	  earlier
 *	- process a message that was deferred because of lack of
 *	  memory
 *	- continue processing messages until the queue empties or we
 *	  have to stop because of lack of memory
 *	- during each of the above phases, if the queue is empty and
 *	  there are no pending messages that were passed to the RPC
 *	  layer, send upstream the pending disconnect/ordrel indication if
 *	  there is one
 *
 * The read-side queue is also enabled by a bufcall callback if dupmsg
 * fails in mir_rput.
 */
static void
mir_rsrv(queue_t *q)
{
	mir_t	*mir;
	mblk_t	*mp;
	mblk_t	*cmp = NULL;
	boolean_t stop_timer = B_FALSE;

	mir = (mir_t *)q->q_ptr;
	mutex_enter(&mir->mir_mutex);

	mp = NULL;
	switch (mir->mir_type) {
	case RPC_SERVER:
		if (mir->mir_ref_cnt == 0)
			mir->mir_hold_inbound = 0;
		if (mir->mir_hold_inbound) {

			ASSERT(cmp == NULL);
			if (q->q_first == NULL) {

				MIR_CLEAR_INRSRV(mir);

				if (MIR_SVC_QUIESCED(mir)) {
					cmp = mir->mir_svc_pend_mp;
					mir->mir_svc_pend_mp = NULL;
				}
			}

			mutex_exit(&mir->mir_mutex);

			if (cmp != NULL) {
				RPCLOG(16, "mir_rsrv: line %d: sending a held "
				    "disconnect/ord rel indication upstream\n",
				    __LINE__);
				putnext(q, cmp);
			}

			return;
		}
		while (mp = getq(q)) {
			if (mir->mir_krpc_cell &&
			    (mir->mir_svc_no_more_msgs == 0)) {
				/*
				 * If we were idle, turn off idle timer since
				 * we aren't idle any more.
				 */
				if (mir->mir_ref_cnt++ == 0)
					stop_timer = B_TRUE;
				if (mir_check_len(q,
				    (int32_t)msgdsize(mp), mp))
					return;
				svc_queuereq(q, mp);
			} else {
				/*
				 * Count the number of times this happens;
				 * it should never happen, but experience
				 * shows otherwise.
				 */
				if (mir->mir_krpc_cell == NULL)
					mir_krpc_cell_null++;
				freemsg(mp);
			}
		}
		break;
	case RPC_CLIENT:
		break;
	default:
		RPCLOG(1, "mir_rsrv: unexpected mir_type %d\n", mir->mir_type);

		if (q->q_first == NULL)
			MIR_CLEAR_INRSRV(mir);

		mutex_exit(&mir->mir_mutex);

		return;
	}

	/*
	 * The timer is stopped after all the messages are processed.
	 * The reason is that stopping the timer releases the mir_mutex
	 * lock temporarily.  This means that the request can be serviced
	 * while we are still processing the message queue.  This is not
	 * good.  So we stop the timer here instead.
	 */
	if (stop_timer) {
		RPCLOG(16, "mir_rsrv stopping idle timer on 0x%p because ref "
		    "cnt going to non zero\n", (void *)WR(q));
		mir_svc_idle_stop(WR(q), mir);
	}

	if (q->q_first == NULL) {

		MIR_CLEAR_INRSRV(mir);

		ASSERT(cmp == NULL);
		if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) {
			cmp = mir->mir_svc_pend_mp;
			mir->mir_svc_pend_mp = NULL;
		}

		mutex_exit(&mir->mir_mutex);

		if (cmp != NULL) {
			RPCLOG(16, "mir_rsrv: line %d: sending a held "
			    "disconnect/ord rel indication upstream\n",
			    __LINE__);
			putnext(q, cmp);
		}

		return;
	}
	mutex_exit(&mir->mir_mutex);
}

static int mir_svc_policy_fails;

/*
 * Called to send an event code to nfsd/lockd so that it initiates
 * connection close.
 */
static int
mir_svc_policy_notify(queue_t *q, int event)
{
	mblk_t	*mp;
#ifdef DEBUG
	mir_t *mir = (mir_t *)q->q_ptr;
	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
#endif
	ASSERT(q->q_flag & QREADR);

	/*
	 * Create an M_DATA message with the event code and pass it to the
	 * Stream head (nfsd or whoever created the stream will consume it).
	 */
	mp = allocb(sizeof (int), BPRI_HI);

	if (!mp) {
		mir_svc_policy_fails++;
		RPCLOG(16, "mir_svc_policy_notify: could not allocate event "
		    "%d\n", event);
		return (ENOMEM);
	}

	U32_TO_BE32(event, mp->b_rptr);
	mp->b_wptr = mp->b_rptr + sizeof (int);
	putnext(q, mp);
	return (0);
}

/*
 * Server side: start the close phase. We want to get this rpcmod slot in an
 * idle state before mir_close() is called.
 */
static void
mir_svc_start_close(queue_t *wq, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));
	ASSERT((wq->q_flag & QREADR) == 0);
	ASSERT(mir->mir_type == RPC_SERVER);

	/*
	 * Do not accept any more messages.
	 */
	mir->mir_svc_no_more_msgs = 1;

	/*
	 * Next two statements will make the read service procedure invoke
	 * svc_queuereq() on everything stuck in the streams read queue.
	 * It's not necessary because enabling the write queue will
	 * have the same effect, but why not speed the process along?
	 */
	mir->mir_hold_inbound = 0;
	qenable(RD(wq));

	/*
	 * Meanwhile force the write service procedure to send the
	 * responses downstream, regardless of flow control.
	 */
	qenable(wq);
}

/*
 * This routine is called directly by KRPC after a request is completed,
 * whether a reply was sent or the request was dropped.
 */
static void
mir_svc_release(queue_t *wq, mblk_t *mp)
{
	mir_t	*mir = (mir_t *)wq->q_ptr;
	mblk_t	*cmp = NULL;

	ASSERT((wq->q_flag & QREADR) == 0);
	if (mp)
		freemsg(mp);

	mutex_enter(&mir->mir_mutex);

	/*
	 * Start idle processing if this is the last reference.
	 */
	if ((mir->mir_ref_cnt == 1) && (mir->mir_inrservice == 0)) {
		cmp = mir->mir_svc_pend_mp;
		mir->mir_svc_pend_mp = NULL;
	}

	if (cmp) {
		RPCLOG(16, "mir_svc_release: sending a held "
		    "disconnect/ord rel indication upstream on queue 0x%p\n",
		    (void *)RD(wq));

		mutex_exit(&mir->mir_mutex);

		putnext(RD(wq), cmp);

		mutex_enter(&mir->mir_mutex);
	}

	/*
	 * Start idle processing if this is the last reference.
	 */
	if (mir->mir_ref_cnt == 1 && mir->mir_inrservice == 0) {

		RPCLOG(16, "mir_svc_release starting idle timer on 0x%p "
		    "because ref cnt is zero\n", (void *)wq);

		mir_svc_idle_start(wq, mir);
	}

	mir->mir_ref_cnt--;
	ASSERT(mir->mir_ref_cnt >= 0);

	/*
	 * Wake up the thread waiting to close.
	 */

	if ((mir->mir_ref_cnt == 0) && mir->mir_closing)
		cv_signal(&mir->mir_condvar);

	mutex_exit(&mir->mir_mutex);
}

/*
 * This routine is called by server-side KRPC when it is ready to
 * handle inbound messages on the stream.
 */
static void
mir_svc_start(queue_t *wq)
{
	mir_t	*mir = (mir_t *)wq->q_ptr;

	/*
	 * No longer need to take the mir_mutex because the
	 * mir_setup_complete field has been moved out of
	 * the bit field protected by the mir_mutex.
	 */
	mir->mir_setup_complete = 1;
	qenable(RD(wq));
}

/*
 * client side wrapper for stopping timer with normal idle timeout.
 */
static void
mir_clnt_idle_stop(queue_t *wq, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));
	ASSERT((wq->q_flag & QREADR) == 0);
	ASSERT(mir->mir_type == RPC_CLIENT);

	mir_timer_stop(mir);
}

/*
 * client side wrapper for starting timer with normal idle timeout.
 */
static void
mir_clnt_idle_start(queue_t *wq, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));
	ASSERT((wq->q_flag & QREADR) == 0);
	ASSERT(mir->mir_type == RPC_CLIENT);

	mir_timer_start(wq, mir, mir->mir_idle_timeout);
}

/*
 * client side only. Forces rpcmod to stop sending T_ORDREL_REQs on
 * end-points that aren't connected.
 */
static void
mir_clnt_idle_do_stop(queue_t *wq)
{
	mir_t *mir = (mir_t *)wq->q_ptr;

	RPCLOG(1, "mir_clnt_idle_do_stop: wq 0x%p\n", (void *)wq);
	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
	mutex_enter(&mir->mir_mutex);
	mir_clnt_idle_stop(wq, mir);
	mutex_exit(&mir->mir_mutex);
}

/*
 * Timer handler. It handles the idle timeout and memory shortage conditions.
 */
static void
mir_timer(void *arg)
{
	queue_t *wq = (queue_t *)arg;
	mir_t *mir = (mir_t *)wq->q_ptr;
	boolean_t notify;
	clock_t now;

	mutex_enter(&mir->mir_mutex);

	/*
	 * mir_timer_call is set only while mir_timer_start() or
	 * mir_timer_stop() is in progress, and mir_timer() can only run
	 * concurrently with them when the timer is being stopped. In that
	 * case, just return.
	 */
	if (mir->mir_timer_call) {
		mutex_exit(&mir->mir_mutex);
		return;
	}
	mir->mir_timer_id = 0;

	switch (mir->mir_type) {
	case RPC_CLIENT:

		/*
		 * For clients, the timer fires at clnt_idle_timeout
		 * intervals. If the activity marker (mir_clntreq) is
		 * zero, then the stream has been idle since the last
		 * timer event and we notify KRPC. If mir_clntreq is
		 * non-zero, then the stream is active and we just
		 * restart the timer for another interval. mir_clntreq
		 * is set to 1 in mir_wput for every request passed
		 * downstream.
		 *
		 * If this was a memory-shortage timer, reset the idle
		 * timeout regardless; mir_clntreq will not be a
		 * valid indicator in that case.
		 *
		 * The timer is initially started in mir_wput during
		 * RPC_CLIENT ioctl processing.
		 *
		 * The timer interval can be changed for individual
		 * streams with the ND variable "mir_idle_timeout".
		 */
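		/*
		 * Worked example (editor's note, illustrative numbers
		 * only): with mir_idle_timeout = 30000 ms and the last
		 * request seen 12000 ms ago, the computation below yields
		 *	tout = 30000 - 12000 = 18000 ms,
		 * so the timer is re-armed for the remainder of the idle
		 * interval rather than for a full one.
		 */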
		now = ddi_get_lbolt();
		if (mir->mir_clntreq > 0 && mir->mir_use_timestamp +
		    MSEC_TO_TICK(mir->mir_idle_timeout) - now >= 0) {
			clock_t tout;

			tout = mir->mir_idle_timeout -
			    TICK_TO_MSEC(now - mir->mir_use_timestamp);
			if (tout < 0)
				tout = 1000;
#if 0
			printf("mir_timer[%d < %d + %d]: reset client timer "
			    "to %d (ms)\n", TICK_TO_MSEC(now),
			    TICK_TO_MSEC(mir->mir_use_timestamp),
			    mir->mir_idle_timeout, tout);
#endif
			mir->mir_clntreq = 0;
			mir_timer_start(wq, mir, tout);
			mutex_exit(&mir->mir_mutex);
			return;
		}
#if 0
		printf("mir_timer[%d]: doing client timeout\n", now / hz);
#endif
		/*
		 * We are disconnecting, but not necessarily
		 * closing. By not closing, we will fail to
		 * pick up a possibly changed global timeout value,
		 * unless we store it now.
		 */
		mir->mir_idle_timeout = clnt_idle_timeout;
		mir_clnt_idle_start(wq, mir);

		mutex_exit(&mir->mir_mutex);
		/*
		 * We pass T_ORDREL_REQ as an integer value
		 * to KRPC as the indication that the stream
		 * is idle. This is not a T_ORDREL_REQ message;
		 * it is just a convenient value since we call
		 * the same KRPC routine for T_ORDREL_INDs and
		 * T_DISCON_INDs.
		 */
		clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0);
		return;

	case RPC_SERVER:

		/*
		 * For servers, the timer is only running when the stream
		 * is really idle or memory is short. The timer is started
		 * by mir_wput when mir_type is set to RPC_SERVER and
		 * by mir_svc_idle_start whenever the stream goes idle
		 * (mir_ref_cnt == 0). The timer is cancelled in
		 * mir_rput whenever a new inbound request is passed to KRPC
		 * and the stream was previously idle.
		 *
		 * The timer interval can be changed for individual
		 * streams with the ND variable "mir_idle_timeout".
		 *
		 * If the stream is not idle, do nothing.
		 */
		if (!MIR_SVC_QUIESCED(mir)) {
			mutex_exit(&mir->mir_mutex);
			return;
		}

		notify = !mir->mir_inrservice;
		mutex_exit(&mir->mir_mutex);

		/*
		 * If no packet is queued on the read queue, the stream
		 * is really idle, so notify nfsd to close it.
		 */
		if (notify) {
			RPCLOG(16, "mir_timer: telling stream head listener "
			    "to close stream (0x%p)\n", (void *)RD(wq));
			(void) mir_svc_policy_notify(RD(wq), 1);
		}
		return;

	default:
		RPCLOG(1, "mir_timer: unexpected mir_type %d\n",
		    mir->mir_type);
		mutex_exit(&mir->mir_mutex);
		return;
	}
}

/*
 * Called by the RPC package to send either a call or a return, or a
 * transport connection request. Adds the record marking header.
 */
static void
mir_wput(queue_t *q, mblk_t *mp)
{
	uint_t	frag_header;
	mir_t	*mir = (mir_t *)q->q_ptr;
	uchar_t	*rptr = mp->b_rptr;

	if (!mir) {
		freemsg(mp);
		return;
	}

	if (mp->b_datap->db_type != M_DATA) {
		mir_wput_other(q, mp);
		return;
	}

	if (mir->mir_ordrel_pending == 1) {
		freemsg(mp);
		RPCLOG(16, "mir_wput wq 0x%p: got data after T_ORDREL_REQ\n",
		    (void *)q);
		return;
	}

	frag_header = (uint_t)DLEN(mp);
	frag_header |= MIR_LASTFRAG;

	/* Stick in the 4-byte record marking header. */
	if ((rptr - mp->b_datap->db_base) < sizeof (uint32_t) ||
	    !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
		/*
		 * Since we know that M_DATA messages are created exclusively
		 * by KRPC, we expect that KRPC will leave room for our header
		 * and 4-byte align the data, which is normal for XDR.
		 * If KRPC (or someone else) does not cooperate, then we
		 * just throw away the message.
		 */
		RPCLOG(1, "mir_wput: KRPC did not leave space for record "
		    "fragment header (%d bytes left)\n",
		    (int)(rptr - mp->b_datap->db_base));
		freemsg(mp);
		return;
	}
	rptr -= sizeof (uint32_t);
	*(uint32_t *)rptr = htonl(frag_header);
	mp->b_rptr = rptr;

	mutex_enter(&mir->mir_mutex);
	if (mir->mir_type == RPC_CLIENT) {
		/*
		 * For the client, set mir_clntreq to indicate that the
		 * connection is active.
		 */
		mir->mir_clntreq = 1;
		mir->mir_use_timestamp = ddi_get_lbolt();
	}

	/*
	 * If we haven't already queued some data and the downstream module
	 * can accept more data, send it on, otherwise we queue the message
	 * and take other actions depending on mir_type.
	 */
	if (!mir->mir_inwservice && MIR_WCANPUTNEXT(mir, q)) {
		mutex_exit(&mir->mir_mutex);

		/*
		 * Now we pass the RPC message downstream.
		 */
		putnext(q, mp);
		return;
	}

	switch (mir->mir_type) {
	case RPC_CLIENT:
		/*
		 * Check for a previous duplicate request on the
		 * queue. If there is one, then we throw away
		 * the current message and let the previous one
		 * go through. If we can't find a duplicate, then
		 * send this one. This tap dance is an effort
		 * to reduce traffic and processing requirements
		 * under load conditions.
		 */
		if (mir_clnt_dup_request(q, mp)) {
			mutex_exit(&mir->mir_mutex);
			freemsg(mp);
			return;
		}
		break;
	case RPC_SERVER:
		/*
		 * Set mir_hold_inbound so that new inbound RPC
		 * messages will be held until the client catches
		 * up on the earlier replies. This flag is cleared
		 * in mir_wsrv after flow control is relieved;
		 * the read-side queue is also enabled at that time.
		 */
		mir->mir_hold_inbound = 1;
		break;
	default:
		RPCLOG(1, "mir_wput: unexpected mir_type %d\n", mir->mir_type);
		break;
	}
	mir->mir_inwservice = 1;
	(void) putq(q, mp);
	mutex_exit(&mir->mir_mutex);
}

static void
mir_wput_other(queue_t *q, mblk_t *mp)
{
	mir_t	*mir = (mir_t *)q->q_ptr;
	struct iocblk	*iocp;
	uchar_t	*rptr = mp->b_rptr;
	bool_t	flush_in_svc = FALSE;

	ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
	switch (mp->b_datap->db_type) {
	case M_IOCTL:
		iocp = (struct iocblk *)rptr;
		switch (iocp->ioc_cmd) {
		case RPC_CLIENT:
			mutex_enter(&mir->mir_mutex);
			if (mir->mir_type != 0 &&
			    mir->mir_type != iocp->ioc_cmd) {
		ioc_eperm:
				mutex_exit(&mir->mir_mutex);
				iocp->ioc_error = EPERM;
				iocp->ioc_count = 0;
				mp->b_datap->db_type = M_IOCACK;
				qreply(q, mp);
				return;
			}

			mir->mir_type = iocp->ioc_cmd;

			/*
			 * Clear mir_hold_inbound, which was set to 1 by
			 * mir_open. This flag is not used on client
			 * streams.
			 */
			mir->mir_hold_inbound = 0;
			mir->mir_max_msg_sizep = &clnt_max_msg_size;

			/*
			 * Start the idle timer. See mir_timer() for more
			 * information on how client timers work.
			 */
			mir->mir_idle_timeout = clnt_idle_timeout;
			mir_clnt_idle_start(q, mir);
			mutex_exit(&mir->mir_mutex);

			mp->b_datap->db_type = M_IOCACK;
			qreply(q, mp);
			return;
		case RPC_SERVER:
			mutex_enter(&mir->mir_mutex);
			if (mir->mir_type != 0 &&
			    mir->mir_type != iocp->ioc_cmd)
				goto ioc_eperm;

			/*
			 * We don't clear mir_hold_inbound here because
			 * mir_hold_inbound is used in the flow control
			 * model. If we cleared it here, then we'd commit
			 * a small violation of the model, and the transport
			 * might immediately block downstream flow.
			 */
			mir->mir_type = iocp->ioc_cmd;
			mir->mir_max_msg_sizep = &svc_max_msg_size;

			/*
			 * Start the idle timer. See mir_timer() for more
			 * information on how server timers work.
			 *
			 * Note that it is important to start the idle timer
			 * here so that connections time out even if we
			 * never receive any data on them.
			 */
			mir->mir_idle_timeout = svc_idle_timeout;
			RPCLOG(16, "mir_wput_other starting idle timer on "
			    "0x%p because we got RPC_SERVER ioctl\n",
			    (void *)q);
			mir_svc_idle_start(q, mir);
			mutex_exit(&mir->mir_mutex);

			mp->b_datap->db_type = M_IOCACK;
			qreply(q, mp);
			return;
		default:
			break;
		}
		break;

	case M_PROTO:
		if (mir->mir_type == RPC_CLIENT) {
			/*
			 * We are likely being called from the context of a
			 * service procedure, so we need to enqueue. However,
			 * enqueueing may put our message behind data
			 * messages, so flush the data first.
			 */
			flush_in_svc = TRUE;
		}
		if ((mp->b_wptr - rptr) < sizeof (uint32_t) ||
		    !IS_P2ALIGNED(rptr, sizeof (uint32_t)))
			break;

		switch (((union T_primitives *)rptr)->type) {
		case T_DATA_REQ:
			/* Don't pass T_DATA_REQ messages downstream. */
			freemsg(mp);
			return;
		case T_ORDREL_REQ:
			RPCLOG(8, "mir_wput_other wq 0x%p: got T_ORDREL_REQ\n",
			    (void *)q);
			mutex_enter(&mir->mir_mutex);
			if (mir->mir_type != RPC_SERVER) {
				/*
				 * We are likely being called from
				 * clnt_dispatch_notifyall(). Sending a
				 * T_ORDREL_REQ will result in some kind of
				 * _IND message being sent back up, which
				 * will in turn trigger another call to
				 * clnt_dispatch_notifyall(). To keep the
				 * stack lean, queue this message.
				 */
				mir->mir_inwservice = 1;
				(void) putq(q, mp);
				mutex_exit(&mir->mir_mutex);
				return;
			}

			/*
			 * Mark the structure such that we don't accept any
			 * more requests from the client. We could defer this
			 * until we actually send the orderly release
			 * request downstream, but all that does is delay
			 * the closing of this stream.
			 */
			RPCLOG(16, "mir_wput_other wq 0x%p: got T_ORDREL_REQ, "
			    "so calling mir_svc_start_close\n", (void *)q);

			mir_svc_start_close(q, mir);

			/*
			 * If we have sent down a T_ORDREL_REQ, don't send
			 * any more.
			 */
			if (mir->mir_ordrel_pending) {
				freemsg(mp);
				mutex_exit(&mir->mir_mutex);
				return;
			}

			/*
			 * If the stream is not idle, then we hold the
			 * orderly release until it becomes idle. This
			 * ensures that KRPC will be able to reply to
			 * all requests that we have passed to it.
			 *
			 * We also queue the request if there is data already
			 * queued, because we cannot allow the T_ORDREL_REQ
			 * to go before data. When we had a separate reply
			 * count, this was not a problem, because the
			 * reply count was reconciled when mir_wsrv()
			 * completed.
			 */
			if (!MIR_SVC_QUIESCED(mir) ||
			    mir->mir_inwservice == 1) {
				mir->mir_inwservice = 1;
				(void) putq(q, mp);

				RPCLOG(16, "mir_wput_other: queuing "
				    "T_ORDREL_REQ on 0x%p\n", (void *)q);

				mutex_exit(&mir->mir_mutex);
				return;
			}

			/*
			 * Mark the structure so that we know we sent
			 * an orderly release request, and reset the idle
			 * timer.
			 */
			mir->mir_ordrel_pending = 1;

			RPCLOG(16, "mir_wput_other: calling mir_svc_idle_start"
			    " on 0x%p because we got T_ORDREL_REQ\n",
			    (void *)q);

			mir_svc_idle_start(q, mir);
			mutex_exit(&mir->mir_mutex);

			/*
			 * When we break, we will putnext the T_ORDREL_REQ.
			 */
			break;

		case T_CONN_REQ:
			mutex_enter(&mir->mir_mutex);
			if (mir->mir_head_mp != NULL) {
				freemsg(mir->mir_head_mp);
				mir->mir_head_mp = NULL;
				mir->mir_tail_mp = NULL;
			}
			mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
			/*
			 * Restart the timer in case mir_clnt_idle_do_stop()
			 * was called.
			 */
			mir->mir_idle_timeout = clnt_idle_timeout;
			mir_clnt_idle_stop(q, mir);
			mir_clnt_idle_start(q, mir);
			mutex_exit(&mir->mir_mutex);
			break;

		default:
			/*
			 * T_DISCON_REQ is one of the interesting default
			 * cases here. Ideally, an M_FLUSH is done before
			 * T_DISCON_REQ is done. However, that is somewhat
			 * cumbersome for clnt_cots.c to do. So we queue
			 * T_DISCON_REQ, and let the service procedure
			 * flush all M_DATA.
			 */
			break;
		}
		/* fallthru */;
	default:
		if (mp->b_datap->db_type >= QPCTL) {
			if (mp->b_datap->db_type == M_FLUSH) {
				if (mir->mir_type == RPC_CLIENT &&
				    *mp->b_rptr & FLUSHW) {
					RPCLOG(32, "mir_wput_other: flushing "
					    "wq 0x%p\n", (void *)q);
					if (*mp->b_rptr & FLUSHBAND) {
						flushband(q, *(mp->b_rptr + 1),
						    FLUSHDATA);
					} else {
						flushq(q, FLUSHDATA);
					}
				} else {
					RPCLOG(32, "mir_wput_other: ignoring "
					    "M_FLUSH on wq 0x%p\n", (void *)q);
				}
			}
			break;
		}

		mutex_enter(&mir->mir_mutex);
		if (mir->mir_inwservice == 0 && MIR_WCANPUTNEXT(mir, q)) {
			mutex_exit(&mir->mir_mutex);
			break;
		}
		mir->mir_inwservice = 1;
		mir->mir_inwflushdata = flush_in_svc;
		(void) putq(q, mp);
		mutex_exit(&mir->mir_mutex);
		qenable(q);

		return;
	}
	putnext(q, mp);
}

static void
mir_wsrv(queue_t *q)
{
	mblk_t	*mp;
	mir_t	*mir;
	bool_t	flushdata;

	mir = (mir_t *)q->q_ptr;
	mutex_enter(&mir->mir_mutex);

	flushdata = mir->mir_inwflushdata;
	mir->mir_inwflushdata = 0;

	while (mp = getq(q)) {
		if (mp->b_datap->db_type == M_DATA) {
			/*
			 * Do not send any more data if we have sent
			 * a T_ORDREL_REQ.
			 */
			if (flushdata || mir->mir_ordrel_pending == 1) {
				freemsg(mp);
				continue;
			}

			/*
			 * Make sure that the stream can really handle more
			 * data.
			 */
			if (!MIR_WCANPUTNEXT(mir, q)) {
				(void) putbq(q, mp);
				mutex_exit(&mir->mir_mutex);
				return;
			}

			/*
			 * Now we pass the RPC message downstream.
			 */
			mutex_exit(&mir->mir_mutex);
			putnext(q, mp);
			mutex_enter(&mir->mir_mutex);
			continue;
		}

		/*
		 * This is not an RPC message; pass it downstream
		 * (ignoring flow control) unless the server side is
		 * sending a T_ORDREL_REQ downstream.
		 */
		if (mir->mir_type != RPC_SERVER ||
		    ((union T_primitives *)mp->b_rptr)->type !=
		    T_ORDREL_REQ) {
			mutex_exit(&mir->mir_mutex);
			putnext(q, mp);
			mutex_enter(&mir->mir_mutex);
			continue;
		}

		if (mir->mir_ordrel_pending == 1) {
			/*
			 * Don't send two T_ORDRELs.
			 */
			freemsg(mp);
			continue;
		}

		/*
		 * Mark the structure so that we know we sent an orderly
		 * release request. We will check whether the slot is idle
		 * at the end of this routine, and if so, reset the idle
		 * timer to handle orderly release timeouts.
		 */
		mir->mir_ordrel_pending = 1;
		RPCLOG(16, "mir_wsrv: sending ordrel req on q 0x%p\n",
		    (void *)q);
		/*
		 * Send the orderly release downstream. If there are other
		 * pending replies we won't be able to send them. However,
		 * the only reason we should send the orderly release is if
		 * we were idle, or if an unusual event occurred.
		 */
		mutex_exit(&mir->mir_mutex);
		putnext(q, mp);
		mutex_enter(&mir->mir_mutex);
	}

	if (q->q_first == NULL)
		/*
		 * If we call mir_svc_idle_start() below, then
		 * clearing mir_inwservice here will also result in
		 * any thread waiting in mir_close() being signaled.
		 */
		mir->mir_inwservice = 0;

	if (mir->mir_type != RPC_SERVER) {
		mutex_exit(&mir->mir_mutex);
		return;
	}

	/*
	 * If idle, we call mir_svc_idle_start to start the timer (or wake
	 * up a close). Also make sure not to start the idle timer on the
	 * listener stream; doing so can cause nfsd to send an orderly
	 * release command on the listener stream.
	 */
	if (MIR_SVC_QUIESCED(mir) && !(mir->mir_listen_stream)) {
		RPCLOG(16, "mir_wsrv: calling mir_svc_idle_start on 0x%p "
		    "because mir slot is idle\n", (void *)q);
		mir_svc_idle_start(q, mir);
	}

	/*
	 * If outbound flow control has been relieved, then allow new
	 * inbound requests to be processed.
	 */
	if (mir->mir_hold_inbound) {
		mir->mir_hold_inbound = 0;
		qenable(RD(q));
	}
	mutex_exit(&mir->mir_mutex);
}

static void
mir_disconnect(queue_t *q, mir_t *mir)
{
	ASSERT(MUTEX_HELD(&mir->mir_mutex));

	switch (mir->mir_type) {
	case RPC_CLIENT:
		/*
		 * We are disconnecting, but not necessarily
		 * closing. By not closing, we will fail to
		 * pick up a possibly changed global timeout value,
		 * unless we store it now.
		 */
		mir->mir_idle_timeout = clnt_idle_timeout;
		mir_clnt_idle_start(WR(q), mir);
		mutex_exit(&mir->mir_mutex);

		/*
		 * T_DISCON_REQ is passed to KRPC as an integer value
		 * (this is not a TPI message). It is used as a
		 * convenient value to indicate a sanity check
		 * failure -- the same KRPC routine is also called
		 * for T_DISCON_INDs and T_ORDREL_INDs.
		 */
		clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0);
		break;

	case RPC_SERVER:
		mir->mir_svc_no_more_msgs = 1;
		mir_svc_idle_stop(WR(q), mir);
		mutex_exit(&mir->mir_mutex);
		RPCLOG(16, "mir_disconnect: telling "
		    "stream head listener to disconnect stream "
		    "(0x%p)\n", (void *)q);
		(void) mir_svc_policy_notify(q, 2);
		break;

	default:
		mutex_exit(&mir->mir_mutex);
		break;
	}
}

/*
 * Sanity check the message length, and if it's too large, shut down the
 * connection. Returns 1 if the connection is shut down; 0 otherwise.
 */
static int
mir_check_len(queue_t *q, int32_t frag_len, mblk_t *head_mp)
{
	mir_t *mir = q->q_ptr;
	uint_t maxsize = 0;

	if (mir->mir_max_msg_sizep != NULL)
		maxsize = *mir->mir_max_msg_sizep;

	if (maxsize == 0 || frag_len <= (int)maxsize)
		return (0);

	freemsg(head_mp);
	mir->mir_head_mp = NULL;
	mir->mir_tail_mp = NULL;
	mir->mir_frag_header = 0;
	mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
	if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
		cmn_err(CE_NOTE,
		    "KRPC: record fragment from %s of size (%d) exceeds "
		    "maximum (%u). Disconnecting",
		    (mir->mir_type == RPC_CLIENT) ? "server" :
		    (mir->mir_type == RPC_SERVER) ? "client" :
		    "test tool", frag_len, maxsize);
	}

	mir_disconnect(q, mir);
	return (1);
}
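
/*
 * Editor's note: the maxsize checked above comes from
 * *mir->mir_max_msg_sizep, which mir_wput_other() points at
 * clnt_max_msg_size for RPC_CLIENT streams and at svc_max_msg_size for
 * RPC_SERVER streams, so the enforced limit differs by stream type.
 */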