kernel - Fix excessive call stack depth on stuck interrupt
[dragonfly.git] / sys / kern / lwkt_msgport.c
blob48943f3309b344b152a84e0d98a0989ff18e27eb
1 /*
2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
34 * NOTE! This file may be compiled for userland libraries as well as for
35 * the kernel.
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #include <machine/smp.h>
70 #include <sys/malloc.h>
71 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
73 /************************************************************************
74 * MESSAGE FUNCTIONS *
75 ************************************************************************/
77 static __inline int
78 lwkt_beginmsg(lwkt_port_t port, lwkt_msg_t msg)
80 return port->mp_putport(port, msg);
83 static __inline int
84 lwkt_beginmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
86 return port->mp_putport_oncpu(port, msg);
89 static __inline void
90 _lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
92 KKASSERT(msg->ms_reply_port != NULL &&
93 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
94 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
97 static __inline void
98 _lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
100 int error;
102 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
104 * Target port opted to execute the message synchronously so
105 * queue the response.
107 lwkt_replymsg(msg, error);
111 static __inline void
112 _lwkt_sendmsg_start_oncpu(lwkt_port_t port, lwkt_msg_t msg)
114 int error;
116 if ((error = lwkt_beginmsg_oncpu(port, msg)) != EASYNC) {
118 * Target port opted to execute the message synchronously so
119 * queue the response.
121 lwkt_replymsg(msg, error);
126 * lwkt_sendmsg()
128 * Request asynchronous completion and call lwkt_beginmsg(). The
129 * target port can opt to execute the message synchronously or
130 * asynchronously and this function will automatically queue the
131 * response if the target executes the message synchronously.
133 * NOTE: The message is in an indeterminant state until this call
134 * returns. The caller should not mess with it (e.g. try to abort it)
135 * until then.
137 * NOTE: Do not use this function to forward a message as we might
138 * clobber ms_flags in a SMP race.
140 void
141 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
143 _lwkt_sendmsg_prepare(port, msg);
144 _lwkt_sendmsg_start(port, msg);
147 void
148 lwkt_sendmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
150 _lwkt_sendmsg_prepare(port, msg);
151 _lwkt_sendmsg_start_oncpu(port, msg);
154 void
155 lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
157 _lwkt_sendmsg_prepare(port, msg);
160 void
161 lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
163 _lwkt_sendmsg_start(port, msg);
167 * lwkt_domsg()
169 * Request synchronous completion and call lwkt_beginmsg(). The
170 * target port can opt to execute the message synchronously or
171 * asynchronously and this function will automatically block and
172 * wait for a response if the target executes the message
173 * asynchronously.
175 * NOTE: Do not use this function to forward a message as we might
176 * clobber ms_flags in a SMP race.
179 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
181 int error;
183 KKASSERT(msg->ms_reply_port != NULL &&
184 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
185 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
186 msg->ms_flags |= MSGF_SYNC;
187 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
189 * Target port opted to execute the message asynchronously so
190 * block and wait for a reply.
192 error = lwkt_waitmsg(msg, flags);
193 } else {
194 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
196 return(error);
200 * lwkt_forwardmsg()
202 * Forward a message received on one port to another port.
205 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
207 int error;
209 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
210 if ((error = port->mp_putport(port, msg)) != EASYNC)
211 lwkt_replymsg(msg, error);
212 return(error);
216 * lwkt_abortmsg()
218 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
219 * The caller must ensure that the message will not be both replied AND
220 * destroyed while the abort is in progress.
222 * This function issues a callback which might block!
224 void
225 lwkt_abortmsg(lwkt_msg_t msg)
228 * A critical section protects us from reply IPIs on this cpu.
230 crit_enter();
233 * Shortcut the operation if the message has already been returned.
234 * The callback typically constructs a lwkt_msg with the abort request,
235 * issues it synchronously, and waits for completion. The callback
236 * is not required to actually abort the message and the target port,
237 * upon receiving an abort request message generated by the callback
238 * should check whether the original message has already completed or
239 * not.
241 if (msg->ms_flags & MSGF_ABORTABLE) {
242 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
243 msg->ms_abortfn(msg);
245 crit_exit();
248 /************************************************************************
249 * PORT INITIALIZATION API *
250 ************************************************************************/
252 static void *lwkt_thread_getport(lwkt_port_t port);
253 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
254 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
255 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
256 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
257 static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
259 static void *lwkt_spin_getport(lwkt_port_t port);
260 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
261 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
262 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
263 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
264 static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
265 static int lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);
267 static void *lwkt_serialize_getport(lwkt_port_t port);
268 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
269 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
270 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
271 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
273 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
274 static void *lwkt_panic_getport(lwkt_port_t port);
275 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
276 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
277 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
278 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
279 static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
280 static int lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);
283 * Core port initialization (internal)
285 static __inline
286 void
287 _lwkt_initport(lwkt_port_t port,
288 void *(*gportfn)(lwkt_port_t),
289 int (*pportfn)(lwkt_port_t, lwkt_msg_t),
290 int (*wmsgfn)(lwkt_msg_t, int),
291 void *(*wportfn)(lwkt_port_t, int),
292 void (*rportfn)(lwkt_port_t, lwkt_msg_t),
293 int (*dmsgfn)(lwkt_port_t, lwkt_msg_t),
294 int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t))
296 bzero(port, sizeof(*port));
297 port->mp_cpuid = -1;
298 TAILQ_INIT(&port->mp_msgq);
299 TAILQ_INIT(&port->mp_msgq_prio);
300 port->mp_getport = gportfn;
301 port->mp_putport = pportfn;
302 port->mp_waitmsg = wmsgfn;
303 port->mp_waitport = wportfn;
304 port->mp_replyport = rportfn;
305 port->mp_dropmsg = dmsgfn;
306 port->mp_putport_oncpu = pportfn_oncpu;
310 * Schedule the target thread. If the message flags contains MSGF_NORESCHED
311 * we tell the scheduler not to reschedule if td is at a higher priority.
313 * This routine is called even if the thread is already scheduled.
315 static __inline
316 void
317 _lwkt_schedule_msg(thread_t td, int flags)
319 lwkt_schedule(td);
323 * lwkt_initport_thread()
325 * Initialize a port for use by a particular thread. The port may
326 * only be used by <td>.
328 void
329 lwkt_initport_thread(lwkt_port_t port, thread_t td)
331 _lwkt_initport(port,
332 lwkt_thread_getport,
333 lwkt_thread_putport,
334 lwkt_thread_waitmsg,
335 lwkt_thread_waitport,
336 lwkt_thread_replyport,
337 lwkt_thread_dropmsg,
338 lwkt_thread_putport);
339 port->mpu_td = td;
343 * lwkt_initport_spin()
345 * Initialize a port for use with descriptors that might be accessed
346 * via multiple LWPs, processes, or threads. Has somewhat more
347 * overhead then thread ports.
349 void
350 lwkt_initport_spin(lwkt_port_t port, thread_t td, boolean_t fixed_cpuid)
352 int (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
353 int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t);
355 if (td == NULL)
356 dmsgfn = lwkt_panic_dropmsg;
357 else
358 dmsgfn = lwkt_spin_dropmsg;
360 if (fixed_cpuid)
361 pportfn_oncpu = lwkt_spin_putport_oncpu;
362 else
363 pportfn_oncpu = lwkt_panic_putport_oncpu;
365 _lwkt_initport(port,
366 lwkt_spin_getport,
367 lwkt_spin_putport,
368 lwkt_spin_waitmsg,
369 lwkt_spin_waitport,
370 lwkt_spin_replyport,
371 dmsgfn,
372 pportfn_oncpu);
373 spin_init(&port->mpu_spin, "lwktinitport");
374 port->mpu_td = td;
375 if (fixed_cpuid)
376 port->mp_cpuid = td->td_gd->gd_cpuid;
380 * lwkt_initport_serialize()
382 * Initialize a port for use with descriptors that might be accessed
383 * via multiple LWPs, processes, or threads. Callers are assumed to
384 * have held the serializer (slz).
386 void
387 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
389 _lwkt_initport(port,
390 lwkt_serialize_getport,
391 lwkt_serialize_putport,
392 lwkt_serialize_waitmsg,
393 lwkt_serialize_waitport,
394 lwkt_serialize_replyport,
395 lwkt_panic_dropmsg,
396 lwkt_panic_putport_oncpu);
397 port->mpu_serialize = slz;
401 * Similar to the standard initport, this function simply marks the message
402 * as being done and does not attempt to return it to an originating port.
404 void
405 lwkt_initport_replyonly_null(lwkt_port_t port)
407 _lwkt_initport(port,
408 lwkt_panic_getport,
409 lwkt_panic_putport,
410 lwkt_panic_waitmsg,
411 lwkt_panic_waitport,
412 lwkt_null_replyport,
413 lwkt_panic_dropmsg,
414 lwkt_panic_putport_oncpu);
418 * Initialize a reply-only port, typically used as a message sink. Such
419 * ports can only be used as a reply port.
421 void
422 lwkt_initport_replyonly(lwkt_port_t port,
423 void (*rportfn)(lwkt_port_t, lwkt_msg_t))
425 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
426 lwkt_panic_waitmsg, lwkt_panic_waitport,
427 rportfn, lwkt_panic_dropmsg,
428 lwkt_panic_putport_oncpu);
431 void
432 lwkt_initport_putonly(lwkt_port_t port,
433 int (*pportfn)(lwkt_port_t, lwkt_msg_t))
435 _lwkt_initport(port, lwkt_panic_getport, pportfn,
436 lwkt_panic_waitmsg, lwkt_panic_waitport,
437 lwkt_panic_replyport, lwkt_panic_dropmsg,
438 lwkt_panic_putport_oncpu);
441 void
442 lwkt_initport_panic(lwkt_port_t port)
444 _lwkt_initport(port,
445 lwkt_panic_getport, lwkt_panic_putport,
446 lwkt_panic_waitmsg, lwkt_panic_waitport,
447 lwkt_panic_replyport, lwkt_panic_dropmsg,
448 lwkt_panic_putport_oncpu);
451 static __inline
452 void
453 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
455 lwkt_msg_queue *queue;
458 * normal case, remove and return the message.
460 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
461 queue = &port->mp_msgq_prio;
462 else
463 queue = &port->mp_msgq;
464 TAILQ_REMOVE(queue, msg, ms_node);
467 * atomic op needed for spin ports
469 atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
472 static __inline
473 void
474 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
476 lwkt_msg_queue *queue;
479 * atomic op needed for spin ports
481 atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
482 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
483 queue = &port->mp_msgq_prio;
484 else
485 queue = &port->mp_msgq;
486 TAILQ_INSERT_TAIL(queue, msg, ms_node);
488 if (msg->ms_flags & MSGF_RECEIPT) {
490 * In case this message is forwarded later, the receipt
491 * flag was cleared once the receipt function is called.
493 atomic_clear_int(&msg->ms_flags, MSGF_RECEIPT);
494 msg->ms_receiptfn(msg, port);
498 static __inline
499 lwkt_msg_t
500 _lwkt_pollmsg(lwkt_port_t port)
502 lwkt_msg_t msg;
504 msg = TAILQ_FIRST(&port->mp_msgq_prio);
505 if (__predict_false(msg != NULL))
506 return msg;
509 * Priority queue has no message, fallback to non-priority queue.
511 return TAILQ_FIRST(&port->mp_msgq);
514 static __inline
515 void
516 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
519 * atomic op needed for spin ports
521 _lwkt_pushmsg(port, msg);
522 atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
/************************************************************************
 *                      THREAD PORT BACKEND                             *
 ************************************************************************
 *
 * This backend is used when the port a message is retrieved from is owned
 * by a single thread (the calling thread).  Messages are IPI'd to the
 * correct cpu before being enqueued to a port.  Note that this is fairly
 * optimal since scheduling would have had to do an IPI anyway if the
 * message were headed to a different cpu.
 */
537 * This function completes reply processing for the default case in the
538 * context of the originating cpu.
540 static
541 void
542 lwkt_thread_replyport_remote(lwkt_msg_t msg)
544 lwkt_port_t port = msg->ms_reply_port;
545 int flags;
548 * Chase any thread migration that occurs
550 if (port->mpu_td->td_gd != mycpu) {
551 lwkt_send_ipiq(port->mpu_td->td_gd,
552 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
553 return;
557 * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
559 #ifdef INVARIANTS
560 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
561 msg->ms_flags &= ~MSGF_INTRANSIT;
562 #endif
563 flags = msg->ms_flags;
564 if (msg->ms_flags & MSGF_SYNC) {
565 cpu_sfence();
566 msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
567 } else {
568 _lwkt_enqueue_reply(port, msg);
570 if (port->mp_flags & MSGPORTF_WAITING)
571 _lwkt_schedule_msg(port->mpu_td, flags);
575 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
577 * Called with the reply port as an argument but in the context of the
578 * original target port. Completion must occur on the target port's
579 * cpu.
581 * The critical section protects us from IPIs on the this CPU.
583 static
584 void
585 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
587 int flags;
589 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
591 if (msg->ms_flags & MSGF_SYNC) {
593 * If a synchronous completion has been requested, just wakeup
594 * the message without bothering to queue it to the target port.
596 * Assume the target thread is non-preemptive, so no critical
597 * section is required.
599 if (port->mpu_td->td_gd == mycpu) {
600 crit_enter();
601 flags = msg->ms_flags;
602 cpu_sfence();
603 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
604 if (port->mp_flags & MSGPORTF_WAITING)
605 _lwkt_schedule_msg(port->mpu_td, flags);
606 crit_exit();
607 } else {
608 #ifdef INVARIANTS
609 atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
610 #endif
611 atomic_set_int(&msg->ms_flags, MSGF_REPLY);
612 lwkt_send_ipiq(port->mpu_td->td_gd,
613 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
615 } else {
617 * If an asynchronous completion has been requested the message
618 * must be queued to the reply port.
620 * A critical section is required to interlock the port queue.
622 if (port->mpu_td->td_gd == mycpu) {
623 crit_enter();
624 _lwkt_enqueue_reply(port, msg);
625 if (port->mp_flags & MSGPORTF_WAITING)
626 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
627 crit_exit();
628 } else {
629 #ifdef INVARIANTS
630 atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
631 #endif
632 atomic_set_int(&msg->ms_flags, MSGF_REPLY);
633 lwkt_send_ipiq(port->mpu_td->td_gd,
634 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
640 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
642 * This function could _only_ be used when caller is in the same thread
643 * as the message's target port owner thread.
645 static int
646 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
648 int error;
650 KASSERT(port->mpu_td == curthread,
651 ("message could only be dropped in the same thread "
652 "as the message target port thread"));
653 crit_enter_quick(port->mpu_td);
654 if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
655 _lwkt_pullmsg(port, msg);
656 atomic_set_int(&msg->ms_flags, MSGF_DONE);
657 error = 0;
658 } else {
659 error = ENOENT;
661 crit_exit_quick(port->mpu_td);
663 return (error);
667 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
669 * Called with the target port as an argument but in the context of the
670 * reply port. This function always implements an asynchronous put to
671 * the target message port, and thus returns EASYNC.
673 * The message must already have cleared MSGF_DONE and MSGF_REPLY
675 static
676 void
677 lwkt_thread_putport_remote(lwkt_msg_t msg)
679 lwkt_port_t port = msg->ms_target_port;
682 * Chase any thread migration that occurs
684 if (port->mpu_td->td_gd != mycpu) {
685 lwkt_send_ipiq(port->mpu_td->td_gd,
686 (ipifunc1_t)lwkt_thread_putport_remote, msg);
687 return;
691 * An atomic op is needed on ms_flags vs originator. Also
692 * note that the originator might be using a different type
693 * of msgport.
695 #ifdef INVARIANTS
696 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
697 atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
698 #endif
699 _lwkt_pushmsg(port, msg);
700 if (port->mp_flags & MSGPORTF_WAITING)
701 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
704 static
706 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
708 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
710 msg->ms_target_port = port;
711 if (port->mpu_td->td_gd == mycpu) {
712 crit_enter();
713 _lwkt_pushmsg(port, msg);
714 if (port->mp_flags & MSGPORTF_WAITING)
715 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
716 crit_exit();
717 } else {
718 #ifdef INVARIANTS
720 * Cleanup.
722 * An atomic op is needed on ms_flags vs originator. Also
723 * note that the originator might be using a different type
724 * of msgport.
726 atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
727 #endif
728 lwkt_send_ipiq(port->mpu_td->td_gd,
729 (ipifunc1_t)lwkt_thread_putport_remote, msg);
731 return (EASYNC);
735 * lwkt_thread_getport()
737 * Retrieve the next message from the port or NULL if no messages
738 * are ready.
740 static
741 void *
742 lwkt_thread_getport(lwkt_port_t port)
744 lwkt_msg_t msg;
746 KKASSERT(port->mpu_td == curthread);
748 crit_enter_quick(port->mpu_td);
749 if ((msg = _lwkt_pollmsg(port)) != NULL)
750 _lwkt_pullmsg(port, msg);
751 crit_exit_quick(port->mpu_td);
752 return(msg);
756 * lwkt_thread_waitmsg()
758 * Wait for a particular message to be replied. We must be the only
759 * thread waiting on the message. The port must be owned by the
760 * caller.
762 static
764 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
766 thread_t td = curthread;
768 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
769 ("can't wait dropable message"));
771 if ((msg->ms_flags & MSGF_DONE) == 0) {
773 * If the done bit was not set we have to block until it is.
775 lwkt_port_t port = msg->ms_reply_port;
776 int sentabort;
778 KKASSERT(port->mpu_td == td);
779 crit_enter_quick(td);
780 sentabort = 0;
782 while ((msg->ms_flags & MSGF_DONE) == 0) {
783 port->mp_flags |= MSGPORTF_WAITING; /* same cpu */
784 if (sentabort == 0) {
785 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
786 lwkt_abortmsg(msg);
788 } else {
789 lwkt_sleep("waitabt", 0);
791 port->mp_flags &= ~MSGPORTF_WAITING;
793 if (msg->ms_flags & MSGF_QUEUED)
794 _lwkt_pullmsg(port, msg);
795 crit_exit_quick(td);
796 } else {
798 * If the done bit was set we only have to mess around with the
799 * message if it is queued on the reply port.
801 crit_enter_quick(td);
802 if (msg->ms_flags & MSGF_QUEUED) {
803 lwkt_port_t port = msg->ms_reply_port;
804 thread_t td __debugvar = curthread;
806 KKASSERT(port->mpu_td == td);
807 _lwkt_pullmsg(port, msg);
809 crit_exit_quick(td);
811 return(msg->ms_error);
815 * lwkt_thread_waitport()
817 * Wait for a new message to be available on the port. We must be the
818 * the only thread waiting on the port. The port must be owned by caller.
820 static
821 void *
822 lwkt_thread_waitport(lwkt_port_t port, int flags)
824 thread_t td = curthread;
825 lwkt_msg_t msg;
826 int error;
828 KKASSERT(port->mpu_td == td);
829 crit_enter_quick(td);
830 while ((msg = _lwkt_pollmsg(port)) == NULL) {
831 port->mp_flags |= MSGPORTF_WAITING;
832 error = lwkt_sleep("waitport", flags);
833 port->mp_flags &= ~MSGPORTF_WAITING;
834 if (error)
835 goto done;
837 _lwkt_pullmsg(port, msg);
838 done:
839 crit_exit_quick(td);
840 return(msg);
/************************************************************************
 *                         SPIN PORT BACKEND                            *
 ************************************************************************
 *
 * This backend uses spinlocks instead of making assumptions about which
 * thread is accessing the port.  It must be used when a port is not owned
 * by a particular thread.  This is less optimal than thread ports but
 * you don't have a choice if there are multiple threads accessing the port.
 *
 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
 * on the message port, it is the responsibility of the code doing the
 * wakeup to clear this flag rather than the blocked threads.  Some
 * superfluous wakeups may occur, which is ok.
 *
 * XXX synchronous message wakeups are not currently optimized.
 */
860 static
861 void *
862 lwkt_spin_getport(lwkt_port_t port)
864 lwkt_msg_t msg;
866 spin_lock(&port->mpu_spin);
867 if ((msg = _lwkt_pollmsg(port)) != NULL)
868 _lwkt_pullmsg(port, msg);
869 spin_unlock(&port->mpu_spin);
870 return(msg);
873 static __inline int
874 lwkt_spin_putport_only(lwkt_port_t port, lwkt_msg_t msg)
876 int dowakeup;
878 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
880 msg->ms_target_port = port;
881 spin_lock(&port->mpu_spin);
882 _lwkt_pushmsg(port, msg);
883 dowakeup = 0;
884 if (port->mp_flags & MSGPORTF_WAITING) {
885 port->mp_flags &= ~MSGPORTF_WAITING;
886 dowakeup = 1;
888 spin_unlock(&port->mpu_spin);
890 return dowakeup;
893 static
895 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
897 if (lwkt_spin_putport_only(port, msg))
898 wakeup(port);
899 return (EASYNC);
902 static
904 lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
906 KASSERT(port->mp_cpuid == mycpuid,
907 ("cpu mismatch, can't do oncpu putport; port cpu%d, curcpu cpu%d",
908 port->mp_cpuid, mycpuid));
909 if (lwkt_spin_putport_only(port, msg))
910 wakeup_mycpu(port);
911 return (EASYNC);
914 static
916 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
918 lwkt_port_t port;
919 int sentabort;
920 int error;
922 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
923 ("can't wait dropable message"));
924 port = msg->ms_reply_port;
926 if ((msg->ms_flags & MSGF_DONE) == 0) {
927 sentabort = 0;
928 spin_lock(&port->mpu_spin);
929 while ((msg->ms_flags & MSGF_DONE) == 0) {
930 void *won;
933 * If message was sent synchronously from the beginning
934 * the wakeup will be on the message structure, else it
935 * will be on the port structure.
937 * ms_flags needs atomic op originator vs target MSGF_QUEUED
939 if (msg->ms_flags & MSGF_SYNC) {
940 won = msg;
941 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
942 } else {
943 won = port;
944 port->mp_flags |= MSGPORTF_WAITING;
948 * Only messages which support abort can be interrupted.
949 * We must still wait for message completion regardless.
951 if ((flags & PCATCH) && sentabort == 0) {
952 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
953 if (error) {
954 sentabort = error;
955 spin_unlock(&port->mpu_spin);
956 lwkt_abortmsg(msg);
957 spin_lock(&port->mpu_spin);
959 } else {
960 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
962 /* see note at the top on the MSGPORTF_WAITING flag */
965 * Turn EINTR into ERESTART if the signal indicates.
967 if (sentabort && msg->ms_error == EINTR)
968 msg->ms_error = sentabort;
969 if (msg->ms_flags & MSGF_QUEUED)
970 _lwkt_pullmsg(port, msg);
971 spin_unlock(&port->mpu_spin);
972 } else {
973 spin_lock(&port->mpu_spin);
974 if (msg->ms_flags & MSGF_QUEUED) {
975 _lwkt_pullmsg(port, msg);
977 spin_unlock(&port->mpu_spin);
979 return(msg->ms_error);
982 static
983 void *
984 lwkt_spin_waitport(lwkt_port_t port, int flags)
986 lwkt_msg_t msg;
987 int error;
989 spin_lock(&port->mpu_spin);
990 while ((msg = _lwkt_pollmsg(port)) == NULL) {
991 port->mp_flags |= MSGPORTF_WAITING;
992 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
993 /* see note at the top on the MSGPORTF_WAITING flag */
994 if (error) {
995 spin_unlock(&port->mpu_spin);
996 return(NULL);
999 _lwkt_pullmsg(port, msg);
1000 spin_unlock(&port->mpu_spin);
1001 return(msg);
1004 static
1005 void
1006 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
1008 int dowakeup;
1010 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1012 if (msg->ms_flags & MSGF_SYNC) {
1014 * If a synchronous completion has been requested, just wakeup
1015 * the message without bothering to queue it to the target port.
1017 * ms_flags protected by reply port spinlock
1019 spin_lock(&port->mpu_spin);
1020 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1021 dowakeup = 0;
1022 if (msg->ms_flags & MSGF_WAITING) {
1023 msg->ms_flags &= ~MSGF_WAITING;
1024 dowakeup = 1;
1026 spin_unlock(&port->mpu_spin);
1027 if (dowakeup)
1028 wakeup(msg);
1029 } else {
1031 * If an asynchronous completion has been requested the message
1032 * must be queued to the reply port.
1034 spin_lock(&port->mpu_spin);
1035 _lwkt_enqueue_reply(port, msg);
1036 dowakeup = 0;
1037 if (port->mp_flags & MSGPORTF_WAITING) {
1038 port->mp_flags &= ~MSGPORTF_WAITING;
1039 dowakeup = 1;
1041 spin_unlock(&port->mpu_spin);
1042 if (dowakeup)
1043 wakeup(port);
1048 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
1050 * This function could _only_ be used when caller is in the same thread
1051 * as the message's target port owner thread.
1053 static int
1054 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1056 int error;
1058 KASSERT(port->mpu_td == curthread,
1059 ("message could only be dropped in the same thread "
1060 "as the message target port thread\n"));
1061 spin_lock(&port->mpu_spin);
1062 if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
1063 _lwkt_pullmsg(port, msg);
1064 msg->ms_flags |= MSGF_DONE;
1065 error = 0;
1066 } else {
1067 error = ENOENT;
1069 spin_unlock(&port->mpu_spin);
1071 return (error);
/************************************************************************
 *                       SERIALIZER PORT BACKEND                        *
 ************************************************************************
 *
 * This backend uses a serializer to protect access to the port.  Callers
 * are assumed to hold the serializer.  This kind of port is usually created
 * by a network device driver along with _one_ lwkt thread to pipeline
 * operations which may temporarily release the serializer.
 *
 * Implementation is based on SPIN PORT BACKEND.
 */
1086 static
1087 void *
1088 lwkt_serialize_getport(lwkt_port_t port)
1090 lwkt_msg_t msg;
1092 ASSERT_SERIALIZED(port->mpu_serialize);
1094 if ((msg = _lwkt_pollmsg(port)) != NULL)
1095 _lwkt_pullmsg(port, msg);
1096 return(msg);
1099 static
1101 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
1103 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
1104 ASSERT_SERIALIZED(port->mpu_serialize);
1106 msg->ms_target_port = port;
1107 _lwkt_pushmsg(port, msg);
1108 if (port->mp_flags & MSGPORTF_WAITING) {
1109 port->mp_flags &= ~MSGPORTF_WAITING;
1110 wakeup(port);
1112 return (EASYNC);
1115 static
1117 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
1119 lwkt_port_t port;
1120 int sentabort;
1121 int error;
1123 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
1124 ("can't wait dropable message"));
1126 if ((msg->ms_flags & MSGF_DONE) == 0) {
1127 port = msg->ms_reply_port;
1129 ASSERT_SERIALIZED(port->mpu_serialize);
1131 sentabort = 0;
1132 while ((msg->ms_flags & MSGF_DONE) == 0) {
1133 void *won;
1136 * If message was sent synchronously from the beginning
1137 * the wakeup will be on the message structure, else it
1138 * will be on the port structure.
1140 if (msg->ms_flags & MSGF_SYNC) {
1141 won = msg;
1142 } else {
1143 won = port;
1144 port->mp_flags |= MSGPORTF_WAITING;
1148 * Only messages which support abort can be interrupted.
1149 * We must still wait for message completion regardless.
1151 if ((flags & PCATCH) && sentabort == 0) {
1152 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1153 if (error) {
1154 sentabort = error;
1155 lwkt_serialize_exit(port->mpu_serialize);
1156 lwkt_abortmsg(msg);
1157 lwkt_serialize_enter(port->mpu_serialize);
1159 } else {
1160 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1162 /* see note at the top on the MSGPORTF_WAITING flag */
1165 * Turn EINTR into ERESTART if the signal indicates.
1167 if (sentabort && msg->ms_error == EINTR)
1168 msg->ms_error = sentabort;
1169 if (msg->ms_flags & MSGF_QUEUED)
1170 _lwkt_pullmsg(port, msg);
1171 } else {
1172 if (msg->ms_flags & MSGF_QUEUED) {
1173 port = msg->ms_reply_port;
1175 ASSERT_SERIALIZED(port->mpu_serialize);
1176 _lwkt_pullmsg(port, msg);
1179 return(msg->ms_error);
1182 static
1183 void *
1184 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1186 lwkt_msg_t msg;
1187 int error;
1189 ASSERT_SERIALIZED(port->mpu_serialize);
1191 while ((msg = _lwkt_pollmsg(port)) == NULL) {
1192 port->mp_flags |= MSGPORTF_WAITING;
1193 error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1194 /* see note at the top on the MSGPORTF_WAITING flag */
1195 if (error)
1196 return(NULL);
1198 _lwkt_pullmsg(port, msg);
1199 return(msg);
1202 static
1203 void
1204 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1206 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1207 ASSERT_SERIALIZED(port->mpu_serialize);
1209 if (msg->ms_flags & MSGF_SYNC) {
1211 * If a synchronous completion has been requested, just wakeup
1212 * the message without bothering to queue it to the target port.
1214 * (both sides synchronized via serialized reply port)
1216 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1217 wakeup(msg);
1218 } else {
1220 * If an asynchronous completion has been requested the message
1221 * must be queued to the reply port.
1223 _lwkt_enqueue_reply(port, msg);
1224 if (port->mp_flags & MSGPORTF_WAITING) {
1225 port->mp_flags &= ~MSGPORTF_WAITING;
1226 wakeup(port);
1231 /************************************************************************
1232 * PANIC AND SPECIAL PORT FUNCTIONS *
1233 ************************************************************************/
1236 * You can point a port's reply vector at this function if you just want
1237 * the message marked done, without any queueing or signaling. This is
1238 * often used for structure-embedded messages.
1240 static
1241 void
1242 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1244 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1247 static
1248 void *
1249 lwkt_panic_getport(lwkt_port_t port)
1251 panic("lwkt_getport() illegal on port %p", port);
1254 static
1256 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1258 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1261 static
1263 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1265 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1268 static
1269 void *
1270 lwkt_panic_waitport(lwkt_port_t port, int flags)
1272 panic("port %p cannot be waited on", port);
1275 static
1276 void
1277 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1279 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1282 static
1284 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1286 panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1287 /* NOT REACHED */
1288 return (ENOENT);
1291 static
1293 lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
1295 panic("lwkt_begin_oncpu/sendmsg_oncpu() illegal on port %p msg %p",
1296 port, msg);
1297 /* NOT REACHED */
1298 return (ENOENT);