2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * NOTE! This file may be compiled for userland libraries as well as for
37 * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.54 2008/11/26 15:05:42 sephe Exp $
40 #include <sys/param.h>
41 #include <sys/systm.h>
42 #include <sys/kernel.h>
44 #include <sys/rtprio.h>
45 #include <sys/queue.h>
46 #include <sys/sysctl.h>
47 #include <sys/kthread.h>
48 #include <sys/signalvar.h>
49 #include <sys/signal2.h>
50 #include <machine/cpu.h>
54 #include <vm/vm_param.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_object.h>
57 #include <vm/vm_page.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_pager.h>
60 #include <vm/vm_extern.h>
61 #include <vm/vm_zone.h>
63 #include <sys/thread2.h>
64 #include <sys/msgport2.h>
65 #include <sys/spinlock2.h>
66 #include <sys/serialize.h>
68 #include <machine/stdarg.h>
69 #include <machine/cpufunc.h>
71 #include <machine/smp.h>
74 #include <sys/malloc.h>
75 MALLOC_DEFINE(M_LWKTMSG
, "lwkt message", "lwkt message");
77 /************************************************************************
79 ************************************************************************/
84 * Request asynchronous completion and call lwkt_beginmsg(). The
85 * target port can opt to execute the message synchronously or
86 * asynchronously and this function will automatically queue the
87 * response if the target executes the message synchronously.
89 * NOTE: The message is in an indeterminant state until this call
90 * returns. The caller should not mess with it (e.g. try to abort it)
94 lwkt_sendmsg(lwkt_port_t port
, lwkt_msg_t msg
)
98 KKASSERT(msg
->ms_reply_port
!= NULL
&&
99 (msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
)) == MSGF_DONE
);
100 msg
->ms_flags
&= ~(MSGF_REPLY
| MSGF_SYNC
| MSGF_DONE
);
101 if ((error
= lwkt_beginmsg(port
, msg
)) != EASYNC
) {
102 lwkt_replymsg(msg
, error
);
109 * Request asynchronous completion and call lwkt_beginmsg(). The
110 * target port can opt to execute the message synchronously or
111 * asynchronously and this function will automatically queue the
112 * response if the target executes the message synchronously.
115 lwkt_domsg(lwkt_port_t port
, lwkt_msg_t msg
, int flags
)
119 KKASSERT(msg
->ms_reply_port
!= NULL
&&
120 (msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
)) == MSGF_DONE
);
121 msg
->ms_flags
&= ~(MSGF_REPLY
| MSGF_DONE
);
122 msg
->ms_flags
|= MSGF_SYNC
;
123 if ((error
= lwkt_beginmsg(port
, msg
)) == EASYNC
) {
124 error
= lwkt_waitmsg(msg
, flags
);
126 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
134 * Forward a message received on one port to another port.
137 lwkt_forwardmsg(lwkt_port_t port
, lwkt_msg_t msg
)
142 KKASSERT((msg
->ms_flags
& (MSGF_QUEUED
|MSGF_DONE
|MSGF_REPLY
)) == 0);
143 if ((error
= port
->mp_putport(port
, msg
)) != EASYNC
)
144 lwkt_replymsg(msg
, error
);
152 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
153 * The caller must ensure that the message will not be both replied AND
154 * destroyed while the abort is in progress.
156 * This function issues a callback which might block!
159 lwkt_abortmsg(lwkt_msg_t msg
)
162 * A critical section protects us from reply IPIs on this cpu.
167 * Shortcut the operation if the message has already been returned.
168 * The callback typically constructs a lwkt_msg with the abort request,
169 * issues it synchronously, and waits for completion. The callback
170 * is not required to actually abort the message and the target port,
171 * upon receiving an abort request message generated by the callback
172 * should check whether the original message has already completed or
175 if (msg
->ms_flags
& MSGF_ABORTABLE
) {
176 if ((msg
->ms_flags
& (MSGF_DONE
|MSGF_REPLY
)) == 0)
177 msg
->ms_abortfn(msg
);
182 /************************************************************************
183 * PORT INITIALIZATION API *
184 ************************************************************************/
186 static void *lwkt_thread_getport(lwkt_port_t port
);
187 static int lwkt_thread_putport(lwkt_port_t port
, lwkt_msg_t msg
);
188 static int lwkt_thread_waitmsg(lwkt_msg_t msg
, int flags
);
189 static void *lwkt_thread_waitport(lwkt_port_t port
, int flags
);
190 static void lwkt_thread_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
191 static void lwkt_thread_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
);
193 static void *lwkt_spin_getport(lwkt_port_t port
);
194 static int lwkt_spin_putport(lwkt_port_t port
, lwkt_msg_t msg
);
195 static int lwkt_spin_waitmsg(lwkt_msg_t msg
, int flags
);
196 static void *lwkt_spin_waitport(lwkt_port_t port
, int flags
);
197 static void lwkt_spin_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
199 static void *lwkt_serialize_getport(lwkt_port_t port
);
200 static int lwkt_serialize_putport(lwkt_port_t port
, lwkt_msg_t msg
);
201 static int lwkt_serialize_waitmsg(lwkt_msg_t msg
, int flags
);
202 static void *lwkt_serialize_waitport(lwkt_port_t port
, int flags
);
203 static void lwkt_serialize_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
205 static void lwkt_null_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
206 static void *lwkt_panic_getport(lwkt_port_t port
);
207 static int lwkt_panic_putport(lwkt_port_t port
, lwkt_msg_t msg
);
208 static int lwkt_panic_waitmsg(lwkt_msg_t msg
, int flags
);
209 static void *lwkt_panic_waitport(lwkt_port_t port
, int flags
);
210 static void lwkt_panic_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
211 static void lwkt_panic_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
);
214 * Core port initialization (internal)
218 _lwkt_initport(lwkt_port_t port
,
219 void *(*gportfn
)(lwkt_port_t
),
220 int (*pportfn
)(lwkt_port_t
, lwkt_msg_t
),
221 int (*wmsgfn
)(lwkt_msg_t
, int),
222 void *(*wportfn
)(lwkt_port_t
, int),
223 void (*rportfn
)(lwkt_port_t
, lwkt_msg_t
),
224 void (*dmsgfn
)(lwkt_port_t
, lwkt_msg_t
))
226 bzero(port
, sizeof(*port
));
227 TAILQ_INIT(&port
->mp_msgq
);
228 TAILQ_INIT(&port
->mp_msgq_prio
);
229 port
->mp_getport
= gportfn
;
230 port
->mp_putport
= pportfn
;
231 port
->mp_waitmsg
= wmsgfn
;
232 port
->mp_waitport
= wportfn
;
233 port
->mp_replyport
= rportfn
;
234 port
->mp_dropmsg
= dmsgfn
;
238 * Schedule the target thread. If the message flags contains MSGF_NORESCHED
239 * we tell the scheduler not to reschedule if td is at a higher priority.
241 * This routine is called even if the thread is already scheduled so messages
242 * without NORESCHED will cause the target thread to be rescheduled even if
243 * prior messages did not.
247 _lwkt_schedule_msg(thread_t td
, int flags
)
249 if (flags
& MSGF_NORESCHED
)
250 lwkt_schedule_noresched(td
);
256 * lwkt_initport_thread()
258 * Initialize a port for use by a particular thread. The port may
259 * only be used by <td>.
262 lwkt_initport_thread(lwkt_port_t port
, thread_t td
)
268 lwkt_thread_waitport
,
269 lwkt_thread_replyport
,
270 lwkt_thread_dropmsg
);
275 * lwkt_initport_spin()
277 * Initialize a port for use with descriptors that might be accessed
278 * via multiple LWPs, processes, or threads. Has somewhat more
279 * overhead then thread ports.
282 lwkt_initport_spin(lwkt_port_t port
)
291 spin_init(&port
->mpu_spin
);
295 * lwkt_initport_serialize()
297 * Initialize a port for use with descriptors that might be accessed
298 * via multiple LWPs, processes, or threads. Callers are assumed to
299 * have held the serializer (slz).
302 lwkt_initport_serialize(lwkt_port_t port
, struct lwkt_serialize
*slz
)
305 lwkt_serialize_getport
,
306 lwkt_serialize_putport
,
307 lwkt_serialize_waitmsg
,
308 lwkt_serialize_waitport
,
309 lwkt_serialize_replyport
,
311 port
->mpu_serialize
= slz
;
315 * Similar to the standard initport, this function simply marks the message
316 * as being done and does not attempt to return it to an originating port.
319 lwkt_initport_replyonly_null(lwkt_port_t port
)
331 * Initialize a reply-only port, typically used as a message sink. Such
332 * ports can only be used as a reply port.
335 lwkt_initport_replyonly(lwkt_port_t port
,
336 void (*rportfn
)(lwkt_port_t
, lwkt_msg_t
))
338 _lwkt_initport(port
, lwkt_panic_getport
, lwkt_panic_putport
,
339 lwkt_panic_waitmsg
, lwkt_panic_waitport
,
340 rportfn
, lwkt_panic_dropmsg
);
344 lwkt_initport_putonly(lwkt_port_t port
,
345 int (*pportfn
)(lwkt_port_t
, lwkt_msg_t
))
347 _lwkt_initport(port
, lwkt_panic_getport
, pportfn
,
348 lwkt_panic_waitmsg
, lwkt_panic_waitport
,
349 lwkt_panic_replyport
, lwkt_panic_dropmsg
);
353 lwkt_initport_panic(lwkt_port_t port
)
356 lwkt_panic_getport
, lwkt_panic_putport
,
357 lwkt_panic_waitmsg
, lwkt_panic_waitport
,
358 lwkt_panic_replyport
, lwkt_panic_dropmsg
);
363 _lwkt_pullmsg(lwkt_port_t port
, lwkt_msg_t msg
)
365 lwkt_msg_queue
*queue
;
368 * normal case, remove and return the message.
370 if (__predict_false(msg
->ms_flags
& MSGF_PRIORITY
))
371 queue
= &port
->mp_msgq_prio
;
373 queue
= &port
->mp_msgq
;
374 TAILQ_REMOVE(queue
, msg
, ms_node
);
375 msg
->ms_flags
&= ~MSGF_QUEUED
;
380 _lwkt_pushmsg(lwkt_port_t port
, lwkt_msg_t msg
)
382 lwkt_msg_queue
*queue
;
384 msg
->ms_flags
|= MSGF_QUEUED
;
385 if (__predict_false(msg
->ms_flags
& MSGF_PRIORITY
))
386 queue
= &port
->mp_msgq_prio
;
388 queue
= &port
->mp_msgq
;
389 TAILQ_INSERT_TAIL(queue
, msg
, ms_node
);
394 _lwkt_pollmsg(lwkt_port_t port
)
398 msg
= TAILQ_FIRST(&port
->mp_msgq_prio
);
399 if (__predict_false(msg
!= NULL
))
403 * Priority queue has no message, fallback to non-priority queue.
405 return TAILQ_FIRST(&port
->mp_msgq
);
410 _lwkt_enqueue_reply(lwkt_port_t port
, lwkt_msg_t msg
)
412 _lwkt_pushmsg(port
, msg
);
413 msg
->ms_flags
|= MSGF_REPLY
| MSGF_DONE
;
416 /************************************************************************
417 * THREAD PORT BACKEND *
418 ************************************************************************
420 * This backend is used when the port a message is retrieved from is owned
421 * by a single thread (the calling thread). Messages are IPId to the
422 * correct cpu before being enqueued to a port. Note that this is fairly
423 * optimal since scheduling would have had to do an IPI anyway if the
424 * message were headed to a different cpu.
430 * This function completes reply processing for the default case in the
431 * context of the originating cpu.
435 lwkt_thread_replyport_remote(lwkt_msg_t msg
)
437 lwkt_port_t port
= msg
->ms_reply_port
;
441 * Chase any thread migration that occurs
443 if (port
->mpu_td
->td_gd
!= mycpu
) {
444 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
445 (ipifunc1_t
)lwkt_thread_replyport_remote
, msg
);
453 KKASSERT(msg
->ms_flags
& MSGF_INTRANSIT
);
454 msg
->ms_flags
&= ~MSGF_INTRANSIT
;
456 flags
= msg
->ms_flags
;
457 if (msg
->ms_flags
& MSGF_SYNC
) {
459 msg
->ms_flags
|= MSGF_REPLY
| MSGF_DONE
;
461 _lwkt_enqueue_reply(port
, msg
);
463 if (port
->mp_flags
& MSGPORTF_WAITING
)
464 _lwkt_schedule_msg(port
->mpu_td
, flags
);
470 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
472 * Called with the reply port as an argument but in the context of the
473 * original target port. Completion must occur on the target port's
476 * The critical section protects us from IPIs on the this CPU.
480 lwkt_thread_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
484 KKASSERT((msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
|MSGF_INTRANSIT
)) == 0);
486 if (msg
->ms_flags
& MSGF_SYNC
) {
488 * If a synchronous completion has been requested, just wakeup
489 * the message without bothering to queue it to the target port.
491 * Assume the target thread is non-preemptive, so no critical
492 * section is required.
495 if (port
->mpu_td
->td_gd
== mycpu
) {
497 flags
= msg
->ms_flags
;
499 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
500 if (port
->mp_flags
& MSGPORTF_WAITING
)
501 _lwkt_schedule_msg(port
->mpu_td
, flags
);
505 msg
->ms_flags
|= MSGF_INTRANSIT
;
507 msg
->ms_flags
|= MSGF_REPLY
;
508 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
509 (ipifunc1_t
)lwkt_thread_replyport_remote
, msg
);
514 * If an asynchronous completion has been requested the message
515 * must be queued to the reply port.
517 * A critical section is required to interlock the port queue.
520 if (port
->mpu_td
->td_gd
== mycpu
) {
523 _lwkt_enqueue_reply(port
, msg
);
524 if (port
->mp_flags
& MSGPORTF_WAITING
)
525 _lwkt_schedule_msg(port
->mpu_td
, msg
->ms_flags
);
530 msg
->ms_flags
|= MSGF_INTRANSIT
;
532 msg
->ms_flags
|= MSGF_REPLY
;
533 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
534 (ipifunc1_t
)lwkt_thread_replyport_remote
, msg
);
541 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
543 * This function could _only_ be used when caller is in the same thread
544 * as the message's target port owner thread.
547 lwkt_thread_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
)
549 KASSERT(port
->mpu_td
== curthread
,
550 ("message could only be dropped in the same thread "
551 "as the message target port thread\n"));
552 crit_enter_quick(port
->mpu_td
);
553 _lwkt_pullmsg(port
, msg
);
554 msg
->ms_flags
|= MSGF_DONE
;
555 crit_exit_quick(port
->mpu_td
);
559 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
561 * Called with the target port as an argument but in the context of the
562 * reply port. This function always implements an asynchronous put to
563 * the target message port, and thus returns EASYNC.
565 * The message must already have cleared MSGF_DONE and MSGF_REPLY
572 lwkt_thread_putport_remote(lwkt_msg_t msg
)
574 lwkt_port_t port
= msg
->ms_target_port
;
577 * Chase any thread migration that occurs
579 if (port
->mpu_td
->td_gd
!= mycpu
) {
580 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
581 (ipifunc1_t
)lwkt_thread_putport_remote
, msg
);
589 KKASSERT(msg
->ms_flags
& MSGF_INTRANSIT
);
590 msg
->ms_flags
&= ~MSGF_INTRANSIT
;
592 _lwkt_pushmsg(port
, msg
);
593 if (port
->mp_flags
& MSGPORTF_WAITING
)
594 _lwkt_schedule_msg(port
->mpu_td
, msg
->ms_flags
);
601 lwkt_thread_putport(lwkt_port_t port
, lwkt_msg_t msg
)
603 KKASSERT((msg
->ms_flags
& (MSGF_DONE
| MSGF_REPLY
)) == 0);
605 msg
->ms_target_port
= port
;
607 if (port
->mpu_td
->td_gd
== mycpu
) {
610 _lwkt_pushmsg(port
, msg
);
611 if (port
->mp_flags
& MSGPORTF_WAITING
)
612 _lwkt_schedule_msg(port
->mpu_td
, msg
->ms_flags
);
617 msg
->ms_flags
|= MSGF_INTRANSIT
;
619 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
620 (ipifunc1_t
)lwkt_thread_putport_remote
, msg
);
627 * lwkt_thread_getport()
629 * Retrieve the next message from the port or NULL if no messages
634 lwkt_thread_getport(lwkt_port_t port
)
638 KKASSERT(port
->mpu_td
== curthread
);
640 crit_enter_quick(port
->mpu_td
);
641 if ((msg
= _lwkt_pollmsg(port
)) != NULL
)
642 _lwkt_pullmsg(port
, msg
);
643 crit_exit_quick(port
->mpu_td
);
648 * lwkt_thread_waitmsg()
650 * Wait for a particular message to be replied. We must be the only
651 * thread waiting on the message. The port must be owned by the
656 lwkt_thread_waitmsg(lwkt_msg_t msg
, int flags
)
658 KASSERT((msg
->ms_flags
& MSGF_DROPABLE
) == 0,
659 ("can't wait dropable message\n"));
661 if ((msg
->ms_flags
& MSGF_DONE
) == 0) {
663 * If the done bit was not set we have to block until it is.
665 lwkt_port_t port
= msg
->ms_reply_port
;
666 thread_t td
= curthread
;
669 KKASSERT(port
->mpu_td
== td
);
670 crit_enter_quick(td
);
673 while ((msg
->ms_flags
& MSGF_DONE
) == 0) {
674 port
->mp_flags
|= MSGPORTF_WAITING
;
675 if (sentabort
== 0) {
676 if ((sentabort
= lwkt_sleep("waitmsg", flags
)) != 0) {
680 lwkt_sleep("waitabt", 0);
682 port
->mp_flags
&= ~MSGPORTF_WAITING
;
684 if (msg
->ms_flags
& MSGF_QUEUED
)
685 _lwkt_pullmsg(port
, msg
);
689 * If the done bit was set we only have to mess around with the
690 * message if it is queued on the reply port.
692 if (msg
->ms_flags
& MSGF_QUEUED
) {
693 lwkt_port_t port
= msg
->ms_reply_port
;
694 thread_t td
= curthread
;
696 KKASSERT(port
->mpu_td
== td
);
697 crit_enter_quick(td
);
698 _lwkt_pullmsg(port
, msg
);
702 return(msg
->ms_error
);
707 lwkt_thread_waitport(lwkt_port_t port
, int flags
)
709 thread_t td
= curthread
;
713 KKASSERT(port
->mpu_td
== td
);
714 crit_enter_quick(td
);
715 while ((msg
= _lwkt_pollmsg(port
)) == NULL
) {
716 port
->mp_flags
|= MSGPORTF_WAITING
;
717 error
= lwkt_sleep("waitport", flags
);
718 port
->mp_flags
&= ~MSGPORTF_WAITING
;
722 _lwkt_pullmsg(port
, msg
);
728 /************************************************************************
729 * SPIN PORT BACKEND *
730 ************************************************************************
732 * This backend uses spinlocks instead of making assumptions about which
733 * thread is accessing the port. It must be used when a port is not owned
734 * by a particular thread. This is less optimal then thread ports but
735 * you don't have a choice if there are multiple threads accessing the port.
737 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
738 * on the message port, it is the responsibility of the code doing the
739 * wakeup to clear this flag rather then the blocked threads. Some
740 * superfluous wakeups may occur, which is ok.
742 * XXX synchronous message wakeups are not current optimized.
747 lwkt_spin_getport(lwkt_port_t port
)
751 spin_lock_wr(&port
->mpu_spin
);
752 if ((msg
= _lwkt_pollmsg(port
)) != NULL
)
753 _lwkt_pullmsg(port
, msg
);
754 spin_unlock_wr(&port
->mpu_spin
);
760 lwkt_spin_putport(lwkt_port_t port
, lwkt_msg_t msg
)
764 KKASSERT((msg
->ms_flags
& (MSGF_DONE
| MSGF_REPLY
)) == 0);
766 msg
->ms_target_port
= port
;
767 spin_lock_wr(&port
->mpu_spin
);
768 _lwkt_pushmsg(port
, msg
);
770 if (port
->mp_flags
& MSGPORTF_WAITING
) {
771 port
->mp_flags
&= ~MSGPORTF_WAITING
;
774 spin_unlock_wr(&port
->mpu_spin
);
782 lwkt_spin_waitmsg(lwkt_msg_t msg
, int flags
)
788 KASSERT((msg
->ms_flags
& MSGF_DROPABLE
) == 0,
789 ("can't wait dropable message\n"));
791 if ((msg
->ms_flags
& MSGF_DONE
) == 0) {
792 port
= msg
->ms_reply_port
;
794 spin_lock_wr(&port
->mpu_spin
);
795 while ((msg
->ms_flags
& MSGF_DONE
) == 0) {
799 * If message was sent synchronously from the beginning
800 * the wakeup will be on the message structure, else it
801 * will be on the port structure.
803 if (msg
->ms_flags
& MSGF_SYNC
) {
807 port
->mp_flags
|= MSGPORTF_WAITING
;
811 * Only messages which support abort can be interrupted.
812 * We must still wait for message completion regardless.
814 if ((flags
& PCATCH
) && sentabort
== 0) {
815 error
= ssleep(won
, &port
->mpu_spin
, PCATCH
, "waitmsg", 0);
818 spin_unlock_wr(&port
->mpu_spin
);
820 spin_lock_wr(&port
->mpu_spin
);
823 error
= ssleep(won
, &port
->mpu_spin
, 0, "waitmsg", 0);
825 /* see note at the top on the MSGPORTF_WAITING flag */
828 * Turn EINTR into ERESTART if the signal indicates.
830 if (sentabort
&& msg
->ms_error
== EINTR
)
831 msg
->ms_error
= sentabort
;
832 if (msg
->ms_flags
& MSGF_QUEUED
)
833 _lwkt_pullmsg(port
, msg
);
834 spin_unlock_wr(&port
->mpu_spin
);
836 if (msg
->ms_flags
& MSGF_QUEUED
) {
837 port
= msg
->ms_reply_port
;
838 spin_lock_wr(&port
->mpu_spin
);
839 _lwkt_pullmsg(port
, msg
);
840 spin_unlock_wr(&port
->mpu_spin
);
843 return(msg
->ms_error
);
848 lwkt_spin_waitport(lwkt_port_t port
, int flags
)
853 spin_lock_wr(&port
->mpu_spin
);
854 while ((msg
= _lwkt_pollmsg(port
)) == NULL
) {
855 port
->mp_flags
|= MSGPORTF_WAITING
;
856 error
= ssleep(port
, &port
->mpu_spin
, flags
, "waitport", 0);
857 /* see note at the top on the MSGPORTF_WAITING flag */
859 spin_unlock_wr(&port
->mpu_spin
);
863 _lwkt_pullmsg(port
, msg
);
864 spin_unlock_wr(&port
->mpu_spin
);
870 lwkt_spin_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
874 KKASSERT((msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
)) == 0);
876 if (msg
->ms_flags
& MSGF_SYNC
) {
878 * If a synchronous completion has been requested, just wakeup
879 * the message without bothering to queue it to the target port.
881 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
885 * If an asynchronous completion has been requested the message
886 * must be queued to the reply port.
888 spin_lock_wr(&port
->mpu_spin
);
889 _lwkt_enqueue_reply(port
, msg
);
891 if (port
->mp_flags
& MSGPORTF_WAITING
) {
892 port
->mp_flags
&= ~MSGPORTF_WAITING
;
895 spin_unlock_wr(&port
->mpu_spin
);
901 /************************************************************************
902 * SERIALIZER PORT BACKEND *
903 ************************************************************************
905 * This backend uses serializer to protect port accessing. Callers are
906 * assumed to have serializer held. This kind of port is usually created
907 * by network device driver along with _one_ lwkt thread to pipeline
908 * operations which may temporarily release serializer.
910 * Implementation is based on SPIN PORT BACKEND.
915 lwkt_serialize_getport(lwkt_port_t port
)
919 ASSERT_SERIALIZED(port
->mpu_serialize
);
921 if ((msg
= _lwkt_pollmsg(port
)) != NULL
)
922 _lwkt_pullmsg(port
, msg
);
928 lwkt_serialize_putport(lwkt_port_t port
, lwkt_msg_t msg
)
930 KKASSERT((msg
->ms_flags
& (MSGF_DONE
| MSGF_REPLY
)) == 0);
931 ASSERT_SERIALIZED(port
->mpu_serialize
);
933 msg
->ms_target_port
= port
;
934 _lwkt_pushmsg(port
, msg
);
935 if (port
->mp_flags
& MSGPORTF_WAITING
) {
936 port
->mp_flags
&= ~MSGPORTF_WAITING
;
944 lwkt_serialize_waitmsg(lwkt_msg_t msg
, int flags
)
950 KASSERT((msg
->ms_flags
& MSGF_DROPABLE
) == 0,
951 ("can't wait dropable message\n"));
953 if ((msg
->ms_flags
& MSGF_DONE
) == 0) {
954 port
= msg
->ms_reply_port
;
956 ASSERT_SERIALIZED(port
->mpu_serialize
);
959 while ((msg
->ms_flags
& MSGF_DONE
) == 0) {
963 * If message was sent synchronously from the beginning
964 * the wakeup will be on the message structure, else it
965 * will be on the port structure.
967 if (msg
->ms_flags
& MSGF_SYNC
) {
971 port
->mp_flags
|= MSGPORTF_WAITING
;
975 * Only messages which support abort can be interrupted.
976 * We must still wait for message completion regardless.
978 if ((flags
& PCATCH
) && sentabort
== 0) {
979 error
= zsleep(won
, port
->mpu_serialize
, PCATCH
, "waitmsg", 0);
982 lwkt_serialize_exit(port
->mpu_serialize
);
984 lwkt_serialize_enter(port
->mpu_serialize
);
987 error
= zsleep(won
, port
->mpu_serialize
, 0, "waitmsg", 0);
989 /* see note at the top on the MSGPORTF_WAITING flag */
992 * Turn EINTR into ERESTART if the signal indicates.
994 if (sentabort
&& msg
->ms_error
== EINTR
)
995 msg
->ms_error
= sentabort
;
996 if (msg
->ms_flags
& MSGF_QUEUED
)
997 _lwkt_pullmsg(port
, msg
);
999 if (msg
->ms_flags
& MSGF_QUEUED
) {
1000 port
= msg
->ms_reply_port
;
1002 ASSERT_SERIALIZED(port
->mpu_serialize
);
1003 _lwkt_pullmsg(port
, msg
);
1006 return(msg
->ms_error
);
1011 lwkt_serialize_waitport(lwkt_port_t port
, int flags
)
1016 ASSERT_SERIALIZED(port
->mpu_serialize
);
1018 while ((msg
= _lwkt_pollmsg(port
)) == NULL
) {
1019 port
->mp_flags
|= MSGPORTF_WAITING
;
1020 error
= zsleep(port
, port
->mpu_serialize
, flags
, "waitport", 0);
1021 /* see note at the top on the MSGPORTF_WAITING flag */
1025 _lwkt_pullmsg(port
, msg
);
1031 lwkt_serialize_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
1033 KKASSERT((msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
)) == 0);
1034 ASSERT_SERIALIZED(port
->mpu_serialize
);
1036 if (msg
->ms_flags
& MSGF_SYNC
) {
1038 * If a synchronous completion has been requested, just wakeup
1039 * the message without bothering to queue it to the target port.
1041 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
1045 * If an asynchronous completion has been requested the message
1046 * must be queued to the reply port.
1048 _lwkt_enqueue_reply(port
, msg
);
1049 if (port
->mp_flags
& MSGPORTF_WAITING
) {
1050 port
->mp_flags
&= ~MSGPORTF_WAITING
;
1056 /************************************************************************
1057 * PANIC AND SPECIAL PORT FUNCTIONS *
1058 ************************************************************************/
1061 * You can point a port's reply vector at this function if you just want
1062 * the message marked done, without any queueing or signaling. This is
1063 * often used for structure-embedded messages.
1067 lwkt_null_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
1069 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
1074 lwkt_panic_getport(lwkt_port_t port
)
1076 panic("lwkt_getport() illegal on port %p", port
);
1081 lwkt_panic_putport(lwkt_port_t port
, lwkt_msg_t msg
)
1083 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port
, msg
);
1088 lwkt_panic_waitmsg(lwkt_msg_t msg
, int flags
)
1090 panic("port %p msg %p cannot be waited on", msg
->ms_reply_port
, msg
);
1095 lwkt_panic_waitport(lwkt_port_t port
, int flags
)
1097 panic("port %p cannot be waited on", port
);
1102 lwkt_panic_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
1104 panic("lwkt_replymsg() is illegal on port %p msg %p", port
, msg
);
1109 lwkt_panic_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
)
1111 panic("lwkt_dropmsg() is illegal on port %p msg %p", port
, msg
);