/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/

/*-
 * Copyright (c) 2009, Sun Microsystems, Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of Sun Microsystems, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC";
#endif

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");
/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here.  The xprt routines are
 * for handling transport handles.  The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */
#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/ucred.h>

#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>
#define SVC_VERSQUIET	0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp)	(SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCGROUP *grp);
static void xprt_unregister_locked(SVCXPRT *xprt);
static void svc_change_space_used(SVCPOOL *pool, long delta);
static bool_t svc_request_space_available(SVCPOOL *pool);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;
	SVCGROUP *grp;
	int g;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	TAILQ_INIT(&pool->sp_callouts);
	TAILQ_INIT(&pool->sp_lcallouts);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_groupcount = 1;
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
		grp->sg_pool = pool;
		grp->sg_state = SVCPOOL_ACTIVE;
		TAILQ_INIT(&grp->sg_xlist);
		TAILQ_INIT(&grp->sg_active);
		LIST_INIT(&grp->sg_idlethreads);
		grp->sg_minthreads = 1;
		grp->sg_maxthreads = 1;
	}

	/*
	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
	 * on LP64 architectures, so cast to u_long to avoid undefined
	 * behavior.  (ILP32 architectures cannot have nmbclusters
	 * large enough to overflow for other reasons.)
	 */
	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
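	/*
	 * Worked example (hypothetical numbers): with nmbclusters =
	 * 65536 and MCLBYTES = 2048, sp_space_high becomes
	 * 65536 * 2048 / 4 = 32MB, and sp_space_low two thirds of
	 * that, about 21MB.  svc_change_space_used() below starts
	 * throttling at the high mark and resumes at the low one.
	 */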
	sysctl_ctx_init(&pool->sp_sysctl);
	if (sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_minthread_sysctl, "I",
		    "Minimal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_maxthread_sysctl, "I",
		    "Maximal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLTYPE_INT | CTLFLAG_RD,
		    pool, 0, svcpool_threads_sysctl, "I",
		    "Current number of threads");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
		    "Number of thread groups");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest,
		    "Highest space used since reboot.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low,
		    "Low water mark for request space.");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether nfs requests are currently throttled");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return (pool);
}
void
svcpool_destroy(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svc_loss_callout *sl;
	struct svcxprt_list cleanup;
	int g;

	TAILQ_INIT(&cleanup);

	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
		mtx_unlock(&grp->sg_lock);
	}
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}

	mtx_lock(&pool->sp_lock);
	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}
	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_loss_unreg(pool, sl->slc_dispatch);
		mtx_lock(&pool->sp_lock);
	}
	mtx_unlock(&pool->sp_lock);

	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_destroy(&grp->sg_lock);
	}
	mtx_destroy(&pool->sp_lock);

	if (pool->sp_rcache)
		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);
	free(pool, M_RPC);
}
/*
 * Sysctl handler to get the present thread count on a pool
 */
static int
svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int threads, error, g;

	pool = oidp->oid_arg1;
	threads = 0;
	mtx_lock(&pool->sp_lock);
	for (g = 0; g < pool->sp_groupcount; g++)
		threads += pool->sp_groups[g].sg_threadcount;
	mtx_unlock(&pool->sp_lock);
	error = sysctl_handle_int(oidp, &threads, 0, req);
	return (error);
}
/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newminthreads, error, g;

	pool = oidp->oid_arg1;
	newminthreads = pool->sp_minthreads;
	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
	if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		pool->sp_minthreads = newminthreads;
		for (g = 0; g < pool->sp_groupcount; g++) {
			pool->sp_groups[g].sg_minthreads = max(1,
			    pool->sp_minthreads / pool->sp_groupcount);
		}
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}
/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newmaxthreads, error, g;

	pool = oidp->oid_arg1;
	newmaxthreads = pool->sp_maxthreads;
	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		pool->sp_maxthreads = newmaxthreads;
		for (g = 0; g < pool->sp_groupcount; g++) {
			pool->sp_groups[g].sg_maxthreads = max(1,
			    pool->sp_maxthreads / pool->sp_groupcount);
		}
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}
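/*
 * The three handlers above appear under whatever sysctl tree was passed
 * to svcpool_create().  For example, assuming the NFS server's pool is
 * rooted at vfs.nfsd, "sysctl vfs.nfsd.minthreads=8" funnels through
 * svcpool_minthread_sysctl() and rebalances sg_minthreads across all
 * groups.
 */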
/*
 * Activate a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;
	SVCGROUP *grp;
	int g;

	SVC_ACQUIRE(xprt);
	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
	xprt->xp_group = grp = &pool->sp_groups[g];
	mtx_lock(&grp->sg_lock);
	xprt->xp_registered = TRUE;
	xprt->xp_active = FALSE;
	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
	mtx_unlock(&grp->sg_lock);
}
/*
 * De-activate a transport handle.  Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	KASSERT(xprt->xp_registered == TRUE,
	    ("xprt_unregister_locked: not registered"));
	xprt_inactive_locked(xprt);
	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}
void
xprt_unregister(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);
	if (xprt->xp_registered == FALSE) {
		/* Already unregistered by another thread */
		mtx_unlock(&grp->sg_lock);
		return;
	}
	xprt_unregister_locked(xprt);
	mtx_unlock(&grp->sg_lock);

	SVC_RELEASE(xprt);
}
/*
 * Attempt to assign a service thread to this transport.
 */
static int
xprt_assignthread(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;
	SVCTHREAD *st;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	st = LIST_FIRST(&grp->sg_idlethreads);
	if (st) {
		LIST_REMOVE(st, st_ilink);
		SVC_ACQUIRE(xprt);
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
		return (TRUE);
	} else {
		/*
		 * See if we can create a new thread. The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall). Don't create more
		 * than one thread per second.
		 */
		if (grp->sg_state == SVCPOOL_ACTIVE
		    && grp->sg_lastcreatetime < time_uptime
		    && grp->sg_threadcount < grp->sg_maxthreads) {
			grp->sg_state = SVCPOOL_THREADWANTED;
		}
	}
	return (FALSE);
}
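/*
 * Note on the rate limit above: sg_lastcreatetime is only advanced in
 * svc_run_internal() when a thread is actually spawned, so the
 * "sg_lastcreatetime < time_uptime" test allows at most one new thread
 * per second per group, no matter how many upcalls arrive meanwhile.
 */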
void
xprt_active(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);

	if (!xprt->xp_registered) {
		/*
		 * Race with xprt_unregister - we lose.
		 */
		mtx_unlock(&grp->sg_lock);
		return;
	}

	if (!xprt->xp_active) {
		xprt->xp_active = TRUE;
		if (xprt->xp_thread == NULL) {
			if (!svc_request_space_available(xprt->xp_pool) ||
			    !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
				    xp_alink);
		}
	}

	mtx_unlock(&grp->sg_lock);
}
void
xprt_inactive_locked(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	if (xprt->xp_active) {
		if (xprt->xp_thread == NULL)
			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
}
void
xprt_inactive(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);
	xprt_inactive_locked(xprt);
	mtx_unlock(&grp->sg_lock);
}
/*
 * Variant of xprt_inactive() for use only when sure that port is
 * assigned to thread. For example, within receive handlers.
 */
void
xprt_inactive_self(SVCXPRT *xprt)
{

	KASSERT(xprt->xp_thread != NULL,
	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
	xprt->xp_active = FALSE;
}
/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an rpc request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

	/* VARIABLES PROTECTED BY svc_lock: s, svc_head */

	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1))
		return (FALSE);

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* he is registering another xptr */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}
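/*
 * Registration sketch (illustrative, not part of this file): a service
 * binds its dispatch function to a program/version pair on a new
 * transport.  EXAMPLE_PROG, EXAMPLE_VERS and example_dispatch are
 * hypothetical names.
 */
#if 0
#define EXAMPLE_PROG	((rpcprog_t)0x20000999)	/* transient program number */
#define EXAMPLE_VERS	((rpcvers_t)1)

static void
example_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
{

	/* Do the work, reply, and always release the request. */
	svc_sendreply(rqstp, (xdrproc_t)xdr_void, NULL);
	svc_freereq(rqstp);
}

static bool_t
example_attach(SVCXPRT *xprt, const struct netconfig *nconf)
{

	return (svc_reg(xprt, EXAMPLE_PROG, EXAMPLE_VERS,
	    example_dispatch, nconf));
}
#endif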
/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
	struct svc_callout *s;

	/* unregister the information anyway */
	(void) rpcb_unset(prog, vers, NULL);
	mtx_lock(&pool->sp_lock);
	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	}
	mtx_unlock(&pool->sp_lock);
}
/*
 * Add a service connection loss program to the callout list.
 * The dispatch routine will be called when some port in this pool dies.
 */
bool_t
svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_loss_callout *s;

	mtx_lock(&pool->sp_lock);
	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
		if (s->slc_dispatch == dispatch)
			break;
	}
	if (s != NULL) {
		mtx_unlock(&pool->sp_lock);
		return (TRUE);
	}
	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s->slc_dispatch = dispatch;
	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
	mtx_unlock(&pool->sp_lock);
	return (TRUE);
}
/*
 * Remove a service connection loss program from the callout list.
 */
void
svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
{
	struct svc_loss_callout *s;

	mtx_lock(&pool->sp_lock);
	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
		if (s->slc_dispatch == dispatch) {
			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
			free(s, M_RPC);
			break;
		}
	}
	mtx_unlock(&pool->sp_lock);
}
/* ********************** CALLOUT list related stuff ************* */

/*
 * Search the callout list for a program number, return the callout
 * struct.
 */
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
	struct svc_callout *s;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == prog && s->sc_vers == vers
		    && (netid == NULL || s->sc_netid == NULL ||
		    strcmp(netid, s->sc_netid) == 0))
			break;
	}

	return (s);
}
/* ******************* REPLY GENERATION ROUTINES  ************ */

static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}
/*
 * Send a reply to an rpc request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void *xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	m = m_getcl(M_WAITOK, MT_DATA, 0);
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}
bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	return (svc_sendreply_common(rqstp, &rply, m));
}
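/*
 * Reply-path sketch (illustrative, not part of this file): a dispatch
 * routine decodes with svc_getargs(), answers with svc_sendreply() on
 * success, and uses svcerr_decode() when the XDR decode fails.  The
 * xdr_example_args routine and struct example_args are hypothetical.
 */
#if 0
static void
example_dispatch2(struct svc_req *rqstp, SVCXPRT *xprt)
{
	struct example_args args;

	memset(&args, 0, sizeof(args));
	if (!svc_getargs(rqstp, (xdrproc_t)xdr_example_args, &args)) {
		svcerr_decode(rqstp);
		svc_freereq(rqstp);
		return;
	}
	/* ... perform the call using args ... */
	svc_sendreply(rqstp, (xdrproc_t)xdr_void, NULL);
	svc_freeargs(rqstp, (xdrproc_t)xdr_example_args, &args);
	svc_freereq(rqstp);
}
#endif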
/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROC_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SYSTEM_ERR;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_DENIED;
	rply.rjcted_rply.rj_stat = AUTH_ERROR;
	rply.rjcted_rply.rj_why = why;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}
/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_MISMATCH;
	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
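/*
 * Worked example: if versions 2 and 3 of a program are registered and a
 * version-4 call arrives, svc_executereq() below computes low_vers = 2
 * and high_vers = 3, and this routine sends MSG_ACCEPTED/PROG_MISMATCH
 * telling the client which version range is actually served.
 */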
/*
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc(void)
{
	SVCXPRT *xprt;
	SVCXPRT_EXT *ext;

	xprt = mem_alloc(sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	xprt->xp_p3 = ext;
	refcount_init(&xprt->xp_refs, 1);

	return (xprt);
}
/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	mem_free(xprt, sizeof(SVCXPRT));
}
/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	struct svc_loss_callout *s;
	enum xprt_stat stat;

	/* now receive msgs from xprtprt (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody, &r->rq_reply_seq);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				m_freem(args);
				goto call_done;

			default:
				m_freem(args);
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
			(*s->slc_dispatch)(xprt);
		xprt_unregister(xprt);
	}

	return (stat);
}
static void
svc_executereq(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service */
	prog_found = FALSE;
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of r to the
				 * dispatch method - they must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			}  /* found correct version */
			prog_found = TRUE;
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		}   /* found correct program */
	}

	/*
	 * if we got here, the program or version
	 * is not served ...
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}
static void
svc_checkidle(SVCGROUP *grp)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers. Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}

	mtx_unlock(&grp->sg_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}
	mtx_lock(&grp->sg_lock);
}
static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCXPRT *xprt;
	int g;

	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
			if (xprt_assignthread(xprt))
				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
			else
				break;
		}
		mtx_unlock(&grp->sg_lock);
	}
}
static void
svc_change_space_used(SVCPOOL *pool, long delta)
{
	unsigned long value;

	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
	if (delta > 0) {
		if (value >= pool->sp_space_high &&
		    !pool->sp_space_throttled) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
		}
		if (value > pool->sp_space_used_highest)
			pool->sp_space_used_highest = value;
	} else {
		if (value < pool->sp_space_low && pool->sp_space_throttled) {
			pool->sp_space_throttled = FALSE;
			svc_assign_waiting_sockets(pool);
		}
	}
}
static bool_t
svc_request_space_available(SVCPOOL *pool)
{

	if (pool->sp_space_throttled)
		return (FALSE);
	return (TRUE);
}
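/*
 * Throttle hysteresis, worked through (hypothetical numbers): with
 * sp_space_high = 32MB and sp_space_low about 21MB, a pool that parses
 * 32MB of not-yet-executed requests sets sp_space_throttled and stops
 * draining sockets; only after execution pulls the total back under
 * 21MB does svc_change_space_used() clear the flag and re-assign the
 * waiting sockets.
 */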
static void
svc_run_internal(SVCGROUP *grp, bool_t ismaster)
{
	SVCPOOL *pool = grp->sg_pool;
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	struct proc *p;
	long sz;
	int error;

	st = mem_alloc(sizeof(*st));
	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
	st->st_pool = pool;
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&grp->sg_lock);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (grp->sg_state == SVCPOOL_THREADSTARTING)
		grp->sg_state = SVCPOOL_ACTIVE;

	while (grp->sg_state != SVCPOOL_CLOSING) {
		/*
		 * Create new thread if requested.
		 */
		if (grp->sg_state == SVCPOOL_THREADWANTED) {
			grp->sg_state = SVCPOOL_THREADSTARTING;
			grp->sg_lastcreatetime = time_uptime;
			mtx_unlock(&grp->sg_lock);
			svc_new_thread(grp);
			mtx_lock(&grp->sg_lock);
			continue;
		}

		/*
		 * Check for idle transports once per second.
		 */
		if (time_uptime > grp->sg_lastidlecheck) {
			grp->sg_lastidlecheck = time_uptime;
			svc_checkidle(grp);
		}

		xprt = st->st_xprt;
		if (!xprt) {
			/*
			 * Enforce maxthreads count.
			 */
			if (!ismaster &&
			    grp->sg_threadcount > grp->sg_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
			if (svc_request_space_available(pool) &&
			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
				SVC_ACQUIRE(xprt);
				xprt->xp_thread = st;
				st->st_xprt = xprt;
				continue;
			}

			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
			if (ismaster || (!ismaster &&
			    grp->sg_threadcount > grp->sg_minthreads))
				error = cv_timedwait_sig(&st->st_cond,
				    &grp->sg_lock, 5 * hz);
			else
				error = cv_wait_sig(&st->st_cond,
				    &grp->sg_lock);
			if (st->st_xprt == NULL)
				LIST_REMOVE(st, st_ilink);

			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (grp->sg_threadcount
					> grp->sg_minthreads)
				    && !st->st_xprt)
					break;
			} else if (error != 0) {
				KASSERT(error == EINTR || error == ERESTART,
				    ("non-signal error %d", error));
				mtx_unlock(&grp->sg_lock);
				p = curproc;
				PROC_LOCK(p);
				if (P_SHOULDSTOP(p) ||
				    (p->p_flag & P_TOTAL_STOP) != 0) {
					thread_suspend_check(0);
					PROC_UNLOCK(p);
					mtx_lock(&grp->sg_lock);
				} else {
					PROC_UNLOCK(p);
					svc_exit(pool);
					mtx_lock(&grp->sg_lock);
					break;
				}
			}
			continue;
		}
		mtx_unlock(&grp->sg_lock);

		/*
		 * Drain the transport socket and queue up any RPCs.
		 */
		xprt->xp_lastactive = time_uptime;
		do {
			if (!svc_request_space_available(pool))
				break;
			rqstp = NULL;
			stat = svc_getreq(xprt, &rqstp);
			if (rqstp) {
				svc_change_space_used(pool, rqstp->rq_size);
				/*
				 * See if the application has a preference
				 * for some other thread.
				 */
				if (pool->sp_assign) {
					stpref = pool->sp_assign(st, rqstp);
					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					mtx_unlock(&stpref->st_lock);
					if (stpref != st)
						rqstp = NULL;
				} else {
					rqstp->rq_thread = st;
					STAILQ_INSERT_TAIL(&st->st_reqs,
					    rqstp, rq_link);
				}
			}
		} while (rqstp == NULL && stat == XPRT_MOREREQS
		    && grp->sg_state != SVCPOOL_CLOSING);

		/*
		 * Move this transport to the end of the active list to
		 * ensure fairness when multiple transports are active.
		 * If this was the last queued request, svc_getreq will end
		 * up calling xprt_inactive to remove from the active list.
		 */
		mtx_lock(&grp->sg_lock);
		xprt->xp_thread = NULL;
		st->st_xprt = NULL;
		if (xprt->xp_active) {
			if (!svc_request_space_available(pool) ||
			    !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active,
				    xprt, xp_alink);
		}
		mtx_unlock(&grp->sg_lock);
		SVC_RELEASE(xprt);

		/*
		 * Execute what we have queued.
		 */
		mtx_lock(&st->st_lock);
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
			mtx_unlock(&st->st_lock);
			sz = (long)rqstp->rq_size;
			svc_executereq(rqstp);
			svc_change_space_used(pool, -sz);
			mtx_lock(&st->st_lock);
		}
		mtx_unlock(&st->st_lock);
		mtx_lock(&grp->sg_lock);
	}

	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}
	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	mtx_destroy(&st->st_lock);
	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	grp->sg_threadcount--;
	if (!ismaster)
		wakeup(grp);
	mtx_unlock(&grp->sg_lock);
}
static void
svc_thread_start(void *arg)
{

	svc_run_internal((SVCGROUP *) arg, FALSE);
	kthread_exit();
}
static void
svc_new_thread(SVCGROUP *grp)
{
	SVCPOOL *pool = grp->sg_pool;
	struct thread *td;

	mtx_lock(&grp->sg_lock);
	grp->sg_threadcount++;
	mtx_unlock(&grp->sg_lock);
	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
	    "%s: service", pool->sp_name);
}
void
svc_run(SVCPOOL *pool)
{
	int g, i;
	struct proc *p;
	struct thread *td;
	SVCGROUP *grp;

	p = curproc;
	td = curthread;
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;

	/* Choose group count based on number of threads and CPUs. */
	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		grp->sg_minthreads = max(1,
		    pool->sp_minthreads / pool->sp_groupcount);
		grp->sg_maxthreads = max(1,
		    pool->sp_maxthreads / pool->sp_groupcount);
		grp->sg_lastcreatetime = time_uptime;
	}

	/* Starting threads */
	pool->sp_groups[0].sg_threadcount++;
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
			svc_new_thread(grp);
	}
	svc_run_internal(&pool->sp_groups[0], TRUE);

	/* Waiting for threads to stop. */
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while (grp->sg_threadcount > 0)
			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
		mtx_unlock(&grp->sg_lock);
	}
}
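/*
 * Group-count example (hypothetical numbers): with sp_maxthreads = 256
 * on a 32-CPU machine, min(256 / 2, 32) / 6 = 5, so svc_run() above
 * creates five groups (assuming SVC_MAXGROUPS permits), each getting
 * roughly a fifth of the pool's thread limits.
 */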
void
svc_exit(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCTHREAD *st;
	int g;

	pool->sp_state = SVCPOOL_CLOSING;
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		if (grp->sg_state != SVCPOOL_CLOSING) {
			grp->sg_state = SVCPOOL_CLOSING;
			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
				cv_signal(&st->st_cond);
		}
		mtx_unlock(&grp->sg_lock);
	}
}
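/*
 * Lifecycle sketch (illustrative, not part of this file): the creator
 * calls svc_run() from its service process and arranges for svc_exit()
 * to be called (from a shutdown path or signal handling) to make it
 * return.
 */
#if 0
static void
example_service_loop(SVCPOOL *pool)
{

	svc_run(pool);		/* blocks until svc_exit(pool) */
	svcpool_destroy(pool);
}
#endif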
bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	struct mbuf *m;
	XDR xdrs;
	bool_t stat;

	m = rqstp->rq_args;
	rqstp->rq_args = NULL;

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	stat = xargs(&xdrs, args);
	XDR_DESTROY(&xdrs);

	return (stat);
}
bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	XDR xdrs;

	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	xdrs.x_op = XDR_FREE;
	return (xargs(&xdrs, args));
}
void
svc_freereq(struct svc_req *rqstp)
{
	SVCTHREAD *st;
	SVCPOOL *pool;

	st = rqstp->rq_thread;
	if (st) {
		pool = st->st_pool;
		if (pool->sp_done)
			pool->sp_done(st, rqstp);
	}

	if (rqstp->rq_auth.svc_ah_ops)
		SVCAUTH_RELEASE(&rqstp->rq_auth);

	if (rqstp->rq_xprt) {
		SVC_RELEASE(rqstp->rq_xprt);
	}

	if (rqstp->rq_addr)
		free(rqstp->rq_addr, M_SONAME);

	if (rqstp->rq_args)
		m_freem(rqstp->rq_args);

	free(rqstp, M_RPC);
}