- Add comment for lwkt_initport_serialize()
[dragonfly.git] / sys / kern / lwkt_msgport.c
blobdb1e61271c1e3b7c32385e4bb3d16fceee2a1bd7
1 /*
2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
34 * NOTE! This file may be compiled for userland libraries as well as for
35 * the kernel.
37 * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.49 2008/11/01 11:17:52 sephe Exp $
40 #include <sys/param.h>
41 #include <sys/systm.h>
42 #include <sys/kernel.h>
43 #include <sys/proc.h>
44 #include <sys/rtprio.h>
45 #include <sys/queue.h>
46 #include <sys/sysctl.h>
47 #include <sys/kthread.h>
48 #include <sys/signalvar.h>
49 #include <sys/signal2.h>
50 #include <machine/cpu.h>
51 #include <sys/lock.h>
53 #include <vm/vm.h>
54 #include <vm/vm_param.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_object.h>
57 #include <vm/vm_page.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_pager.h>
60 #include <vm/vm_extern.h>
61 #include <vm/vm_zone.h>
63 #include <sys/thread2.h>
64 #include <sys/msgport2.h>
65 #include <sys/spinlock2.h>
66 #include <sys/serialize.h>
68 #include <machine/stdarg.h>
69 #include <machine/cpufunc.h>
70 #ifdef SMP
71 #include <machine/smp.h>
72 #endif
74 #include <sys/malloc.h>
75 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
77 /************************************************************************
78 * MESSAGE FUNCTIONS *
79 ************************************************************************/
82 * lwkt_sendmsg()
84 * Request asynchronous completion and call lwkt_beginmsg(). The
85 * target port can opt to execute the message synchronously or
86 * asynchronously and this function will automatically queue the
87 * response if the target executes the message synchronously.
89 * NOTE: The message is in an indeterminant state until this call
90 * returns. The caller should not mess with it (e.g. try to abort it)
91 * until then.
93 void
94 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
96 int error;
98 KKASSERT(msg->ms_reply_port != NULL &&
99 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
100 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
101 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
102 lwkt_replymsg(msg, error);
107 * lwkt_domsg()
109 * Request asynchronous completion and call lwkt_beginmsg(). The
110 * target port can opt to execute the message synchronously or
111 * asynchronously and this function will automatically queue the
112 * response if the target executes the message synchronously.
115 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
117 int error;
119 KKASSERT(msg->ms_reply_port != NULL &&
120 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
121 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
122 msg->ms_flags |= MSGF_SYNC;
123 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
124 error = lwkt_waitmsg(msg, flags);
125 } else {
126 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
128 return(error);
132 * lwkt_forwardmsg()
134 * Forward a message received on one port to another port.
137 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
139 int error;
141 crit_enter();
142 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
143 if ((error = port->mp_putport(port, msg)) != EASYNC)
144 lwkt_replymsg(msg, error);
145 crit_exit();
146 return(error);
150 * lwkt_abortmsg()
152 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
153 * The caller must ensure that the message will not be both replied AND
154 * destroyed while the abort is in progress.
156 * This function issues a callback which might block!
158 void
159 lwkt_abortmsg(lwkt_msg_t msg)
162 * A critical section protects us from reply IPIs on this cpu.
164 crit_enter();
167 * Shortcut the operation if the message has already been returned.
168 * The callback typically constructs a lwkt_msg with the abort request,
169 * issues it synchronously, and waits for completion. The callback
170 * is not required to actually abort the message and the target port,
171 * upon receiving an abort request message generated by the callback
172 * should check whether the original message has already completed or
173 * not.
175 if (msg->ms_flags & MSGF_ABORTABLE) {
176 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
177 msg->ms_abortfn(msg);
179 crit_exit();
182 /************************************************************************
183 * PORT INITIALIZATION API *
184 ************************************************************************/
186 static void *lwkt_thread_getport(lwkt_port_t port);
187 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
188 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
189 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
190 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
192 static void *lwkt_spin_getport(lwkt_port_t port);
193 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
194 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
195 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
196 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
198 static void *lwkt_serialize_getport(lwkt_port_t port);
199 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
200 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
201 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
202 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
204 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
205 static void *lwkt_panic_getport(lwkt_port_t port);
206 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
207 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
208 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
209 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
212 * Core port initialization (internal)
214 static __inline
215 void
216 _lwkt_initport(lwkt_port_t port,
217 void *(*gportfn)(lwkt_port_t),
218 int (*pportfn)(lwkt_port_t, lwkt_msg_t),
219 int (*wmsgfn)(lwkt_msg_t, int),
220 void *(*wportfn)(lwkt_port_t, int),
221 void (*rportfn)(lwkt_port_t, lwkt_msg_t))
223 bzero(port, sizeof(*port));
224 TAILQ_INIT(&port->mp_msgq);
225 port->mp_getport = gportfn;
226 port->mp_putport = pportfn;
227 port->mp_waitmsg = wmsgfn;
228 port->mp_waitport = wportfn;
229 port->mp_replyport = rportfn;
233 * Schedule the target thread. If the message flags contains MSGF_NORESCHED
234 * we tell the scheduler not to reschedule if td is at a higher priority.
236 * This routine is called even if the thread is already scheduled so messages
237 * without NORESCHED will cause the target thread to be rescheduled even if
238 * prior messages did not.
240 static __inline
241 void
242 _lwkt_schedule_msg(thread_t td, int flags)
244 if (flags & MSGF_NORESCHED)
245 lwkt_schedule_noresched(td);
246 else
247 lwkt_schedule(td);
251 * lwkt_initport_thread()
253 * Initialize a port for use by a particular thread. The port may
254 * only be used by <td>.
256 void
257 lwkt_initport_thread(lwkt_port_t port, thread_t td)
259 _lwkt_initport(port,
260 lwkt_thread_getport,
261 lwkt_thread_putport,
262 lwkt_thread_waitmsg,
263 lwkt_thread_waitport,
264 lwkt_thread_replyport);
265 port->mpu_td = td;
269 * lwkt_initport_spin()
271 * Initialize a port for use with descriptors that might be accessed
272 * via multiple LWPs, processes, or threads. Has somewhat more
273 * overhead then thread ports.
275 void
276 lwkt_initport_spin(lwkt_port_t port)
278 _lwkt_initport(port,
279 lwkt_spin_getport,
280 lwkt_spin_putport,
281 lwkt_spin_waitmsg,
282 lwkt_spin_waitport,
283 lwkt_spin_replyport);
284 spin_init(&port->mpu_spin);
288 * lwkt_initport_serialize()
290 * Initialize a port for use with descriptors that might be accessed
291 * via multiple LWPs, processes, or threads. Callers are assumed to
292 * have held the serializer (slz).
294 void
295 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
297 _lwkt_initport(port,
298 lwkt_serialize_getport,
299 lwkt_serialize_putport,
300 lwkt_serialize_waitmsg,
301 lwkt_serialize_waitport,
302 lwkt_serialize_replyport);
303 port->mpu_serialize = slz;
307 * Similar to the standard initport, this function simply marks the message
308 * as being done and does not attempt to return it to an originating port.
310 void
311 lwkt_initport_replyonly_null(lwkt_port_t port)
313 _lwkt_initport(port,
314 lwkt_panic_getport,
315 lwkt_panic_putport,
316 lwkt_panic_waitmsg,
317 lwkt_panic_waitport,
318 lwkt_null_replyport);
322 * Initialize a reply-only port, typically used as a message sink. Such
323 * ports can only be used as a reply port.
325 void
326 lwkt_initport_replyonly(lwkt_port_t port,
327 void (*rportfn)(lwkt_port_t, lwkt_msg_t))
329 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
330 lwkt_panic_waitmsg, lwkt_panic_waitport,
331 rportfn);
334 void
335 lwkt_initport_putonly(lwkt_port_t port,
336 int (*pportfn)(lwkt_port_t, lwkt_msg_t))
338 _lwkt_initport(port, lwkt_panic_getport, pportfn,
339 lwkt_panic_waitmsg, lwkt_panic_waitport,
340 lwkt_panic_replyport);
343 void
344 lwkt_initport_panic(lwkt_port_t port)
346 _lwkt_initport(port,
347 lwkt_panic_getport, lwkt_panic_putport,
348 lwkt_panic_waitmsg, lwkt_panic_waitport,
349 lwkt_panic_replyport);
352 static __inline
353 void
354 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
357 * normal case, remove and return the message.
359 TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
360 msg->ms_flags &= ~MSGF_QUEUED;
363 /************************************************************************
364 * THREAD PORT BACKEND *
365 ************************************************************************
367 * This backend is used when the port a message is retrieved from is owned
368 * by a single thread (the calling thread). Messages are IPId to the
369 * correct cpu before being enqueued to a port. Note that this is fairly
370 * optimal since scheduling would have had to do an IPI anyway if the
371 * message were headed to a different cpu.
374 #ifdef SMP
377 * This function completes reply processing for the default case in the
378 * context of the originating cpu.
380 static
381 void
382 lwkt_thread_replyport_remote(lwkt_msg_t msg)
384 lwkt_port_t port = msg->ms_reply_port;
385 int flags;
388 * Chase any thread migration that occurs
390 if (port->mpu_td->td_gd != mycpu) {
391 lwkt_send_ipiq(port->mpu_td->td_gd,
392 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
393 return;
397 * Cleanup
399 #ifdef INVARIANTS
400 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
401 msg->ms_flags &= ~MSGF_INTRANSIT;
402 #endif
403 flags = msg->ms_flags;
404 if (msg->ms_flags & MSGF_SYNC) {
405 cpu_sfence();
406 msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
407 } else {
408 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
409 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
411 if (port->mp_flags & MSGPORTF_WAITING)
412 _lwkt_schedule_msg(port->mpu_td, flags);
415 #endif
418 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
420 * Called with the reply port as an argument but in the context of the
421 * original target port. Completion must occur on the target port's
422 * cpu.
424 * The critical section protects us from IPIs on the this CPU.
426 void
427 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
429 int flags;
431 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
433 if (msg->ms_flags & MSGF_SYNC) {
435 * If a synchronous completion has been requested, just wakeup
436 * the message without bothering to queue it to the target port.
438 * Assume the target thread is non-preemptive, so no critical
439 * section is required.
441 #ifdef SMP
442 if (port->mpu_td->td_gd == mycpu) {
443 #endif
444 flags = msg->ms_flags;
445 cpu_sfence();
446 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
447 if (port->mp_flags & MSGPORTF_WAITING)
448 _lwkt_schedule_msg(port->mpu_td, flags);
449 #ifdef SMP
450 } else {
451 #ifdef INVARIANTS
452 msg->ms_flags |= MSGF_INTRANSIT;
453 #endif
454 msg->ms_flags |= MSGF_REPLY;
455 lwkt_send_ipiq(port->mpu_td->td_gd,
456 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
458 #endif
459 } else {
461 * If an asynchronous completion has been requested the message
462 * must be queued to the reply port.
464 * A critical section is required to interlock the port queue.
466 #ifdef SMP
467 if (port->mpu_td->td_gd == mycpu) {
468 #endif
469 crit_enter();
470 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
471 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
472 if (port->mp_flags & MSGPORTF_WAITING)
473 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
474 crit_exit();
475 #ifdef SMP
476 } else {
477 #ifdef INVARIANTS
478 msg->ms_flags |= MSGF_INTRANSIT;
479 #endif
480 msg->ms_flags |= MSGF_REPLY;
481 lwkt_send_ipiq(port->mpu_td->td_gd,
482 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
484 #endif
489 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
491 * Called with the target port as an argument but in the context of the
492 * reply port. This function always implements an asynchronous put to
493 * the target message port, and thus returns EASYNC.
495 * The message must already have cleared MSGF_DONE and MSGF_REPLY
498 #ifdef SMP
500 static
501 void
502 lwkt_thread_putport_remote(lwkt_msg_t msg)
504 lwkt_port_t port = msg->ms_target_port;
507 * Chase any thread migration that occurs
509 if (port->mpu_td->td_gd != mycpu) {
510 lwkt_send_ipiq(port->mpu_td->td_gd,
511 (ipifunc1_t)lwkt_thread_putport_remote, msg);
512 return;
516 * Cleanup
518 #ifdef INVARIANTS
519 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
520 msg->ms_flags &= ~MSGF_INTRANSIT;
521 #endif
522 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
523 msg->ms_flags |= MSGF_QUEUED;
524 if (port->mp_flags & MSGPORTF_WAITING)
525 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
528 #endif
530 static
532 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
534 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
536 msg->ms_target_port = port;
537 #ifdef SMP
538 if (port->mpu_td->td_gd == mycpu) {
539 #endif
540 crit_enter();
541 msg->ms_flags |= MSGF_QUEUED;
542 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
543 if (port->mp_flags & MSGPORTF_WAITING)
544 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
545 crit_exit();
546 #ifdef SMP
547 } else {
548 #ifdef INVARIANTS
549 msg->ms_flags |= MSGF_INTRANSIT;
550 #endif
551 lwkt_send_ipiq(port->mpu_td->td_gd,
552 (ipifunc1_t)lwkt_thread_putport_remote, msg);
554 #endif
555 return (EASYNC);
559 * lwkt_thread_getport()
561 * Retrieve the next message from the port or NULL if no messages
562 * are ready.
564 void *
565 lwkt_thread_getport(lwkt_port_t port)
567 lwkt_msg_t msg;
569 KKASSERT(port->mpu_td == curthread);
571 crit_enter_quick(port->mpu_td);
572 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
573 _lwkt_pullmsg(port, msg);
574 crit_exit_quick(port->mpu_td);
575 return(msg);
579 * lwkt_thread_waitmsg()
581 * Wait for a particular message to be replied. We must be the only
582 * thread waiting on the message. The port must be owned by the
583 * caller.
586 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
588 if ((msg->ms_flags & MSGF_DONE) == 0) {
590 * If the done bit was not set we have to block until it is.
592 lwkt_port_t port = msg->ms_reply_port;
593 thread_t td = curthread;
594 int sentabort;
596 KKASSERT(port->mpu_td == td);
597 crit_enter_quick(td);
598 sentabort = 0;
600 while ((msg->ms_flags & MSGF_DONE) == 0) {
601 port->mp_flags |= MSGPORTF_WAITING;
602 if (sentabort == 0) {
603 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
604 lwkt_abortmsg(msg);
606 } else {
607 lwkt_sleep("waitabt", 0);
609 port->mp_flags &= ~MSGPORTF_WAITING;
611 if (msg->ms_flags & MSGF_QUEUED)
612 _lwkt_pullmsg(port, msg);
613 crit_exit_quick(td);
614 } else {
616 * If the done bit was set we only have to mess around with the
617 * message if it is queued on the reply port.
619 if (msg->ms_flags & MSGF_QUEUED) {
620 lwkt_port_t port = msg->ms_reply_port;
621 thread_t td = curthread;
623 KKASSERT(port->mpu_td == td);
624 crit_enter_quick(td);
625 _lwkt_pullmsg(port, msg);
626 crit_exit_quick(td);
629 return(msg->ms_error);
632 void *
633 lwkt_thread_waitport(lwkt_port_t port, int flags)
635 thread_t td = curthread;
636 lwkt_msg_t msg;
637 int error;
639 KKASSERT(port->mpu_td == td);
640 crit_enter_quick(td);
641 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
642 port->mp_flags |= MSGPORTF_WAITING;
643 error = lwkt_sleep("waitport", flags);
644 port->mp_flags &= ~MSGPORTF_WAITING;
645 if (error)
646 goto done;
648 _lwkt_pullmsg(port, msg);
649 done:
650 crit_exit_quick(td);
651 return(msg);
654 /************************************************************************
655 * SPIN PORT BACKEND *
656 ************************************************************************
658 * This backend uses spinlocks instead of making assumptions about which
659 * thread is accessing the port. It must be used when a port is not owned
660 * by a particular thread. This is less optimal then thread ports but
661 * you don't have a choice if there are multiple threads accessing the port.
663 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
664 * on the message port, it is the responsibility of the code doing the
665 * wakeup to clear this flag rather then the blocked threads. Some
666 * superfluous wakeups may occur, which is ok.
668 * XXX synchronous message wakeups are not current optimized.
671 static
672 void *
673 lwkt_spin_getport(lwkt_port_t port)
675 lwkt_msg_t msg;
677 spin_lock_wr(&port->mpu_spin);
678 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
679 _lwkt_pullmsg(port, msg);
680 spin_unlock_wr(&port->mpu_spin);
681 return(msg);
684 static
686 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
688 int dowakeup;
690 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
692 msg->ms_target_port = port;
693 spin_lock_wr(&port->mpu_spin);
694 msg->ms_flags |= MSGF_QUEUED;
695 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
696 dowakeup = 0;
697 if (port->mp_flags & MSGPORTF_WAITING) {
698 port->mp_flags &= ~MSGPORTF_WAITING;
699 dowakeup = 1;
701 spin_unlock_wr(&port->mpu_spin);
702 if (dowakeup)
703 wakeup(port);
704 return (EASYNC);
707 static
709 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
711 lwkt_port_t port;
712 int sentabort;
713 int error;
715 if ((msg->ms_flags & MSGF_DONE) == 0) {
716 port = msg->ms_reply_port;
717 sentabort = 0;
718 spin_lock_wr(&port->mpu_spin);
719 while ((msg->ms_flags & MSGF_DONE) == 0) {
720 void *won;
723 * If message was sent synchronously from the beginning
724 * the wakeup will be on the message structure, else it
725 * will be on the port structure.
727 if (msg->ms_flags & MSGF_SYNC) {
728 won = msg;
729 } else {
730 won = port;
731 port->mp_flags |= MSGPORTF_WAITING;
735 * Only messages which support abort can be interrupted.
736 * We must still wait for message completion regardless.
738 if ((flags & PCATCH) && sentabort == 0) {
739 error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
740 if (error) {
741 sentabort = error;
742 spin_unlock_wr(&port->mpu_spin);
743 lwkt_abortmsg(msg);
744 spin_lock_wr(&port->mpu_spin);
746 } else {
747 error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0);
749 /* see note at the top on the MSGPORTF_WAITING flag */
752 * Turn EINTR into ERESTART if the signal indicates.
754 if (sentabort && msg->ms_error == EINTR)
755 msg->ms_error = sentabort;
756 if (msg->ms_flags & MSGF_QUEUED)
757 _lwkt_pullmsg(port, msg);
758 spin_unlock_wr(&port->mpu_spin);
759 } else {
760 if (msg->ms_flags & MSGF_QUEUED) {
761 port = msg->ms_reply_port;
762 spin_lock_wr(&port->mpu_spin);
763 _lwkt_pullmsg(port, msg);
764 spin_unlock_wr(&port->mpu_spin);
767 return(msg->ms_error);
770 static
771 void *
772 lwkt_spin_waitport(lwkt_port_t port, int flags)
774 lwkt_msg_t msg;
775 int error;
777 spin_lock_wr(&port->mpu_spin);
778 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
779 port->mp_flags |= MSGPORTF_WAITING;
780 error = msleep(port, &port->mpu_spin, flags, "waitport", 0);
781 /* see note at the top on the MSGPORTF_WAITING flag */
782 if (error) {
783 spin_unlock_wr(&port->mpu_spin);
784 return(NULL);
787 _lwkt_pullmsg(port, msg);
788 spin_unlock_wr(&port->mpu_spin);
789 return(msg);
792 static
793 void
794 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
796 int dowakeup;
798 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
800 if (msg->ms_flags & MSGF_SYNC) {
802 * If a synchronous completion has been requested, just wakeup
803 * the message without bothering to queue it to the target port.
805 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
806 wakeup(msg);
807 } else {
809 * If an asynchronous completion has been requested the message
810 * must be queued to the reply port.
812 spin_lock_wr(&port->mpu_spin);
813 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
814 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
815 dowakeup = 0;
816 if (port->mp_flags & MSGPORTF_WAITING) {
817 port->mp_flags &= ~MSGPORTF_WAITING;
818 dowakeup = 1;
820 spin_unlock_wr(&port->mpu_spin);
821 if (dowakeup)
822 wakeup(port);
826 /************************************************************************
827 * SERIALIZER PORT BACKEND *
828 ************************************************************************
830 * This backend uses serializer to protect port accessing. Callers are
831 * assumed to have serializer held. This kind of port is usually created
832 * by network device driver along with _one_ lwkt thread to pipeline
833 * operations which may temporarily release serializer.
835 * Implementation is based on SPIN PORT BACKEND.
838 static
839 void *
840 lwkt_serialize_getport(lwkt_port_t port)
842 lwkt_msg_t msg;
844 ASSERT_SERIALIZED(port->mpu_serialize);
846 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
847 _lwkt_pullmsg(port, msg);
848 return(msg);
851 static
853 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
855 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
856 ASSERT_SERIALIZED(port->mpu_serialize);
858 msg->ms_target_port = port;
859 msg->ms_flags |= MSGF_QUEUED;
860 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
861 if (port->mp_flags & MSGPORTF_WAITING) {
862 port->mp_flags &= ~MSGPORTF_WAITING;
863 wakeup(port);
865 return (EASYNC);
868 static
870 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
872 lwkt_port_t port;
873 int sentabort;
874 int error;
876 if ((msg->ms_flags & MSGF_DONE) == 0) {
877 port = msg->ms_reply_port;
879 ASSERT_SERIALIZED(port->mpu_serialize);
881 sentabort = 0;
882 while ((msg->ms_flags & MSGF_DONE) == 0) {
883 void *won;
886 * If message was sent synchronously from the beginning
887 * the wakeup will be on the message structure, else it
888 * will be on the port structure.
890 if (msg->ms_flags & MSGF_SYNC) {
891 won = msg;
892 } else {
893 won = port;
894 port->mp_flags |= MSGPORTF_WAITING;
898 * Only messages which support abort can be interrupted.
899 * We must still wait for message completion regardless.
901 if ((flags & PCATCH) && sentabort == 0) {
902 error = serialize_sleep(won, port->mpu_serialize, PCATCH,
903 "waitmsg", 0);
904 if (error) {
905 sentabort = error;
906 lwkt_serialize_exit(port->mpu_serialize);
907 lwkt_abortmsg(msg);
908 lwkt_serialize_enter(port->mpu_serialize);
910 } else {
911 error = serialize_sleep(won, port->mpu_serialize, 0,
912 "waitmsg", 0);
914 /* see note at the top on the MSGPORTF_WAITING flag */
917 * Turn EINTR into ERESTART if the signal indicates.
919 if (sentabort && msg->ms_error == EINTR)
920 msg->ms_error = sentabort;
921 if (msg->ms_flags & MSGF_QUEUED)
922 _lwkt_pullmsg(port, msg);
923 } else {
924 if (msg->ms_flags & MSGF_QUEUED) {
925 port = msg->ms_reply_port;
927 ASSERT_SERIALIZED(port->mpu_serialize);
928 _lwkt_pullmsg(port, msg);
931 return(msg->ms_error);
934 static
935 void *
936 lwkt_serialize_waitport(lwkt_port_t port, int flags)
938 lwkt_msg_t msg;
939 int error;
941 ASSERT_SERIALIZED(port->mpu_serialize);
943 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
944 port->mp_flags |= MSGPORTF_WAITING;
945 error = serialize_sleep(port, port->mpu_serialize, flags,
946 "waitport", 0);
947 /* see note at the top on the MSGPORTF_WAITING flag */
948 if (error)
949 return(NULL);
951 _lwkt_pullmsg(port, msg);
952 return(msg);
955 static
956 void
957 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
959 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
960 ASSERT_SERIALIZED(port->mpu_serialize);
962 if (msg->ms_flags & MSGF_SYNC) {
964 * If a synchronous completion has been requested, just wakeup
965 * the message without bothering to queue it to the target port.
967 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
968 wakeup(msg);
969 } else {
971 * If an asynchronous completion has been requested the message
972 * must be queued to the reply port.
974 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
975 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
976 if (port->mp_flags & MSGPORTF_WAITING) {
977 port->mp_flags &= ~MSGPORTF_WAITING;
978 wakeup(port);
983 /************************************************************************
984 * PANIC AND SPECIAL PORT FUNCTIONS *
985 ************************************************************************/
988 * You can point a port's reply vector at this function if you just want
989 * the message marked done, without any queueing or signaling. This is
990 * often used for structure-embedded messages.
992 static
993 void
994 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
996 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
999 static
1000 void *
1001 lwkt_panic_getport(lwkt_port_t port)
1003 panic("lwkt_getport() illegal on port %p", port);
1006 static
1008 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1010 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1013 static
1015 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1017 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1020 static
1021 void *
1022 lwkt_panic_waitport(lwkt_port_t port, int flags)
1024 panic("port %p cannot be waited on", port);
1027 static
1028 void
1029 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1031 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);