2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * NOTE! This file may be compiled for userland libraries as well as for
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #include <machine/smp.h>
70 #include <sys/malloc.h>
71 MALLOC_DEFINE(M_LWKTMSG
, "lwkt message", "lwkt message");
73 /************************************************************************
75 ************************************************************************/
80 * Request asynchronous completion and call lwkt_beginmsg(). The
81 * target port can opt to execute the message synchronously or
82 * asynchronously and this function will automatically queue the
83 * response if the target executes the message synchronously.
85 * NOTE: The message is in an indeterminant state until this call
86 * returns. The caller should not mess with it (e.g. try to abort it)
90 lwkt_sendmsg(lwkt_port_t port
, lwkt_msg_t msg
)
94 KKASSERT(msg
->ms_reply_port
!= NULL
&&
95 (msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
)) == MSGF_DONE
);
96 msg
->ms_flags
&= ~(MSGF_REPLY
| MSGF_SYNC
| MSGF_DONE
);
97 if ((error
= lwkt_beginmsg(port
, msg
)) != EASYNC
) {
99 * Target port opted to execute the message synchronously so
100 * queue the response.
102 lwkt_replymsg(msg
, error
);
109 * Request synchronous completion and call lwkt_beginmsg(). The
110 * target port can opt to execute the message synchronously or
111 * asynchronously and this function will automatically block and
112 * wait for a response if the target executes the message
116 lwkt_domsg(lwkt_port_t port
, lwkt_msg_t msg
, int flags
)
120 KKASSERT(msg
->ms_reply_port
!= NULL
&&
121 (msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
)) == MSGF_DONE
);
122 msg
->ms_flags
&= ~(MSGF_REPLY
| MSGF_DONE
);
123 msg
->ms_flags
|= MSGF_SYNC
;
124 if ((error
= lwkt_beginmsg(port
, msg
)) == EASYNC
) {
126 * Target port opted to execute the message asynchronously so
127 * block and wait for a reply.
129 error
= lwkt_waitmsg(msg
, flags
);
131 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
139 * Forward a message received on one port to another port.
142 lwkt_forwardmsg(lwkt_port_t port
, lwkt_msg_t msg
)
147 KKASSERT((msg
->ms_flags
& (MSGF_QUEUED
|MSGF_DONE
|MSGF_REPLY
)) == 0);
148 if ((error
= port
->mp_putport(port
, msg
)) != EASYNC
)
149 lwkt_replymsg(msg
, error
);
157 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
158 * The caller must ensure that the message will not be both replied AND
159 * destroyed while the abort is in progress.
161 * This function issues a callback which might block!
164 lwkt_abortmsg(lwkt_msg_t msg
)
167 * A critical section protects us from reply IPIs on this cpu.
172 * Shortcut the operation if the message has already been returned.
173 * The callback typically constructs a lwkt_msg with the abort request,
174 * issues it synchronously, and waits for completion. The callback
175 * is not required to actually abort the message and the target port,
176 * upon receiving an abort request message generated by the callback
177 * should check whether the original message has already completed or
180 if (msg
->ms_flags
& MSGF_ABORTABLE
) {
181 if ((msg
->ms_flags
& (MSGF_DONE
|MSGF_REPLY
)) == 0)
182 msg
->ms_abortfn(msg
);
187 /************************************************************************
188 * PORT INITIALIZATION API *
189 ************************************************************************/
191 static void *lwkt_thread_getport(lwkt_port_t port
);
192 static int lwkt_thread_putport(lwkt_port_t port
, lwkt_msg_t msg
);
193 static int lwkt_thread_waitmsg(lwkt_msg_t msg
, int flags
);
194 static void *lwkt_thread_waitport(lwkt_port_t port
, int flags
);
195 static void lwkt_thread_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
196 static void lwkt_thread_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
);
198 static void *lwkt_spin_getport(lwkt_port_t port
);
199 static int lwkt_spin_putport(lwkt_port_t port
, lwkt_msg_t msg
);
200 static int lwkt_spin_waitmsg(lwkt_msg_t msg
, int flags
);
201 static void *lwkt_spin_waitport(lwkt_port_t port
, int flags
);
202 static void lwkt_spin_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
203 static void lwkt_spin_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
);
205 static void *lwkt_serialize_getport(lwkt_port_t port
);
206 static int lwkt_serialize_putport(lwkt_port_t port
, lwkt_msg_t msg
);
207 static int lwkt_serialize_waitmsg(lwkt_msg_t msg
, int flags
);
208 static void *lwkt_serialize_waitport(lwkt_port_t port
, int flags
);
209 static void lwkt_serialize_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
211 static void lwkt_null_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
212 static void *lwkt_panic_getport(lwkt_port_t port
);
213 static int lwkt_panic_putport(lwkt_port_t port
, lwkt_msg_t msg
);
214 static int lwkt_panic_waitmsg(lwkt_msg_t msg
, int flags
);
215 static void *lwkt_panic_waitport(lwkt_port_t port
, int flags
);
216 static void lwkt_panic_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
217 static void lwkt_panic_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
);
220 * Core port initialization (internal)
224 _lwkt_initport(lwkt_port_t port
,
225 void *(*gportfn
)(lwkt_port_t
),
226 int (*pportfn
)(lwkt_port_t
, lwkt_msg_t
),
227 int (*wmsgfn
)(lwkt_msg_t
, int),
228 void *(*wportfn
)(lwkt_port_t
, int),
229 void (*rportfn
)(lwkt_port_t
, lwkt_msg_t
),
230 void (*dmsgfn
)(lwkt_port_t
, lwkt_msg_t
))
232 bzero(port
, sizeof(*port
));
233 TAILQ_INIT(&port
->mp_msgq
);
234 TAILQ_INIT(&port
->mp_msgq_prio
);
235 port
->mp_getport
= gportfn
;
236 port
->mp_putport
= pportfn
;
237 port
->mp_waitmsg
= wmsgfn
;
238 port
->mp_waitport
= wportfn
;
239 port
->mp_replyport
= rportfn
;
240 port
->mp_dropmsg
= dmsgfn
;
244 * Schedule the target thread. If the message flags contains MSGF_NORESCHED
245 * we tell the scheduler not to reschedule if td is at a higher priority.
247 * This routine is called even if the thread is already scheduled.
251 _lwkt_schedule_msg(thread_t td
, int flags
)
257 * lwkt_initport_thread()
259 * Initialize a port for use by a particular thread. The port may
260 * only be used by <td>.
263 lwkt_initport_thread(lwkt_port_t port
, thread_t td
)
269 lwkt_thread_waitport
,
270 lwkt_thread_replyport
,
271 lwkt_thread_dropmsg
);
276 * lwkt_initport_spin()
278 * Initialize a port for use with descriptors that might be accessed
279 * via multiple LWPs, processes, or threads. Has somewhat more
280 * overhead then thread ports.
283 lwkt_initport_spin(lwkt_port_t port
, thread_t td
)
285 void (*dmsgfn
)(lwkt_port_t
, lwkt_msg_t
);
288 dmsgfn
= lwkt_panic_dropmsg
;
290 dmsgfn
= lwkt_spin_dropmsg
;
299 spin_init(&port
->mpu_spin
);
304 * lwkt_initport_serialize()
306 * Initialize a port for use with descriptors that might be accessed
307 * via multiple LWPs, processes, or threads. Callers are assumed to
308 * have held the serializer (slz).
311 lwkt_initport_serialize(lwkt_port_t port
, struct lwkt_serialize
*slz
)
314 lwkt_serialize_getport
,
315 lwkt_serialize_putport
,
316 lwkt_serialize_waitmsg
,
317 lwkt_serialize_waitport
,
318 lwkt_serialize_replyport
,
320 port
->mpu_serialize
= slz
;
324 * Similar to the standard initport, this function simply marks the message
325 * as being done and does not attempt to return it to an originating port.
328 lwkt_initport_replyonly_null(lwkt_port_t port
)
340 * Initialize a reply-only port, typically used as a message sink. Such
341 * ports can only be used as a reply port.
344 lwkt_initport_replyonly(lwkt_port_t port
,
345 void (*rportfn
)(lwkt_port_t
, lwkt_msg_t
))
347 _lwkt_initport(port
, lwkt_panic_getport
, lwkt_panic_putport
,
348 lwkt_panic_waitmsg
, lwkt_panic_waitport
,
349 rportfn
, lwkt_panic_dropmsg
);
353 lwkt_initport_putonly(lwkt_port_t port
,
354 int (*pportfn
)(lwkt_port_t
, lwkt_msg_t
))
356 _lwkt_initport(port
, lwkt_panic_getport
, pportfn
,
357 lwkt_panic_waitmsg
, lwkt_panic_waitport
,
358 lwkt_panic_replyport
, lwkt_panic_dropmsg
);
362 lwkt_initport_panic(lwkt_port_t port
)
365 lwkt_panic_getport
, lwkt_panic_putport
,
366 lwkt_panic_waitmsg
, lwkt_panic_waitport
,
367 lwkt_panic_replyport
, lwkt_panic_dropmsg
);
372 _lwkt_pullmsg(lwkt_port_t port
, lwkt_msg_t msg
)
374 lwkt_msg_queue
*queue
;
377 * normal case, remove and return the message.
379 if (__predict_false(msg
->ms_flags
& MSGF_PRIORITY
))
380 queue
= &port
->mp_msgq_prio
;
382 queue
= &port
->mp_msgq
;
383 TAILQ_REMOVE(queue
, msg
, ms_node
);
386 * atomic op needed for spin ports
388 atomic_clear_int(&msg
->ms_flags
, MSGF_QUEUED
);
393 _lwkt_pushmsg(lwkt_port_t port
, lwkt_msg_t msg
)
395 lwkt_msg_queue
*queue
;
398 * atomic op needed for spin ports
400 atomic_set_int(&msg
->ms_flags
, MSGF_QUEUED
);
401 if (__predict_false(msg
->ms_flags
& MSGF_PRIORITY
))
402 queue
= &port
->mp_msgq_prio
;
404 queue
= &port
->mp_msgq
;
405 TAILQ_INSERT_TAIL(queue
, msg
, ms_node
);
410 _lwkt_pollmsg(lwkt_port_t port
)
414 msg
= TAILQ_FIRST(&port
->mp_msgq_prio
);
415 if (__predict_false(msg
!= NULL
))
419 * Priority queue has no message, fallback to non-priority queue.
421 return TAILQ_FIRST(&port
->mp_msgq
);
426 _lwkt_enqueue_reply(lwkt_port_t port
, lwkt_msg_t msg
)
429 * atomic op needed for spin ports
431 _lwkt_pushmsg(port
, msg
);
432 atomic_set_int(&msg
->ms_flags
, MSGF_REPLY
| MSGF_DONE
);
435 /************************************************************************
436 * THREAD PORT BACKEND *
437 ************************************************************************
439 * This backend is used when the port a message is retrieved from is owned
440 * by a single thread (the calling thread). Messages are IPId to the
441 * correct cpu before being enqueued to a port. Note that this is fairly
442 * optimal since scheduling would have had to do an IPI anyway if the
443 * message were headed to a different cpu.
447 * This function completes reply processing for the default case in the
448 * context of the originating cpu.
452 lwkt_thread_replyport_remote(lwkt_msg_t msg
)
454 lwkt_port_t port
= msg
->ms_reply_port
;
458 * Chase any thread migration that occurs
460 if (port
->mpu_td
->td_gd
!= mycpu
) {
461 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
462 (ipifunc1_t
)lwkt_thread_replyport_remote
, msg
);
470 KKASSERT(msg
->ms_flags
& MSGF_INTRANSIT
);
471 msg
->ms_flags
&= ~MSGF_INTRANSIT
;
473 flags
= msg
->ms_flags
;
474 if (msg
->ms_flags
& MSGF_SYNC
) {
476 msg
->ms_flags
|= MSGF_REPLY
| MSGF_DONE
;
478 _lwkt_enqueue_reply(port
, msg
);
480 if (port
->mp_flags
& MSGPORTF_WAITING
)
481 _lwkt_schedule_msg(port
->mpu_td
, flags
);
485 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
487 * Called with the reply port as an argument but in the context of the
488 * original target port. Completion must occur on the target port's
491 * The critical section protects us from IPIs on the this CPU.
495 lwkt_thread_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
499 KKASSERT((msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
|MSGF_INTRANSIT
)) == 0);
501 if (msg
->ms_flags
& MSGF_SYNC
) {
503 * If a synchronous completion has been requested, just wakeup
504 * the message without bothering to queue it to the target port.
506 * Assume the target thread is non-preemptive, so no critical
507 * section is required.
509 if (port
->mpu_td
->td_gd
== mycpu
) {
510 flags
= msg
->ms_flags
;
512 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
513 if (port
->mp_flags
& MSGPORTF_WAITING
)
514 _lwkt_schedule_msg(port
->mpu_td
, flags
);
517 msg
->ms_flags
|= MSGF_INTRANSIT
;
519 msg
->ms_flags
|= MSGF_REPLY
;
520 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
521 (ipifunc1_t
)lwkt_thread_replyport_remote
, msg
);
525 * If an asynchronous completion has been requested the message
526 * must be queued to the reply port.
528 * A critical section is required to interlock the port queue.
530 if (port
->mpu_td
->td_gd
== mycpu
) {
532 _lwkt_enqueue_reply(port
, msg
);
533 if (port
->mp_flags
& MSGPORTF_WAITING
)
534 _lwkt_schedule_msg(port
->mpu_td
, msg
->ms_flags
);
538 msg
->ms_flags
|= MSGF_INTRANSIT
;
540 msg
->ms_flags
|= MSGF_REPLY
;
541 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
542 (ipifunc1_t
)lwkt_thread_replyport_remote
, msg
);
548 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
550 * This function could _only_ be used when caller is in the same thread
551 * as the message's target port owner thread.
554 lwkt_thread_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
)
556 KASSERT(port
->mpu_td
== curthread
,
557 ("message could only be dropped in the same thread "
558 "as the message target port thread"));
559 crit_enter_quick(port
->mpu_td
);
560 _lwkt_pullmsg(port
, msg
);
561 msg
->ms_flags
|= MSGF_DONE
;
562 crit_exit_quick(port
->mpu_td
);
566 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
568 * Called with the target port as an argument but in the context of the
569 * reply port. This function always implements an asynchronous put to
570 * the target message port, and thus returns EASYNC.
572 * The message must already have cleared MSGF_DONE and MSGF_REPLY
576 lwkt_thread_putport_remote(lwkt_msg_t msg
)
578 lwkt_port_t port
= msg
->ms_target_port
;
581 * Chase any thread migration that occurs
583 if (port
->mpu_td
->td_gd
!= mycpu
) {
584 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
585 (ipifunc1_t
)lwkt_thread_putport_remote
, msg
);
593 KKASSERT(msg
->ms_flags
& MSGF_INTRANSIT
);
594 msg
->ms_flags
&= ~MSGF_INTRANSIT
;
596 _lwkt_pushmsg(port
, msg
);
597 if (port
->mp_flags
& MSGPORTF_WAITING
)
598 _lwkt_schedule_msg(port
->mpu_td
, msg
->ms_flags
);
603 lwkt_thread_putport(lwkt_port_t port
, lwkt_msg_t msg
)
605 KKASSERT((msg
->ms_flags
& (MSGF_DONE
| MSGF_REPLY
)) == 0);
607 msg
->ms_target_port
= port
;
608 if (port
->mpu_td
->td_gd
== mycpu
) {
610 _lwkt_pushmsg(port
, msg
);
611 if (port
->mp_flags
& MSGPORTF_WAITING
)
612 _lwkt_schedule_msg(port
->mpu_td
, msg
->ms_flags
);
616 msg
->ms_flags
|= MSGF_INTRANSIT
;
618 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
619 (ipifunc1_t
)lwkt_thread_putport_remote
, msg
);
625 * lwkt_thread_getport()
627 * Retrieve the next message from the port or NULL if no messages
632 lwkt_thread_getport(lwkt_port_t port
)
636 KKASSERT(port
->mpu_td
== curthread
);
638 crit_enter_quick(port
->mpu_td
);
639 if ((msg
= _lwkt_pollmsg(port
)) != NULL
)
640 _lwkt_pullmsg(port
, msg
);
641 crit_exit_quick(port
->mpu_td
);
646 * lwkt_thread_waitmsg()
648 * Wait for a particular message to be replied. We must be the only
649 * thread waiting on the message. The port must be owned by the
654 lwkt_thread_waitmsg(lwkt_msg_t msg
, int flags
)
656 KASSERT((msg
->ms_flags
& MSGF_DROPABLE
) == 0,
657 ("can't wait dropable message"));
659 if ((msg
->ms_flags
& MSGF_DONE
) == 0) {
661 * If the done bit was not set we have to block until it is.
663 lwkt_port_t port
= msg
->ms_reply_port
;
664 thread_t td
= curthread
;
667 KKASSERT(port
->mpu_td
== td
);
668 crit_enter_quick(td
);
671 while ((msg
->ms_flags
& MSGF_DONE
) == 0) {
672 port
->mp_flags
|= MSGPORTF_WAITING
;
673 if (sentabort
== 0) {
674 if ((sentabort
= lwkt_sleep("waitmsg", flags
)) != 0) {
678 lwkt_sleep("waitabt", 0);
680 port
->mp_flags
&= ~MSGPORTF_WAITING
;
682 if (msg
->ms_flags
& MSGF_QUEUED
)
683 _lwkt_pullmsg(port
, msg
);
687 * If the done bit was set we only have to mess around with the
688 * message if it is queued on the reply port.
690 if (msg
->ms_flags
& MSGF_QUEUED
) {
691 lwkt_port_t port
= msg
->ms_reply_port
;
692 thread_t td
= curthread
;
694 KKASSERT(port
->mpu_td
== td
);
695 crit_enter_quick(td
);
696 _lwkt_pullmsg(port
, msg
);
700 return(msg
->ms_error
);
704 * lwkt_thread_waitport()
706 * Wait for a new message to be available on the port. We must be the
707 * the only thread waiting on the port. The port must be owned by caller.
711 lwkt_thread_waitport(lwkt_port_t port
, int flags
)
713 thread_t td
= curthread
;
717 KKASSERT(port
->mpu_td
== td
);
718 crit_enter_quick(td
);
719 while ((msg
= _lwkt_pollmsg(port
)) == NULL
) {
720 port
->mp_flags
|= MSGPORTF_WAITING
;
721 error
= lwkt_sleep("waitport", flags
);
722 port
->mp_flags
&= ~MSGPORTF_WAITING
;
726 _lwkt_pullmsg(port
, msg
);
732 /************************************************************************
733 * SPIN PORT BACKEND *
734 ************************************************************************
736 * This backend uses spinlocks instead of making assumptions about which
737 * thread is accessing the port. It must be used when a port is not owned
738 * by a particular thread. This is less optimal then thread ports but
739 * you don't have a choice if there are multiple threads accessing the port.
741 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
742 * on the message port, it is the responsibility of the code doing the
743 * wakeup to clear this flag rather then the blocked threads. Some
744 * superfluous wakeups may occur, which is ok.
746 * XXX synchronous message wakeups are not current optimized.
751 lwkt_spin_getport(lwkt_port_t port
)
755 spin_lock(&port
->mpu_spin
);
756 if ((msg
= _lwkt_pollmsg(port
)) != NULL
)
757 _lwkt_pullmsg(port
, msg
);
758 spin_unlock(&port
->mpu_spin
);
764 lwkt_spin_putport(lwkt_port_t port
, lwkt_msg_t msg
)
768 KKASSERT((msg
->ms_flags
& (MSGF_DONE
| MSGF_REPLY
)) == 0);
770 msg
->ms_target_port
= port
;
771 spin_lock(&port
->mpu_spin
);
772 _lwkt_pushmsg(port
, msg
);
774 if (port
->mp_flags
& MSGPORTF_WAITING
) {
775 port
->mp_flags
&= ~MSGPORTF_WAITING
;
778 spin_unlock(&port
->mpu_spin
);
786 lwkt_spin_waitmsg(lwkt_msg_t msg
, int flags
)
792 KASSERT((msg
->ms_flags
& MSGF_DROPABLE
) == 0,
793 ("can't wait dropable message"));
795 if ((msg
->ms_flags
& MSGF_DONE
) == 0) {
796 port
= msg
->ms_reply_port
;
798 spin_lock(&port
->mpu_spin
);
799 while ((msg
->ms_flags
& MSGF_DONE
) == 0) {
803 * If message was sent synchronously from the beginning
804 * the wakeup will be on the message structure, else it
805 * will be on the port structure.
807 if (msg
->ms_flags
& MSGF_SYNC
) {
809 atomic_set_int(&msg
->ms_flags
, MSGF_WAITING
);
812 port
->mp_flags
|= MSGPORTF_WAITING
;
816 * Only messages which support abort can be interrupted.
817 * We must still wait for message completion regardless.
819 if ((flags
& PCATCH
) && sentabort
== 0) {
820 error
= ssleep(won
, &port
->mpu_spin
, PCATCH
, "waitmsg", 0);
823 spin_unlock(&port
->mpu_spin
);
825 spin_lock(&port
->mpu_spin
);
828 error
= ssleep(won
, &port
->mpu_spin
, 0, "waitmsg", 0);
830 /* see note at the top on the MSGPORTF_WAITING flag */
833 * Turn EINTR into ERESTART if the signal indicates.
835 if (sentabort
&& msg
->ms_error
== EINTR
)
836 msg
->ms_error
= sentabort
;
837 if (msg
->ms_flags
& MSGF_QUEUED
)
838 _lwkt_pullmsg(port
, msg
);
839 spin_unlock(&port
->mpu_spin
);
841 if (msg
->ms_flags
& MSGF_QUEUED
) {
842 port
= msg
->ms_reply_port
;
843 spin_lock(&port
->mpu_spin
);
844 _lwkt_pullmsg(port
, msg
);
845 spin_unlock(&port
->mpu_spin
);
848 return(msg
->ms_error
);
853 lwkt_spin_waitport(lwkt_port_t port
, int flags
)
858 spin_lock(&port
->mpu_spin
);
859 while ((msg
= _lwkt_pollmsg(port
)) == NULL
) {
860 port
->mp_flags
|= MSGPORTF_WAITING
;
861 error
= ssleep(port
, &port
->mpu_spin
, flags
, "waitport", 0);
862 /* see note at the top on the MSGPORTF_WAITING flag */
864 spin_unlock(&port
->mpu_spin
);
868 _lwkt_pullmsg(port
, msg
);
869 spin_unlock(&port
->mpu_spin
);
875 lwkt_spin_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
879 KKASSERT((msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
)) == 0);
881 if (msg
->ms_flags
& MSGF_SYNC
) {
883 * If a synchronous completion has been requested, just wakeup
884 * the message without bothering to queue it to the target port.
886 spin_lock(&port
->mpu_spin
);
887 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
889 if (msg
->ms_flags
& MSGF_WAITING
) {
890 atomic_clear_int(&msg
->ms_flags
, MSGF_WAITING
);
893 spin_unlock(&port
->mpu_spin
);
898 * If an asynchronous completion has been requested the message
899 * must be queued to the reply port.
901 spin_lock(&port
->mpu_spin
);
902 _lwkt_enqueue_reply(port
, msg
);
904 if (port
->mp_flags
& MSGPORTF_WAITING
) {
905 port
->mp_flags
&= ~MSGPORTF_WAITING
;
908 spin_unlock(&port
->mpu_spin
);
915 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
917 * This function could _only_ be used when caller is in the same thread
918 * as the message's target port owner thread.
921 lwkt_spin_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
)
923 KASSERT(port
->mpu_td
== curthread
,
924 ("message could only be dropped in the same thread "
925 "as the message target port thread\n"));
926 spin_lock(&port
->mpu_spin
);
927 _lwkt_pullmsg(port
, msg
);
928 msg
->ms_flags
|= MSGF_DONE
;
929 spin_unlock(&port
->mpu_spin
);
932 /************************************************************************
933 * SERIALIZER PORT BACKEND *
934 ************************************************************************
936 * This backend uses serializer to protect port accessing. Callers are
937 * assumed to have serializer held. This kind of port is usually created
938 * by network device driver along with _one_ lwkt thread to pipeline
939 * operations which may temporarily release serializer.
941 * Implementation is based on SPIN PORT BACKEND.
946 lwkt_serialize_getport(lwkt_port_t port
)
950 ASSERT_SERIALIZED(port
->mpu_serialize
);
952 if ((msg
= _lwkt_pollmsg(port
)) != NULL
)
953 _lwkt_pullmsg(port
, msg
);
959 lwkt_serialize_putport(lwkt_port_t port
, lwkt_msg_t msg
)
961 KKASSERT((msg
->ms_flags
& (MSGF_DONE
| MSGF_REPLY
)) == 0);
962 ASSERT_SERIALIZED(port
->mpu_serialize
);
964 msg
->ms_target_port
= port
;
965 _lwkt_pushmsg(port
, msg
);
966 if (port
->mp_flags
& MSGPORTF_WAITING
) {
967 port
->mp_flags
&= ~MSGPORTF_WAITING
;
975 lwkt_serialize_waitmsg(lwkt_msg_t msg
, int flags
)
981 KASSERT((msg
->ms_flags
& MSGF_DROPABLE
) == 0,
982 ("can't wait dropable message"));
984 if ((msg
->ms_flags
& MSGF_DONE
) == 0) {
985 port
= msg
->ms_reply_port
;
987 ASSERT_SERIALIZED(port
->mpu_serialize
);
990 while ((msg
->ms_flags
& MSGF_DONE
) == 0) {
994 * If message was sent synchronously from the beginning
995 * the wakeup will be on the message structure, else it
996 * will be on the port structure.
998 if (msg
->ms_flags
& MSGF_SYNC
) {
1002 port
->mp_flags
|= MSGPORTF_WAITING
;
1006 * Only messages which support abort can be interrupted.
1007 * We must still wait for message completion regardless.
1009 if ((flags
& PCATCH
) && sentabort
== 0) {
1010 error
= zsleep(won
, port
->mpu_serialize
, PCATCH
, "waitmsg", 0);
1013 lwkt_serialize_exit(port
->mpu_serialize
);
1015 lwkt_serialize_enter(port
->mpu_serialize
);
1018 error
= zsleep(won
, port
->mpu_serialize
, 0, "waitmsg", 0);
1020 /* see note at the top on the MSGPORTF_WAITING flag */
1023 * Turn EINTR into ERESTART if the signal indicates.
1025 if (sentabort
&& msg
->ms_error
== EINTR
)
1026 msg
->ms_error
= sentabort
;
1027 if (msg
->ms_flags
& MSGF_QUEUED
)
1028 _lwkt_pullmsg(port
, msg
);
1030 if (msg
->ms_flags
& MSGF_QUEUED
) {
1031 port
= msg
->ms_reply_port
;
1033 ASSERT_SERIALIZED(port
->mpu_serialize
);
1034 _lwkt_pullmsg(port
, msg
);
1037 return(msg
->ms_error
);
1042 lwkt_serialize_waitport(lwkt_port_t port
, int flags
)
1047 ASSERT_SERIALIZED(port
->mpu_serialize
);
1049 while ((msg
= _lwkt_pollmsg(port
)) == NULL
) {
1050 port
->mp_flags
|= MSGPORTF_WAITING
;
1051 error
= zsleep(port
, port
->mpu_serialize
, flags
, "waitport", 0);
1052 /* see note at the top on the MSGPORTF_WAITING flag */
1056 _lwkt_pullmsg(port
, msg
);
1062 lwkt_serialize_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
1064 KKASSERT((msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
)) == 0);
1065 ASSERT_SERIALIZED(port
->mpu_serialize
);
1067 if (msg
->ms_flags
& MSGF_SYNC
) {
1069 * If a synchronous completion has been requested, just wakeup
1070 * the message without bothering to queue it to the target port.
1072 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
1076 * If an asynchronous completion has been requested the message
1077 * must be queued to the reply port.
1079 _lwkt_enqueue_reply(port
, msg
);
1080 if (port
->mp_flags
& MSGPORTF_WAITING
) {
1081 port
->mp_flags
&= ~MSGPORTF_WAITING
;
1087 /************************************************************************
1088 * PANIC AND SPECIAL PORT FUNCTIONS *
1089 ************************************************************************/
1092 * You can point a port's reply vector at this function if you just want
1093 * the message marked done, without any queueing or signaling. This is
1094 * often used for structure-embedded messages.
1098 lwkt_null_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
1100 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
1105 lwkt_panic_getport(lwkt_port_t port
)
1107 panic("lwkt_getport() illegal on port %p", port
);
1112 lwkt_panic_putport(lwkt_port_t port
, lwkt_msg_t msg
)
1114 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port
, msg
);
1119 lwkt_panic_waitmsg(lwkt_msg_t msg
, int flags
)
1121 panic("port %p msg %p cannot be waited on", msg
->ms_reply_port
, msg
);
1126 lwkt_panic_waitport(lwkt_port_t port
, int flags
)
1128 panic("port %p cannot be waited on", port
);
1133 lwkt_panic_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
1135 panic("lwkt_replymsg() is illegal on port %p msg %p", port
, msg
);
1140 lwkt_panic_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
)
1142 panic("lwkt_dropmsg() is illegal on port %p msg %p", port
, msg
);