2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * NOTE! This file may be compiled for userland libraries as well as for
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #include <machine/smp.h>
70 #include <sys/malloc.h>
71 MALLOC_DEFINE(M_LWKTMSG
, "lwkt message", "lwkt message");
73 /************************************************************************
75 ************************************************************************/
78 lwkt_beginmsg(lwkt_port_t port
, lwkt_msg_t msg
)
80 return port
->mp_putport(port
, msg
);
84 lwkt_beginmsg_oncpu(lwkt_port_t port
, lwkt_msg_t msg
)
86 return port
->mp_putport_oncpu(port
, msg
);
90 _lwkt_sendmsg_prepare(lwkt_port_t port
, lwkt_msg_t msg
)
92 KKASSERT(msg
->ms_reply_port
!= NULL
&&
93 (msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
)) == MSGF_DONE
);
94 msg
->ms_flags
&= ~(MSGF_REPLY
| MSGF_SYNC
| MSGF_DONE
);
98 _lwkt_sendmsg_start(lwkt_port_t port
, lwkt_msg_t msg
)
102 if ((error
= lwkt_beginmsg(port
, msg
)) != EASYNC
) {
104 * Target port opted to execute the message synchronously so
105 * queue the response.
107 lwkt_replymsg(msg
, error
);
112 _lwkt_sendmsg_start_oncpu(lwkt_port_t port
, lwkt_msg_t msg
)
116 if ((error
= lwkt_beginmsg_oncpu(port
, msg
)) != EASYNC
) {
118 * Target port opted to execute the message synchronously so
119 * queue the response.
121 lwkt_replymsg(msg
, error
);
128 * Request asynchronous completion and call lwkt_beginmsg(). The
129 * target port can opt to execute the message synchronously or
130 * asynchronously and this function will automatically queue the
131 * response if the target executes the message synchronously.
133 * NOTE: The message is in an indeterminate state until this call
134 * returns. The caller should not mess with it (e.g. try to abort it)
137 * NOTE: Do not use this function to forward a message as we might
138 * clobber ms_flags in an SMP race.
141 lwkt_sendmsg(lwkt_port_t port
, lwkt_msg_t msg
)
143 _lwkt_sendmsg_prepare(port
, msg
);
144 _lwkt_sendmsg_start(port
, msg
);
148 lwkt_sendmsg_oncpu(lwkt_port_t port
, lwkt_msg_t msg
)
150 _lwkt_sendmsg_prepare(port
, msg
);
151 _lwkt_sendmsg_start_oncpu(port
, msg
);
155 lwkt_sendmsg_prepare(lwkt_port_t port
, lwkt_msg_t msg
)
157 _lwkt_sendmsg_prepare(port
, msg
);
161 lwkt_sendmsg_start(lwkt_port_t port
, lwkt_msg_t msg
)
163 _lwkt_sendmsg_start(port
, msg
);
167 lwkt_sendmsg_start_oncpu(lwkt_port_t port
, lwkt_msg_t msg
)
169 _lwkt_sendmsg_start_oncpu(port
, msg
);
175 * Request synchronous completion and call lwkt_beginmsg(). The
176 * target port can opt to execute the message synchronously or
177 * asynchronously and this function will automatically block and
178 * wait for a response if the target executes the message
181 * NOTE: Do not use this function to forward a message as we might
182 * clobber ms_flags in an SMP race.
185 lwkt_domsg(lwkt_port_t port
, lwkt_msg_t msg
, int flags
)
189 KKASSERT(msg
->ms_reply_port
!= NULL
&&
190 (msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
)) == MSGF_DONE
);
191 msg
->ms_flags
&= ~(MSGF_REPLY
| MSGF_DONE
);
192 msg
->ms_flags
|= MSGF_SYNC
;
193 if ((error
= lwkt_beginmsg(port
, msg
)) == EASYNC
) {
195 * Target port opted to execute the message asynchronously so
196 * block and wait for a reply.
198 error
= lwkt_waitmsg(msg
, flags
);
200 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
208 * Forward a message received on one port to another port.
211 lwkt_forwardmsg(lwkt_port_t port
, lwkt_msg_t msg
)
215 KKASSERT((msg
->ms_flags
& (MSGF_QUEUED
|MSGF_DONE
|MSGF_REPLY
)) == 0);
216 if ((error
= port
->mp_putport(port
, msg
)) != EASYNC
)
217 lwkt_replymsg(msg
, error
);
224 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
225 * The caller must ensure that the message will not be both replied AND
226 * destroyed while the abort is in progress.
228 * This function issues a callback which might block!
231 lwkt_abortmsg(lwkt_msg_t msg
)
234 * A critical section protects us from reply IPIs on this cpu.
239 * Shortcut the operation if the message has already been returned.
240 * The callback typically constructs a lwkt_msg with the abort request,
241 * issues it synchronously, and waits for completion. The callback
242 * is not required to actually abort the message and the target port,
243 * upon receiving an abort request message generated by the callback
244 * should check whether the original message has already completed or
247 if (msg
->ms_flags
& MSGF_ABORTABLE
) {
248 if ((msg
->ms_flags
& (MSGF_DONE
|MSGF_REPLY
)) == 0)
249 msg
->ms_abortfn(msg
);
254 /************************************************************************
255 * PORT INITIALIZATION API *
256 ************************************************************************/
258 static void *lwkt_thread_getport(lwkt_port_t port
);
259 static int lwkt_thread_putport(lwkt_port_t port
, lwkt_msg_t msg
);
260 static int lwkt_thread_waitmsg(lwkt_msg_t msg
, int flags
);
261 static void *lwkt_thread_waitport(lwkt_port_t port
, int flags
);
262 static void lwkt_thread_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
263 static int lwkt_thread_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
);
265 static void *lwkt_spin_getport(lwkt_port_t port
);
266 static int lwkt_spin_putport(lwkt_port_t port
, lwkt_msg_t msg
);
267 static int lwkt_spin_waitmsg(lwkt_msg_t msg
, int flags
);
268 static void *lwkt_spin_waitport(lwkt_port_t port
, int flags
);
269 static void lwkt_spin_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
270 static int lwkt_spin_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
);
271 static int lwkt_spin_putport_oncpu(lwkt_port_t port
, lwkt_msg_t msg
);
273 static void *lwkt_serialize_getport(lwkt_port_t port
);
274 static int lwkt_serialize_putport(lwkt_port_t port
, lwkt_msg_t msg
);
275 static int lwkt_serialize_waitmsg(lwkt_msg_t msg
, int flags
);
276 static void *lwkt_serialize_waitport(lwkt_port_t port
, int flags
);
277 static void lwkt_serialize_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
279 static void lwkt_null_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
280 static void *lwkt_panic_getport(lwkt_port_t port
);
281 static int lwkt_panic_putport(lwkt_port_t port
, lwkt_msg_t msg
);
282 static int lwkt_panic_waitmsg(lwkt_msg_t msg
, int flags
);
283 static void *lwkt_panic_waitport(lwkt_port_t port
, int flags
);
284 static void lwkt_panic_replyport(lwkt_port_t port
, lwkt_msg_t msg
);
285 static int lwkt_panic_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
);
286 static int lwkt_panic_putport_oncpu(lwkt_port_t port
, lwkt_msg_t msg
);
289 * Core port initialization (internal)
293 _lwkt_initport(lwkt_port_t port
,
294 void *(*gportfn
)(lwkt_port_t
),
295 int (*pportfn
)(lwkt_port_t
, lwkt_msg_t
),
296 int (*wmsgfn
)(lwkt_msg_t
, int),
297 void *(*wportfn
)(lwkt_port_t
, int),
298 void (*rportfn
)(lwkt_port_t
, lwkt_msg_t
),
299 int (*dmsgfn
)(lwkt_port_t
, lwkt_msg_t
),
300 int (*pportfn_oncpu
)(lwkt_port_t
, lwkt_msg_t
))
302 bzero(port
, sizeof(*port
));
304 TAILQ_INIT(&port
->mp_msgq
);
305 TAILQ_INIT(&port
->mp_msgq_prio
);
306 port
->mp_getport
= gportfn
;
307 port
->mp_putport
= pportfn
;
308 port
->mp_waitmsg
= wmsgfn
;
309 port
->mp_waitport
= wportfn
;
310 port
->mp_replyport
= rportfn
;
311 port
->mp_dropmsg
= dmsgfn
;
312 port
->mp_putport_oncpu
= pportfn_oncpu
;
316 * Schedule the target thread. If the message flags contains MSGF_NORESCHED
317 * we tell the scheduler not to reschedule if td is at a higher priority.
319 * This routine is called even if the thread is already scheduled.
323 _lwkt_schedule_msg(thread_t td
, int flags
)
329 * lwkt_initport_thread()
331 * Initialize a port for use by a particular thread. The port may
332 * only be used by <td>.
335 lwkt_initport_thread(lwkt_port_t port
, thread_t td
)
341 lwkt_thread_waitport
,
342 lwkt_thread_replyport
,
344 lwkt_thread_putport
);
349 * lwkt_initport_spin()
351 * Initialize a port for use with descriptors that might be accessed
352 * via multiple LWPs, processes, or threads. Has somewhat more
353 * overhead than thread ports.
356 lwkt_initport_spin(lwkt_port_t port
, thread_t td
, boolean_t fixed_cpuid
)
358 int (*dmsgfn
)(lwkt_port_t
, lwkt_msg_t
);
359 int (*pportfn_oncpu
)(lwkt_port_t
, lwkt_msg_t
);
362 dmsgfn
= lwkt_panic_dropmsg
;
364 dmsgfn
= lwkt_spin_dropmsg
;
367 pportfn_oncpu
= lwkt_spin_putport_oncpu
;
369 pportfn_oncpu
= lwkt_panic_putport_oncpu
;
379 spin_init(&port
->mpu_spin
, "lwktinitport");
382 port
->mp_cpuid
= td
->td_gd
->gd_cpuid
;
386 * lwkt_initport_serialize()
388 * Initialize a port for use with descriptors that might be accessed
389 * via multiple LWPs, processes, or threads. Callers are assumed to
390 * have held the serializer (slz).
393 lwkt_initport_serialize(lwkt_port_t port
, struct lwkt_serialize
*slz
)
396 lwkt_serialize_getport
,
397 lwkt_serialize_putport
,
398 lwkt_serialize_waitmsg
,
399 lwkt_serialize_waitport
,
400 lwkt_serialize_replyport
,
402 lwkt_panic_putport_oncpu
);
403 port
->mpu_serialize
= slz
;
407 * Similar to the standard initport, this function simply marks the message
408 * as being done and does not attempt to return it to an originating port.
411 lwkt_initport_replyonly_null(lwkt_port_t port
)
420 lwkt_panic_putport_oncpu
);
424 * Initialize a reply-only port, typically used as a message sink. Such
425 * ports can only be used as a reply port.
428 lwkt_initport_replyonly(lwkt_port_t port
,
429 void (*rportfn
)(lwkt_port_t
, lwkt_msg_t
))
431 _lwkt_initport(port
, lwkt_panic_getport
, lwkt_panic_putport
,
432 lwkt_panic_waitmsg
, lwkt_panic_waitport
,
433 rportfn
, lwkt_panic_dropmsg
,
434 lwkt_panic_putport_oncpu
);
438 lwkt_initport_putonly(lwkt_port_t port
,
439 int (*pportfn
)(lwkt_port_t
, lwkt_msg_t
))
441 _lwkt_initport(port
, lwkt_panic_getport
, pportfn
,
442 lwkt_panic_waitmsg
, lwkt_panic_waitport
,
443 lwkt_panic_replyport
, lwkt_panic_dropmsg
,
444 lwkt_panic_putport_oncpu
);
448 lwkt_initport_panic(lwkt_port_t port
)
451 lwkt_panic_getport
, lwkt_panic_putport
,
452 lwkt_panic_waitmsg
, lwkt_panic_waitport
,
453 lwkt_panic_replyport
, lwkt_panic_dropmsg
,
454 lwkt_panic_putport_oncpu
);
459 _lwkt_pullmsg(lwkt_port_t port
, lwkt_msg_t msg
)
461 lwkt_msg_queue
*queue
;
464 * normal case, remove and return the message.
466 if (__predict_false(msg
->ms_flags
& MSGF_PRIORITY
))
467 queue
= &port
->mp_msgq_prio
;
469 queue
= &port
->mp_msgq
;
470 TAILQ_REMOVE(queue
, msg
, ms_node
);
473 * atomic op needed for spin ports
475 atomic_clear_int(&msg
->ms_flags
, MSGF_QUEUED
);
480 _lwkt_pushmsg(lwkt_port_t port
, lwkt_msg_t msg
)
482 lwkt_msg_queue
*queue
;
485 * atomic op needed for spin ports
487 atomic_set_int(&msg
->ms_flags
, MSGF_QUEUED
);
488 if (__predict_false(msg
->ms_flags
& MSGF_PRIORITY
))
489 queue
= &port
->mp_msgq_prio
;
491 queue
= &port
->mp_msgq
;
492 TAILQ_INSERT_TAIL(queue
, msg
, ms_node
);
494 if (msg
->ms_flags
& MSGF_RECEIPT
) {
496 * In case this message is forwarded later, the receipt
497 * flag is cleared before the receipt function is called.
499 atomic_clear_int(&msg
->ms_flags
, MSGF_RECEIPT
);
500 msg
->ms_receiptfn(msg
, port
);
506 _lwkt_pollmsg(lwkt_port_t port
)
510 msg
= TAILQ_FIRST(&port
->mp_msgq_prio
);
511 if (__predict_false(msg
!= NULL
))
515 * Priority queue has no message, fallback to non-priority queue.
517 return TAILQ_FIRST(&port
->mp_msgq
);
522 _lwkt_enqueue_reply(lwkt_port_t port
, lwkt_msg_t msg
)
525 * atomic op needed for spin ports
527 _lwkt_pushmsg(port
, msg
);
528 atomic_set_int(&msg
->ms_flags
, MSGF_REPLY
| MSGF_DONE
);
531 /************************************************************************
532 * THREAD PORT BACKEND *
533 ************************************************************************
535 * This backend is used when the port a message is retrieved from is owned
536 * by a single thread (the calling thread). Messages are IPI'd to the
537 * correct cpu before being enqueued to a port. Note that this is fairly
538 * optimal since scheduling would have had to do an IPI anyway if the
539 * message were headed to a different cpu.
543 * This function completes reply processing for the default case in the
544 * context of the originating cpu.
548 lwkt_thread_replyport_remote(lwkt_msg_t msg
)
550 lwkt_port_t port
= msg
->ms_reply_port
;
554 * Chase any thread migration that occurs
556 if (port
->mpu_td
->td_gd
!= mycpu
) {
557 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
558 (ipifunc1_t
)lwkt_thread_replyport_remote
, msg
);
563 * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
566 KKASSERT(msg
->ms_flags
& MSGF_INTRANSIT
);
567 msg
->ms_flags
&= ~MSGF_INTRANSIT
;
569 flags
= msg
->ms_flags
;
570 if (msg
->ms_flags
& MSGF_SYNC
) {
572 msg
->ms_flags
|= MSGF_REPLY
| MSGF_DONE
;
574 _lwkt_enqueue_reply(port
, msg
);
576 if (port
->mp_flags
& MSGPORTF_WAITING
)
577 _lwkt_schedule_msg(port
->mpu_td
, flags
);
581 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
583 * Called with the reply port as an argument but in the context of the
584 * original target port. Completion must occur on the target port's
587 * The critical section protects us from IPIs on this CPU.
591 lwkt_thread_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
595 KKASSERT((msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
|MSGF_INTRANSIT
)) == 0);
597 if (msg
->ms_flags
& MSGF_SYNC
) {
599 * If a synchronous completion has been requested, just wakeup
600 * the message without bothering to queue it to the target port.
602 * Assume the target thread is non-preemptive, so no critical
603 * section is required.
605 if (port
->mpu_td
->td_gd
== mycpu
) {
607 flags
= msg
->ms_flags
;
609 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
610 if (port
->mp_flags
& MSGPORTF_WAITING
)
611 _lwkt_schedule_msg(port
->mpu_td
, flags
);
615 atomic_set_int(&msg
->ms_flags
, MSGF_INTRANSIT
);
617 atomic_set_int(&msg
->ms_flags
, MSGF_REPLY
);
618 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
619 (ipifunc1_t
)lwkt_thread_replyport_remote
, msg
);
623 * If an asynchronous completion has been requested the message
624 * must be queued to the reply port.
626 * A critical section is required to interlock the port queue.
628 if (port
->mpu_td
->td_gd
== mycpu
) {
630 _lwkt_enqueue_reply(port
, msg
);
631 if (port
->mp_flags
& MSGPORTF_WAITING
)
632 _lwkt_schedule_msg(port
->mpu_td
, msg
->ms_flags
);
636 atomic_set_int(&msg
->ms_flags
, MSGF_INTRANSIT
);
638 atomic_set_int(&msg
->ms_flags
, MSGF_REPLY
);
639 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
640 (ipifunc1_t
)lwkt_thread_replyport_remote
, msg
);
646 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
648 * This function can _only_ be used when the caller is in the same thread
649 * as the message's target port owner thread.
652 lwkt_thread_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
)
656 KASSERT(port
->mpu_td
== curthread
,
657 ("message could only be dropped in the same thread "
658 "as the message target port thread"));
659 crit_enter_quick(port
->mpu_td
);
660 if ((msg
->ms_flags
& (MSGF_REPLY
|MSGF_QUEUED
)) == MSGF_QUEUED
) {
661 _lwkt_pullmsg(port
, msg
);
662 atomic_set_int(&msg
->ms_flags
, MSGF_DONE
);
667 crit_exit_quick(port
->mpu_td
);
673 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
675 * Called with the target port as an argument but in the context of the
676 * reply port. This function always implements an asynchronous put to
677 * the target message port, and thus returns EASYNC.
679 * The message must already have cleared MSGF_DONE and MSGF_REPLY
683 lwkt_thread_putport_remote(lwkt_msg_t msg
)
685 lwkt_port_t port
= msg
->ms_target_port
;
688 * Chase any thread migration that occurs
690 if (port
->mpu_td
->td_gd
!= mycpu
) {
691 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
692 (ipifunc1_t
)lwkt_thread_putport_remote
, msg
);
697 * An atomic op is needed on ms_flags vs originator. Also
698 * note that the originator might be using a different type
702 KKASSERT(msg
->ms_flags
& MSGF_INTRANSIT
);
703 atomic_clear_int(&msg
->ms_flags
, MSGF_INTRANSIT
);
705 _lwkt_pushmsg(port
, msg
);
706 if (port
->mp_flags
& MSGPORTF_WAITING
)
707 _lwkt_schedule_msg(port
->mpu_td
, msg
->ms_flags
);
712 lwkt_thread_putport(lwkt_port_t port
, lwkt_msg_t msg
)
714 KKASSERT((msg
->ms_flags
& (MSGF_DONE
| MSGF_REPLY
)) == 0);
716 msg
->ms_target_port
= port
;
717 if (port
->mpu_td
->td_gd
== mycpu
) {
719 _lwkt_pushmsg(port
, msg
);
720 if (port
->mp_flags
& MSGPORTF_WAITING
)
721 _lwkt_schedule_msg(port
->mpu_td
, msg
->ms_flags
);
728 * An atomic op is needed on ms_flags vs originator. Also
729 * note that the originator might be using a different type
732 atomic_set_int(&msg
->ms_flags
, MSGF_INTRANSIT
);
734 lwkt_send_ipiq(port
->mpu_td
->td_gd
,
735 (ipifunc1_t
)lwkt_thread_putport_remote
, msg
);
741 * lwkt_thread_getport()
743 * Retrieve the next message from the port or NULL if no messages
748 lwkt_thread_getport(lwkt_port_t port
)
752 KKASSERT(port
->mpu_td
== curthread
);
754 crit_enter_quick(port
->mpu_td
);
755 if ((msg
= _lwkt_pollmsg(port
)) != NULL
)
756 _lwkt_pullmsg(port
, msg
);
757 crit_exit_quick(port
->mpu_td
);
762 * lwkt_thread_waitmsg()
764 * Wait for a particular message to be replied. We must be the only
765 * thread waiting on the message. The port must be owned by the
770 lwkt_thread_waitmsg(lwkt_msg_t msg
, int flags
)
772 thread_t td
= curthread
;
774 KASSERT((msg
->ms_flags
& MSGF_DROPABLE
) == 0,
775 ("can't wait dropable message"));
777 if ((msg
->ms_flags
& MSGF_DONE
) == 0) {
779 * If the done bit was not set we have to block until it is.
781 lwkt_port_t port
= msg
->ms_reply_port
;
784 KKASSERT(port
->mpu_td
== td
);
785 crit_enter_quick(td
);
788 while ((msg
->ms_flags
& MSGF_DONE
) == 0) {
789 port
->mp_flags
|= MSGPORTF_WAITING
; /* same cpu */
790 if (sentabort
== 0) {
791 if ((sentabort
= lwkt_sleep("waitmsg", flags
)) != 0) {
795 lwkt_sleep("waitabt", 0);
797 port
->mp_flags
&= ~MSGPORTF_WAITING
;
799 if (msg
->ms_flags
& MSGF_QUEUED
)
800 _lwkt_pullmsg(port
, msg
);
804 * If the done bit was set we only have to mess around with the
805 * message if it is queued on the reply port.
807 crit_enter_quick(td
);
808 if (msg
->ms_flags
& MSGF_QUEUED
) {
809 lwkt_port_t port
= msg
->ms_reply_port
;
810 thread_t td __debugvar
= curthread
;
812 KKASSERT(port
->mpu_td
== td
);
813 _lwkt_pullmsg(port
, msg
);
817 return(msg
->ms_error
);
821 * lwkt_thread_waitport()
823 * Wait for a new message to be available on the port. We must be the
824 * only thread waiting on the port. The port must be owned by the caller.
828 lwkt_thread_waitport(lwkt_port_t port
, int flags
)
830 thread_t td
= curthread
;
834 KKASSERT(port
->mpu_td
== td
);
835 crit_enter_quick(td
);
836 while ((msg
= _lwkt_pollmsg(port
)) == NULL
) {
837 port
->mp_flags
|= MSGPORTF_WAITING
;
838 error
= lwkt_sleep("waitport", flags
);
839 port
->mp_flags
&= ~MSGPORTF_WAITING
;
843 _lwkt_pullmsg(port
, msg
);
849 /************************************************************************
850 * SPIN PORT BACKEND *
851 ************************************************************************
853 * This backend uses spinlocks instead of making assumptions about which
854 * thread is accessing the port. It must be used when a port is not owned
855 * by a particular thread. This is less optimal than thread ports but
856 * you don't have a choice if there are multiple threads accessing the port.
858 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
859 * on the message port, it is the responsibility of the code doing the
860 * wakeup to clear this flag rather than the blocked threads. Some
861 * superfluous wakeups may occur, which is ok.
863 * XXX synchronous message wakeups are not currently optimized.
868 lwkt_spin_getport(lwkt_port_t port
)
872 spin_lock(&port
->mpu_spin
);
873 if ((msg
= _lwkt_pollmsg(port
)) != NULL
)
874 _lwkt_pullmsg(port
, msg
);
875 spin_unlock(&port
->mpu_spin
);
880 lwkt_spin_putport_only(lwkt_port_t port
, lwkt_msg_t msg
)
884 KKASSERT((msg
->ms_flags
& (MSGF_DONE
| MSGF_REPLY
)) == 0);
886 msg
->ms_target_port
= port
;
887 spin_lock(&port
->mpu_spin
);
888 _lwkt_pushmsg(port
, msg
);
890 if (port
->mp_flags
& MSGPORTF_WAITING
) {
891 port
->mp_flags
&= ~MSGPORTF_WAITING
;
894 spin_unlock(&port
->mpu_spin
);
901 lwkt_spin_putport(lwkt_port_t port
, lwkt_msg_t msg
)
903 if (lwkt_spin_putport_only(port
, msg
))
910 lwkt_spin_putport_oncpu(lwkt_port_t port
, lwkt_msg_t msg
)
912 KASSERT(port
->mp_cpuid
== mycpuid
,
913 ("cpu mismatch, can't do oncpu putport; port cpu%d, curcpu cpu%d",
914 port
->mp_cpuid
, mycpuid
));
915 if (lwkt_spin_putport_only(port
, msg
))
922 lwkt_spin_waitmsg(lwkt_msg_t msg
, int flags
)
928 KASSERT((msg
->ms_flags
& MSGF_DROPABLE
) == 0,
929 ("can't wait dropable message"));
930 port
= msg
->ms_reply_port
;
932 if ((msg
->ms_flags
& MSGF_DONE
) == 0) {
934 spin_lock(&port
->mpu_spin
);
935 while ((msg
->ms_flags
& MSGF_DONE
) == 0) {
939 * If message was sent synchronously from the beginning
940 * the wakeup will be on the message structure, else it
941 * will be on the port structure.
943 * ms_flags needs atomic op originator vs target MSGF_QUEUED
945 if (msg
->ms_flags
& MSGF_SYNC
) {
947 atomic_set_int(&msg
->ms_flags
, MSGF_WAITING
);
950 port
->mp_flags
|= MSGPORTF_WAITING
;
954 * Only messages which support abort can be interrupted.
955 * We must still wait for message completion regardless.
957 if ((flags
& PCATCH
) && sentabort
== 0) {
958 error
= ssleep(won
, &port
->mpu_spin
, PCATCH
, "waitmsg", 0);
961 spin_unlock(&port
->mpu_spin
);
963 spin_lock(&port
->mpu_spin
);
966 error
= ssleep(won
, &port
->mpu_spin
, 0, "waitmsg", 0);
968 /* see note at the top on the MSGPORTF_WAITING flag */
971 * Turn EINTR into ERESTART if the signal indicates.
973 if (sentabort
&& msg
->ms_error
== EINTR
)
974 msg
->ms_error
= sentabort
;
975 if (msg
->ms_flags
& MSGF_QUEUED
)
976 _lwkt_pullmsg(port
, msg
);
977 spin_unlock(&port
->mpu_spin
);
979 spin_lock(&port
->mpu_spin
);
980 if (msg
->ms_flags
& MSGF_QUEUED
) {
981 _lwkt_pullmsg(port
, msg
);
983 spin_unlock(&port
->mpu_spin
);
985 return(msg
->ms_error
);
990 lwkt_spin_waitport(lwkt_port_t port
, int flags
)
995 spin_lock(&port
->mpu_spin
);
996 while ((msg
= _lwkt_pollmsg(port
)) == NULL
) {
997 port
->mp_flags
|= MSGPORTF_WAITING
;
998 error
= ssleep(port
, &port
->mpu_spin
, flags
, "waitport", 0);
999 /* see note at the top on the MSGPORTF_WAITING flag */
1001 spin_unlock(&port
->mpu_spin
);
1005 _lwkt_pullmsg(port
, msg
);
1006 spin_unlock(&port
->mpu_spin
);
1012 lwkt_spin_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
1016 KKASSERT((msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
)) == 0);
1018 if (msg
->ms_flags
& MSGF_SYNC
) {
1020 * If a synchronous completion has been requested, just wakeup
1021 * the message without bothering to queue it to the target port.
1023 * ms_flags protected by reply port spinlock
1025 spin_lock(&port
->mpu_spin
);
1026 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
1028 if (msg
->ms_flags
& MSGF_WAITING
) {
1029 msg
->ms_flags
&= ~MSGF_WAITING
;
1032 spin_unlock(&port
->mpu_spin
);
1037 * If an asynchronous completion has been requested the message
1038 * must be queued to the reply port.
1040 spin_lock(&port
->mpu_spin
);
1041 _lwkt_enqueue_reply(port
, msg
);
1043 if (port
->mp_flags
& MSGPORTF_WAITING
) {
1044 port
->mp_flags
&= ~MSGPORTF_WAITING
;
1047 spin_unlock(&port
->mpu_spin
);
1054 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
1056 * This function can _only_ be used when the caller is in the same thread
1057 * as the message's target port owner thread.
1060 lwkt_spin_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
)
1064 KASSERT(port
->mpu_td
== curthread
,
1065 ("message could only be dropped in the same thread "
1066 "as the message target port thread\n"));
1067 spin_lock(&port
->mpu_spin
);
1068 if ((msg
->ms_flags
& (MSGF_REPLY
|MSGF_QUEUED
)) == MSGF_QUEUED
) {
1069 _lwkt_pullmsg(port
, msg
);
1070 msg
->ms_flags
|= MSGF_DONE
;
1075 spin_unlock(&port
->mpu_spin
);
1080 /************************************************************************
1081 * SERIALIZER PORT BACKEND *
1082 ************************************************************************
1084 * This backend uses a serializer to protect port access. Callers are
1085 * assumed to hold the serializer. This kind of port is usually created
1086 * by a network device driver along with _one_ lwkt thread to pipeline
1087 * operations which may temporarily release the serializer.
1089 * Implementation is based on SPIN PORT BACKEND.
1094 lwkt_serialize_getport(lwkt_port_t port
)
1098 ASSERT_SERIALIZED(port
->mpu_serialize
);
1100 if ((msg
= _lwkt_pollmsg(port
)) != NULL
)
1101 _lwkt_pullmsg(port
, msg
);
1107 lwkt_serialize_putport(lwkt_port_t port
, lwkt_msg_t msg
)
1109 KKASSERT((msg
->ms_flags
& (MSGF_DONE
| MSGF_REPLY
)) == 0);
1110 ASSERT_SERIALIZED(port
->mpu_serialize
);
1112 msg
->ms_target_port
= port
;
1113 _lwkt_pushmsg(port
, msg
);
1114 if (port
->mp_flags
& MSGPORTF_WAITING
) {
1115 port
->mp_flags
&= ~MSGPORTF_WAITING
;
1123 lwkt_serialize_waitmsg(lwkt_msg_t msg
, int flags
)
1129 KASSERT((msg
->ms_flags
& MSGF_DROPABLE
) == 0,
1130 ("can't wait dropable message"));
1132 if ((msg
->ms_flags
& MSGF_DONE
) == 0) {
1133 port
= msg
->ms_reply_port
;
1135 ASSERT_SERIALIZED(port
->mpu_serialize
);
1138 while ((msg
->ms_flags
& MSGF_DONE
) == 0) {
1142 * If message was sent synchronously from the beginning
1143 * the wakeup will be on the message structure, else it
1144 * will be on the port structure.
1146 if (msg
->ms_flags
& MSGF_SYNC
) {
1150 port
->mp_flags
|= MSGPORTF_WAITING
;
1154 * Only messages which support abort can be interrupted.
1155 * We must still wait for message completion regardless.
1157 if ((flags
& PCATCH
) && sentabort
== 0) {
1158 error
= zsleep(won
, port
->mpu_serialize
, PCATCH
, "waitmsg", 0);
1161 lwkt_serialize_exit(port
->mpu_serialize
);
1163 lwkt_serialize_enter(port
->mpu_serialize
);
1166 error
= zsleep(won
, port
->mpu_serialize
, 0, "waitmsg", 0);
1168 /* see note at the top on the MSGPORTF_WAITING flag */
1171 * Turn EINTR into ERESTART if the signal indicates.
1173 if (sentabort
&& msg
->ms_error
== EINTR
)
1174 msg
->ms_error
= sentabort
;
1175 if (msg
->ms_flags
& MSGF_QUEUED
)
1176 _lwkt_pullmsg(port
, msg
);
1178 if (msg
->ms_flags
& MSGF_QUEUED
) {
1179 port
= msg
->ms_reply_port
;
1181 ASSERT_SERIALIZED(port
->mpu_serialize
);
1182 _lwkt_pullmsg(port
, msg
);
1185 return(msg
->ms_error
);
1190 lwkt_serialize_waitport(lwkt_port_t port
, int flags
)
1195 ASSERT_SERIALIZED(port
->mpu_serialize
);
1197 while ((msg
= _lwkt_pollmsg(port
)) == NULL
) {
1198 port
->mp_flags
|= MSGPORTF_WAITING
;
1199 error
= zsleep(port
, port
->mpu_serialize
, flags
, "waitport", 0);
1200 /* see note at the top on the MSGPORTF_WAITING flag */
1204 _lwkt_pullmsg(port
, msg
);
1210 lwkt_serialize_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
1212 KKASSERT((msg
->ms_flags
& (MSGF_DONE
|MSGF_QUEUED
)) == 0);
1213 ASSERT_SERIALIZED(port
->mpu_serialize
);
1215 if (msg
->ms_flags
& MSGF_SYNC
) {
1217 * If a synchronous completion has been requested, just wakeup
1218 * the message without bothering to queue it to the target port.
1220 * (both sides synchronized via serialized reply port)
1222 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
1226 * If an asynchronous completion has been requested the message
1227 * must be queued to the reply port.
1229 _lwkt_enqueue_reply(port
, msg
);
1230 if (port
->mp_flags
& MSGPORTF_WAITING
) {
1231 port
->mp_flags
&= ~MSGPORTF_WAITING
;
1237 /************************************************************************
1238 * PANIC AND SPECIAL PORT FUNCTIONS *
1239 ************************************************************************/
1242 * You can point a port's reply vector at this function if you just want
1243 * the message marked done, without any queueing or signaling. This is
1244 * often used for structure-embedded messages.
1248 lwkt_null_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
1250 msg
->ms_flags
|= MSGF_DONE
| MSGF_REPLY
;
1255 lwkt_panic_getport(lwkt_port_t port
)
1257 panic("lwkt_getport() illegal on port %p", port
);
1262 lwkt_panic_putport(lwkt_port_t port
, lwkt_msg_t msg
)
1264 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port
, msg
);
1269 lwkt_panic_waitmsg(lwkt_msg_t msg
, int flags
)
1271 panic("port %p msg %p cannot be waited on", msg
->ms_reply_port
, msg
);
1276 lwkt_panic_waitport(lwkt_port_t port
, int flags
)
1278 panic("port %p cannot be waited on", port
);
1283 lwkt_panic_replyport(lwkt_port_t port
, lwkt_msg_t msg
)
1285 panic("lwkt_replymsg() is illegal on port %p msg %p", port
, msg
);
1290 lwkt_panic_dropmsg(lwkt_port_t port
, lwkt_msg_t msg
)
1292 panic("lwkt_dropmsg() is illegal on port %p msg %p", port
, msg
);
1299 lwkt_panic_putport_oncpu(lwkt_port_t port
, lwkt_msg_t msg
)
1301 panic("lwkt_begin_oncpu/sendmsg_oncpu() illegal on port %p msg %p",