HAMMER 60I/Many: Mirroring
[dragonfly.git] / sys / kern / lwkt_msgport.c
blob224339cd6faa62193e1c1606a3dc140f8fd568eb
1 /*
2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
34 * NOTE! This file may be compiled for userland libraries as well as for
35 * the kernel.
37 * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.46 2008/05/18 20:57:56 nth Exp $
40 #include <sys/param.h>
41 #include <sys/systm.h>
42 #include <sys/kernel.h>
43 #include <sys/proc.h>
44 #include <sys/rtprio.h>
45 #include <sys/queue.h>
46 #include <sys/sysctl.h>
47 #include <sys/kthread.h>
48 #include <sys/signalvar.h>
49 #include <sys/signal2.h>
50 #include <machine/cpu.h>
51 #include <sys/lock.h>
53 #include <vm/vm.h>
54 #include <vm/vm_param.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_object.h>
57 #include <vm/vm_page.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_pager.h>
60 #include <vm/vm_extern.h>
61 #include <vm/vm_zone.h>
63 #include <sys/thread2.h>
64 #include <sys/msgport2.h>
65 #include <sys/spinlock2.h>
66 #include <sys/serialize.h>
68 #include <machine/stdarg.h>
69 #include <machine/cpufunc.h>
70 #ifdef SMP
71 #include <machine/smp.h>
72 #endif
74 #include <sys/malloc.h>
75 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
77 /************************************************************************
78 * MESSAGE FUNCTIONS *
79 ************************************************************************/
82 * lwkt_sendmsg()
84 * Request asynchronous completion and call lwkt_beginmsg(). The
85 * target port can opt to execute the message synchronously or
86 * asynchronously and this function will automatically queue the
87 * response if the target executes the message synchronously.
89 * NOTE: The message is in an indeterminant state until this call
90 * returns. The caller should not mess with it (e.g. try to abort it)
91 * until then.
93 void
94 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
96 int error;
98 KKASSERT(msg->ms_reply_port != NULL &&
99 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
100 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
101 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
102 lwkt_replymsg(msg, error);
107 * lwkt_domsg()
109 * Request asynchronous completion and call lwkt_beginmsg(). The
110 * target port can opt to execute the message synchronously or
111 * asynchronously and this function will automatically queue the
112 * response if the target executes the message synchronously.
115 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
117 int error;
119 KKASSERT(msg->ms_reply_port != NULL &&
120 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
121 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
122 msg->ms_flags |= MSGF_SYNC;
123 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
124 error = lwkt_waitmsg(msg, flags);
125 } else {
126 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
128 return(error);
132 * lwkt_forwardmsg()
134 * Forward a message received on one port to another port.
137 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
139 int error;
141 crit_enter();
142 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
143 if ((error = port->mp_putport(port, msg)) != EASYNC)
144 lwkt_replymsg(msg, error);
145 crit_exit();
146 return(error);
150 * lwkt_abortmsg()
152 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
153 * The caller must ensure that the message will not be both replied AND
154 * destroyed while the abort is in progress.
156 * This function issues a callback which might block!
158 void
159 lwkt_abortmsg(lwkt_msg_t msg)
162 * A critical section protects us from reply IPIs on this cpu.
164 crit_enter();
167 * Shortcut the operation if the message has already been returned.
168 * The callback typically constructs a lwkt_msg with the abort request,
169 * issues it synchronously, and waits for completion. The callback
170 * is not required to actually abort the message and the target port,
171 * upon receiving an abort request message generated by the callback
172 * should check whether the original message has already completed or
173 * not.
175 if (msg->ms_flags & MSGF_ABORTABLE) {
176 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
177 msg->ms_abortfn(msg);
179 crit_exit();
182 /************************************************************************
183 * PORT INITIALIZATION API *
184 ************************************************************************/
186 static void *lwkt_thread_getport(lwkt_port_t port);
187 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
188 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
189 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
190 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
192 static void *lwkt_spin_getport(lwkt_port_t port);
193 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
194 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
195 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
196 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
198 static void *lwkt_serialize_getport(lwkt_port_t port);
199 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
200 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
201 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
202 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
204 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
205 static void *lwkt_panic_getport(lwkt_port_t port);
206 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
207 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
208 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
209 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
212 * Core port initialization (internal)
214 static __inline
215 void
216 _lwkt_initport(lwkt_port_t port,
217 void *(*gportfn)(lwkt_port_t),
218 int (*pportfn)(lwkt_port_t, lwkt_msg_t),
219 int (*wmsgfn)(lwkt_msg_t, int),
220 void *(*wportfn)(lwkt_port_t, int),
221 void (*rportfn)(lwkt_port_t, lwkt_msg_t))
223 bzero(port, sizeof(*port));
224 TAILQ_INIT(&port->mp_msgq);
225 port->mp_getport = gportfn;
226 port->mp_putport = pportfn;
227 port->mp_waitmsg = wmsgfn;
228 port->mp_waitport = wportfn;
229 port->mp_replyport = rportfn;
233 * lwkt_initport_thread()
235 * Initialize a port for use by a particular thread. The port may
236 * only be used by <td>.
238 void
239 lwkt_initport_thread(lwkt_port_t port, thread_t td)
241 _lwkt_initport(port,
242 lwkt_thread_getport,
243 lwkt_thread_putport,
244 lwkt_thread_waitmsg,
245 lwkt_thread_waitport,
246 lwkt_thread_replyport);
247 port->mpu_td = td;
251 * lwkt_initport_spin()
253 * Initialize a port for use with descriptors that might be accessed
254 * via multiple LWPs, processes, or threads. Has somewhat more
255 * overhead then thread ports.
257 void
258 lwkt_initport_spin(lwkt_port_t port)
260 _lwkt_initport(port,
261 lwkt_spin_getport,
262 lwkt_spin_putport,
263 lwkt_spin_waitmsg,
264 lwkt_spin_waitport,
265 lwkt_spin_replyport);
266 spin_init(&port->mpu_spin);
269 void
270 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
272 _lwkt_initport(port,
273 lwkt_serialize_getport,
274 lwkt_serialize_putport,
275 lwkt_serialize_waitmsg,
276 lwkt_serialize_waitport,
277 lwkt_serialize_replyport);
278 port->mpu_serialize = slz;
282 * Similar to the standard initport, this function simply marks the message
283 * as being done and does not attempt to return it to an originating port.
285 void
286 lwkt_initport_replyonly_null(lwkt_port_t port)
288 _lwkt_initport(port,
289 lwkt_panic_getport,
290 lwkt_panic_putport,
291 lwkt_panic_waitmsg,
292 lwkt_panic_waitport,
293 lwkt_null_replyport);
297 * Initialize a reply-only port, typically used as a message sink. Such
298 * ports can only be used as a reply port.
300 void
301 lwkt_initport_replyonly(lwkt_port_t port,
302 void (*rportfn)(lwkt_port_t, lwkt_msg_t))
304 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
305 lwkt_panic_waitmsg, lwkt_panic_waitport,
306 rportfn);
309 void
310 lwkt_initport_putonly(lwkt_port_t port,
311 int (*pportfn)(lwkt_port_t, lwkt_msg_t))
313 _lwkt_initport(port, lwkt_panic_getport, pportfn,
314 lwkt_panic_waitmsg, lwkt_panic_waitport,
315 lwkt_panic_replyport);
318 void
319 lwkt_initport_panic(lwkt_port_t port)
321 _lwkt_initport(port,
322 lwkt_panic_getport, lwkt_panic_putport,
323 lwkt_panic_waitmsg, lwkt_panic_waitport,
324 lwkt_panic_replyport);
328 * lwkt_getport()
330 * Retrieve the next message from the port's message queue, return NULL
331 * if no messages are pending. The retrieved message will either be a
332 * request or a reply based on the MSGF_REPLY bit.
334 * The calling thread MUST own the port.
337 static __inline
338 void
339 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
342 * normal case, remove and return the message.
344 TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
345 msg->ms_flags &= ~MSGF_QUEUED;
348 /************************************************************************
349 * THREAD PORT BACKEND *
350 ************************************************************************
352 * This backend is used when the port a message is retrieved from is owned
353 * by a single thread (the calling thread). Messages are IPId to the
354 * correct cpu before being enqueued to a port. Note that this is fairly
355 * optimal since scheduling would have had to do an IPI anyway if the
356 * message were headed to a different cpu.
359 #ifdef SMP
362 * This function completes reply processing for the default case in the
363 * context of the originating cpu.
365 static
366 void
367 lwkt_thread_replyport_remote(lwkt_msg_t msg)
369 lwkt_port_t port = msg->ms_reply_port;
372 * Chase any thread migration that occurs
374 if (port->mpu_td->td_gd != mycpu) {
375 lwkt_send_ipiq(port->mpu_td->td_gd,
376 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
377 return;
381 * Cleanup
383 #ifdef INVARIANTS
384 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
385 msg->ms_flags &= ~MSGF_INTRANSIT;
386 #endif
387 if (msg->ms_flags & MSGF_SYNC) {
388 msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
389 } else {
390 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
391 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
393 if (port->mp_flags & MSGPORTF_WAITING)
394 lwkt_schedule(port->mpu_td);
397 #endif
400 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
402 * Called with the reply port as an argument but in the context of the
403 * original target port. Completion must occur on the target port's
404 * cpu.
406 * The critical section protects us from IPIs on the this CPU.
408 void
409 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
411 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
413 if (msg->ms_flags & MSGF_SYNC) {
415 * If a synchronous completion has been requested, just wakeup
416 * the message without bothering to queue it to the target port.
418 * Assume the target thread is non-preemptive, so no critical
419 * section is required.
421 #ifdef SMP
422 if (port->mpu_td->td_gd == mycpu) {
423 #endif
424 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
425 if (port->mp_flags & MSGPORTF_WAITING)
426 lwkt_schedule(port->mpu_td);
427 #ifdef SMP
428 } else {
429 #ifdef INVARIANTS
430 msg->ms_flags |= MSGF_INTRANSIT;
431 #endif
432 msg->ms_flags |= MSGF_REPLY;
433 lwkt_send_ipiq(port->mpu_td->td_gd,
434 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
436 #endif
437 } else {
439 * If an asynchronous completion has been requested the message
440 * must be queued to the reply port.
442 * A critical section is required to interlock the port queue.
444 #ifdef SMP
445 if (port->mpu_td->td_gd == mycpu) {
446 #endif
447 crit_enter();
448 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
449 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
450 if (port->mp_flags & MSGPORTF_WAITING)
451 lwkt_schedule(port->mpu_td);
452 crit_exit();
453 #ifdef SMP
454 } else {
455 #ifdef INVARIANTS
456 msg->ms_flags |= MSGF_INTRANSIT;
457 #endif
458 msg->ms_flags |= MSGF_REPLY;
459 lwkt_send_ipiq(port->mpu_td->td_gd,
460 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
462 #endif
467 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
469 * Called with the target port as an argument but in the context of the
470 * reply port. This function always implements an asynchronous put to
471 * the target message port, and thus returns EASYNC.
473 * The message must already have cleared MSGF_DONE and MSGF_REPLY
476 #ifdef SMP
478 static
479 void
480 lwkt_thread_putport_remote(lwkt_msg_t msg)
482 lwkt_port_t port = msg->ms_target_port;
485 * Chase any thread migration that occurs
487 if (port->mpu_td->td_gd != mycpu) {
488 lwkt_send_ipiq(port->mpu_td->td_gd,
489 (ipifunc1_t)lwkt_thread_putport_remote, msg);
490 return;
494 * Cleanup
496 #ifdef INVARIANTS
497 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
498 msg->ms_flags &= ~MSGF_INTRANSIT;
499 #endif
500 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
501 msg->ms_flags |= MSGF_QUEUED;
502 if (port->mp_flags & MSGPORTF_WAITING)
503 lwkt_schedule(port->mpu_td);
506 #endif
508 static
510 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
512 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
514 msg->ms_target_port = port;
515 #ifdef SMP
516 if (port->mpu_td->td_gd == mycpu) {
517 #endif
518 crit_enter();
519 msg->ms_flags |= MSGF_QUEUED;
520 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
521 if (port->mp_flags & MSGPORTF_WAITING)
522 lwkt_schedule(port->mpu_td);
523 crit_exit();
524 #ifdef SMP
525 } else {
526 #ifdef INVARIANTS
527 msg->ms_flags |= MSGF_INTRANSIT;
528 #endif
529 lwkt_send_ipiq(port->mpu_td->td_gd,
530 (ipifunc1_t)lwkt_thread_putport_remote, msg);
532 #endif
533 return (EASYNC);
537 * lwkt_thread_getport()
539 * Retrieve the next message from the port or NULL if no messages
540 * are ready.
542 void *
543 lwkt_thread_getport(lwkt_port_t port)
545 lwkt_msg_t msg;
547 KKASSERT(port->mpu_td == curthread);
549 crit_enter_quick(port->mpu_td);
550 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
551 _lwkt_pullmsg(port, msg);
552 crit_exit_quick(port->mpu_td);
553 return(msg);
557 * lwkt_thread_waitmsg()
559 * Wait for a particular message to be replied. We must be the only
560 * thread waiting on the message. The port must be owned by the
561 * caller.
564 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
566 if ((msg->ms_flags & MSGF_DONE) == 0) {
568 * If the done bit was not set we have to block until it is.
570 lwkt_port_t port = msg->ms_reply_port;
571 thread_t td = curthread;
572 int sentabort;
574 KKASSERT(port->mpu_td == td);
575 crit_enter_quick(td);
576 sentabort = 0;
578 while ((msg->ms_flags & MSGF_DONE) == 0) {
579 port->mp_flags |= MSGPORTF_WAITING;
580 if (sentabort == 0) {
581 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
582 lwkt_abortmsg(msg);
584 } else {
585 lwkt_sleep("waitabt", 0);
587 port->mp_flags &= ~MSGPORTF_WAITING;
589 if (msg->ms_flags & MSGF_QUEUED)
590 _lwkt_pullmsg(port, msg);
591 crit_exit_quick(td);
592 } else {
594 * If the done bit was set we only have to mess around with the
595 * message if it is queued on the reply port.
597 if (msg->ms_flags & MSGF_QUEUED) {
598 lwkt_port_t port = msg->ms_reply_port;
599 thread_t td = curthread;
601 KKASSERT(port->mpu_td == td);
602 crit_enter_quick(td);
603 _lwkt_pullmsg(port, msg);
604 crit_exit_quick(td);
607 return(msg->ms_error);
610 void *
611 lwkt_thread_waitport(lwkt_port_t port, int flags)
613 thread_t td = curthread;
614 lwkt_msg_t msg;
615 int error;
617 KKASSERT(port->mpu_td == td);
618 crit_enter_quick(td);
619 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
620 port->mp_flags |= MSGPORTF_WAITING;
621 error = lwkt_sleep("waitport", flags);
622 port->mp_flags &= ~MSGPORTF_WAITING;
623 if (error)
624 goto done;
626 _lwkt_pullmsg(port, msg);
627 done:
628 crit_exit_quick(td);
629 return(msg);
632 /************************************************************************
633 * SPIN PORT BACKEND *
634 ************************************************************************
636 * This backend uses spinlocks instead of making assumptions about which
637 * thread is accessing the port. It must be used when a port is not owned
638 * by a particular thread. This is less optimal then thread ports but
639 * you don't have a choice if there are multiple threads accessing the port.
641 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
642 * on the message port, it is the responsibility of the code doing the
643 * wakeup to clear this flag rather then the blocked threads. Some
644 * superfluous wakeups may occur, which is ok.
646 * XXX synchronous message wakeups are not current optimized.
649 static
650 void *
651 lwkt_spin_getport(lwkt_port_t port)
653 lwkt_msg_t msg;
655 spin_lock_wr(&port->mpu_spin);
656 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
657 _lwkt_pullmsg(port, msg);
658 spin_unlock_wr(&port->mpu_spin);
659 return(msg);
662 static
664 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
666 int dowakeup;
668 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
670 msg->ms_target_port = port;
671 spin_lock_wr(&port->mpu_spin);
672 msg->ms_flags |= MSGF_QUEUED;
673 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
674 dowakeup = 0;
675 if (port->mp_flags & MSGPORTF_WAITING) {
676 port->mp_flags &= ~MSGPORTF_WAITING;
677 dowakeup = 1;
679 spin_unlock_wr(&port->mpu_spin);
680 if (dowakeup)
681 wakeup(port);
682 return (EASYNC);
685 static
687 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
689 lwkt_port_t port;
690 int sentabort;
691 int error;
693 if ((msg->ms_flags & MSGF_DONE) == 0) {
694 port = msg->ms_reply_port;
695 sentabort = 0;
696 spin_lock_wr(&port->mpu_spin);
697 while ((msg->ms_flags & MSGF_DONE) == 0) {
698 void *won;
701 * If message was sent synchronously from the beginning
702 * the wakeup will be on the message structure, else it
703 * will be on the port structure.
705 if (msg->ms_flags & MSGF_SYNC) {
706 won = msg;
707 } else {
708 won = port;
709 port->mp_flags |= MSGPORTF_WAITING;
713 * Only messages which support abort can be interrupted.
714 * We must still wait for message completion regardless.
716 if ((flags & PCATCH) && sentabort == 0) {
717 error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
718 if (error) {
719 sentabort = error;
720 spin_unlock_wr(&port->mpu_spin);
721 lwkt_abortmsg(msg);
722 spin_lock_wr(&port->mpu_spin);
724 } else {
725 error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0);
727 /* see note at the top on the MSGPORTF_WAITING flag */
730 * Turn EINTR into ERESTART if the signal indicates.
732 if (sentabort && msg->ms_error == EINTR)
733 msg->ms_error = sentabort;
734 if (msg->ms_flags & MSGF_QUEUED)
735 _lwkt_pullmsg(port, msg);
736 spin_unlock_wr(&port->mpu_spin);
737 } else {
738 if (msg->ms_flags & MSGF_QUEUED) {
739 port = msg->ms_reply_port;
740 spin_lock_wr(&port->mpu_spin);
741 _lwkt_pullmsg(port, msg);
742 spin_unlock_wr(&port->mpu_spin);
745 return(msg->ms_error);
748 static
749 void *
750 lwkt_spin_waitport(lwkt_port_t port, int flags)
752 lwkt_msg_t msg;
753 int error;
755 spin_lock_wr(&port->mpu_spin);
756 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
757 port->mp_flags |= MSGPORTF_WAITING;
758 error = msleep(port, &port->mpu_spin, flags, "waitport", 0);
759 /* see note at the top on the MSGPORTF_WAITING flag */
760 if (error) {
761 spin_unlock_wr(&port->mpu_spin);
762 return(NULL);
765 _lwkt_pullmsg(port, msg);
766 spin_unlock_wr(&port->mpu_spin);
767 return(msg);
770 static
771 void
772 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
774 int dowakeup;
776 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
778 if (msg->ms_flags & MSGF_SYNC) {
780 * If a synchronous completion has been requested, just wakeup
781 * the message without bothering to queue it to the target port.
783 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
784 wakeup(msg);
785 } else {
787 * If an asynchronous completion has been requested the message
788 * must be queued to the reply port.
790 spin_lock_wr(&port->mpu_spin);
791 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
792 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
793 dowakeup = 0;
794 if (port->mp_flags & MSGPORTF_WAITING) {
795 port->mp_flags &= ~MSGPORTF_WAITING;
796 dowakeup = 1;
798 spin_unlock_wr(&port->mpu_spin);
799 if (dowakeup)
800 wakeup(port);
804 /************************************************************************
805 * SERIALIZER PORT BACKEND *
806 ************************************************************************
808 * This backend uses serializer to protect port accessing. Callers are
809 * assumed to have serializer held. This kind of port is usually created
810 * by network device driver along with _one_ lwkt thread to pipeline
811 * operations which may temporarily release serializer.
813 * Implementation is based on SPIN PORT BACKEND.
816 static
817 void *
818 lwkt_serialize_getport(lwkt_port_t port)
820 lwkt_msg_t msg;
822 ASSERT_SERIALIZED(port->mpu_serialize);
824 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
825 _lwkt_pullmsg(port, msg);
826 return(msg);
829 static
831 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
833 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
834 ASSERT_SERIALIZED(port->mpu_serialize);
836 msg->ms_target_port = port;
837 msg->ms_flags |= MSGF_QUEUED;
838 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
839 if (port->mp_flags & MSGPORTF_WAITING) {
840 port->mp_flags &= ~MSGPORTF_WAITING;
841 wakeup(port);
843 return (EASYNC);
846 static
848 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
850 lwkt_port_t port;
851 int sentabort;
852 int error;
854 if ((msg->ms_flags & MSGF_DONE) == 0) {
855 port = msg->ms_reply_port;
857 ASSERT_SERIALIZED(port->mpu_serialize);
859 sentabort = 0;
860 while ((msg->ms_flags & MSGF_DONE) == 0) {
861 void *won;
864 * If message was sent synchronously from the beginning
865 * the wakeup will be on the message structure, else it
866 * will be on the port structure.
868 if (msg->ms_flags & MSGF_SYNC) {
869 won = msg;
870 } else {
871 won = port;
872 port->mp_flags |= MSGPORTF_WAITING;
876 * Only messages which support abort can be interrupted.
877 * We must still wait for message completion regardless.
879 if ((flags & PCATCH) && sentabort == 0) {
880 error = serialize_sleep(won, port->mpu_serialize, PCATCH,
881 "waitmsg", 0);
882 if (error) {
883 sentabort = error;
884 lwkt_serialize_exit(port->mpu_serialize);
885 lwkt_abortmsg(msg);
886 lwkt_serialize_enter(port->mpu_serialize);
888 } else {
889 error = serialize_sleep(won, port->mpu_serialize, 0,
890 "waitmsg", 0);
892 /* see note at the top on the MSGPORTF_WAITING flag */
895 * Turn EINTR into ERESTART if the signal indicates.
897 if (sentabort && msg->ms_error == EINTR)
898 msg->ms_error = sentabort;
899 if (msg->ms_flags & MSGF_QUEUED)
900 _lwkt_pullmsg(port, msg);
901 } else {
902 if (msg->ms_flags & MSGF_QUEUED) {
903 port = msg->ms_reply_port;
905 ASSERT_SERIALIZED(port->mpu_serialize);
906 _lwkt_pullmsg(port, msg);
909 return(msg->ms_error);
912 static
913 void *
914 lwkt_serialize_waitport(lwkt_port_t port, int flags)
916 lwkt_msg_t msg;
917 int error;
919 ASSERT_SERIALIZED(port->mpu_serialize);
921 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
922 port->mp_flags |= MSGPORTF_WAITING;
923 error = serialize_sleep(port, port->mpu_serialize, flags,
924 "waitport", 0);
925 /* see note at the top on the MSGPORTF_WAITING flag */
926 if (error)
927 return(NULL);
929 _lwkt_pullmsg(port, msg);
930 return(msg);
933 static
934 void
935 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
937 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
938 ASSERT_SERIALIZED(port->mpu_serialize);
940 if (msg->ms_flags & MSGF_SYNC) {
942 * If a synchronous completion has been requested, just wakeup
943 * the message without bothering to queue it to the target port.
945 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
946 wakeup(msg);
947 } else {
949 * If an asynchronous completion has been requested the message
950 * must be queued to the reply port.
952 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
953 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
954 if (port->mp_flags & MSGPORTF_WAITING) {
955 port->mp_flags &= ~MSGPORTF_WAITING;
956 wakeup(port);
961 /************************************************************************
962 * PANIC AND SPECIAL PORT FUNCTIONS *
963 ************************************************************************/
966 * You can point a port's reply vector at this function if you just want
967 * the message marked done, without any queueing or signaling. This is
968 * often used for structure-embedded messages.
970 static
971 void
972 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
974 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
977 static
978 void *
979 lwkt_panic_getport(lwkt_port_t port)
981 panic("lwkt_getport() illegal on port %p", port);
984 static
986 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
988 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
991 static
993 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
995 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
998 static
999 void *
1000 lwkt_panic_waitport(lwkt_port_t port, int flags)
1002 panic("port %p cannot be waited on", port);
1005 static
1006 void
1007 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1009 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);