[t][TT #1119] Convert t/op/bitwise.t to PIR
[parrot.git] / src / events.c
blob8d2ec138d4fa80403f66d0c41c44808ea98284cc
1 /*
2 Copyright (C) 2001-2008, Parrot Foundation.
3 $Id$
5 =head1 NAME
7 src/events.c - Event handling stuff
9 =head1 DESCRIPTION
11 An event_thread handles async events for all interpreters. When events
12 are due, they are placed in per interpreter task_queues, where they are
13 handled then by the C<check_event*> opcodes.
15 IO events and signals are caught in the io_thread, which again
16 dispatches these to one or all interpreters.
18 =over 4
20 =cut
24 #include "parrot/parrot.h"
25 #include "parrot/events.h"
26 #include "events.str"
28 typedef struct pending_io_events {
29 parrot_event **events;
30 size_t n;
31 size_t alloced;
32 } pending_io_events;
34 /* HEADERIZER HFILE: include/parrot/events.h */
35 /* HEADERIZER BEGIN: static */
36 /* Don't modify between HEADERIZER BEGIN / HEADERIZER END. Your changes will be lost. */
38 PARROT_WARN_UNUSED_RESULT
39 PARROT_CAN_RETURN_NULL
40 static opcode_t * do_event(PARROT_INTERP,
41 ARGIN(parrot_event* event),
42 ARGIN_NULLOK(opcode_t *next))
43 __attribute__nonnull__(1)
44 __attribute__nonnull__(2);
46 PARROT_MALLOC
47 PARROT_CANNOT_RETURN_NULL
48 static QUEUE_ENTRY* dup_entry(ARGIN(const QUEUE_ENTRY *entry))
49 __attribute__nonnull__(1);
51 PARROT_WARN_UNUSED_RESULT
52 PARROT_CANNOT_RETURN_NULL
53 static QUEUE_ENTRY* dup_entry_interval(
54 ARGIN(QUEUE_ENTRY *entry),
55 FLOATVAL now)
56 __attribute__nonnull__(1);
58 PARROT_WARN_UNUSED_RESULT
59 PARROT_CAN_RETURN_NULL
60 static void* event_thread(ARGMOD(void *data))
61 __attribute__nonnull__(1)
62 FUNC_MODIFIES(*data);
64 static void event_to_exception(PARROT_INTERP,
65 ARGIN(const parrot_event* event))
66 __attribute__nonnull__(1)
67 __attribute__nonnull__(2);
69 static void init_events_all(PARROT_INTERP)
70 __attribute__nonnull__(1);
72 static void init_events_first(PARROT_INTERP)
73 __attribute__nonnull__(1);
75 PARROT_CAN_RETURN_NULL
76 static void* io_thread(SHIM(void *data));
78 static void io_thread_ready_rd(ARGMOD(pending_io_events *ios), int ready_rd)
79 __attribute__nonnull__(1)
80 FUNC_MODIFIES(*ios);
82 static void Parrot_sigaction(int sig, ARGIN(void (*handler)(int)))
83 __attribute__nonnull__(2);
85 static void Parrot_unblock_signal(int sig);
86 static int process_events(ARGMOD(QUEUE *event_q))
87 __attribute__nonnull__(1)
88 FUNC_MODIFIES(*event_q);
90 static void schedule_signal_event(int signum);
91 static void sig_handler(int signum);
92 static void stop_io_thread(void);
93 static void store_io_event(
94 ARGMOD(pending_io_events *ios),
95 ARGIN(parrot_event *ev))
96 __attribute__nonnull__(1)
97 __attribute__nonnull__(2)
98 FUNC_MODIFIES(*ios);
100 PARROT_WARN_UNUSED_RESULT
101 PARROT_CAN_RETURN_NULL
102 static opcode_t * wait_for_wakeup(PARROT_INTERP,
103 ARGIN_NULLOK(opcode_t *next))
104 __attribute__nonnull__(1);
106 #define ASSERT_ARGS_do_event __attribute__unused__ int _ASSERT_ARGS_CHECK = (\
107 PARROT_ASSERT_ARG(interp) \
108 , PARROT_ASSERT_ARG(event))
109 #define ASSERT_ARGS_dup_entry __attribute__unused__ int _ASSERT_ARGS_CHECK = (\
110 PARROT_ASSERT_ARG(entry))
111 #define ASSERT_ARGS_dup_entry_interval __attribute__unused__ int _ASSERT_ARGS_CHECK = (\
112 PARROT_ASSERT_ARG(entry))
113 #define ASSERT_ARGS_event_thread __attribute__unused__ int _ASSERT_ARGS_CHECK = (\
114 PARROT_ASSERT_ARG(data))
115 #define ASSERT_ARGS_event_to_exception __attribute__unused__ int _ASSERT_ARGS_CHECK = (\
116 PARROT_ASSERT_ARG(interp) \
117 , PARROT_ASSERT_ARG(event))
118 #define ASSERT_ARGS_init_events_all __attribute__unused__ int _ASSERT_ARGS_CHECK = (\
119 PARROT_ASSERT_ARG(interp))
120 #define ASSERT_ARGS_init_events_first __attribute__unused__ int _ASSERT_ARGS_CHECK = (\
121 PARROT_ASSERT_ARG(interp))
122 #define ASSERT_ARGS_io_thread __attribute__unused__ int _ASSERT_ARGS_CHECK = (0)
123 #define ASSERT_ARGS_io_thread_ready_rd __attribute__unused__ int _ASSERT_ARGS_CHECK = (\
124 PARROT_ASSERT_ARG(ios))
125 #define ASSERT_ARGS_Parrot_sigaction __attribute__unused__ int _ASSERT_ARGS_CHECK = (\
126 PARROT_ASSERT_ARG(handler))
127 #define ASSERT_ARGS_Parrot_unblock_signal __attribute__unused__ int _ASSERT_ARGS_CHECK = (0)
128 #define ASSERT_ARGS_process_events __attribute__unused__ int _ASSERT_ARGS_CHECK = (\
129 PARROT_ASSERT_ARG(event_q))
130 #define ASSERT_ARGS_schedule_signal_event __attribute__unused__ int _ASSERT_ARGS_CHECK = (0)
131 #define ASSERT_ARGS_sig_handler __attribute__unused__ int _ASSERT_ARGS_CHECK = (0)
132 #define ASSERT_ARGS_stop_io_thread __attribute__unused__ int _ASSERT_ARGS_CHECK = (0)
133 #define ASSERT_ARGS_store_io_event __attribute__unused__ int _ASSERT_ARGS_CHECK = (\
134 PARROT_ASSERT_ARG(ios) \
135 , PARROT_ASSERT_ARG(ev))
136 #define ASSERT_ARGS_wait_for_wakeup __attribute__unused__ int _ASSERT_ARGS_CHECK = (\
137 PARROT_ASSERT_ARG(interp))
138 /* Don't modify between HEADERIZER BEGIN / HEADERIZER END. Your changes will be lost. */
139 /* HEADERIZER END: static */
/*
 * event debugging stuff - turn it off before running tests
 */
#define EVENT_DEBUG 0

/*
 * not yet - need to sort out platform code and fix exceptions first
 * TODO get some config for POSIX compliant
 * TODO create API for extenders like ponie - events disabled for now
 */
#if defined(linux) || defined(darwin)
#  define INSTALL_EVENT_HANDLER 0
#else
#  define INSTALL_EVENT_HANDLER 0
#endif

#if EVENT_DEBUG
/*
 * Call sites pass one fully parenthesized argument list, e.g.
 *     edebug((stderr, "fmt %d\n", value));
 * so the macro must paste that list directly after fprintf. Wrapping it
 * in another pair of parentheses would turn it into a single comma
 * expression and the call would not compile.
 */
#  define edebug(x) fprintf x
static const char *ev_names[] = {
    "EVENT_TYPE_NONE",
    "EVENT_TYPE_EVENT",
    "EVENT_TYPE_IO",
    "EVENT_TYPE_MSG",
    "EVENT_TYPE_TIMER",
    "EVENT_TYPE_CALL_BACK",
    "EVENT_TYPE_SLEEP",
    "EVENT_TYPE_TERMINATE",
    "EVENT_TYPE_EVENT_TERMINATE",
    "EVENT_TYPE_CLASS_CHANGED",
    "EVENT_TYPE_SIGNAL",
    "EVENT_TYPE_SUSPEND_FOR_GC"
};

/*
 * Return the printable name of the type of event e. A type value that
 * does not index into ev_names yields "EVENT_TYPE_UNKNOWN" instead of
 * reading past the end of the table.
 */
static const char*
et(const parrot_event* const e)
{
    const size_t n_names = sizeof (ev_names) / sizeof (ev_names[0]);
    const size_t idx     = (size_t)e->type;

    return idx < n_names ? ev_names[idx] : "EVENT_TYPE_UNKNOWN";
}

#else
#  define edebug(x)
#endif
183 /* forward defs */
186 * we have exactly one global event_queue
187 * TODO task prio handling
189 static QUEUE *event_queue;
190 #define TASK_PRIO 10
/*
 * user accessible signals like SIGINT
 * (placeholder values are defined where the platform lacks the signal)
 */
#ifndef SIGINT
#  define SIGINT -4711
#endif
#ifndef SIGHUP
#  define SIGHUP -4712
#endif

/*
 * Flags set to 1 by sig_handler() and reset by io_thread() once the
 * corresponding signal event has been scheduled.
 *
 * They are written from a signal handler, so they are declared volatile
 * to force every access to go to memory.
 *
 * XXX need a configure test
 *     should be sig_atomic_t
 */
static volatile int sig_int, sig_hup;
209 * a pipe is used to send messages to the IO thread
211 static int pipe_fds[2];
212 #define PIPE_READ_FD pipe_fds[0]
213 #define PIPE_WRITE_FD pipe_fds[1]
216 * a structure to communicate with the io_thread
218 typedef struct io_thread_msg {
219 INTVAL command;
220 parrot_event *ev;
221 } io_thread_msg;
226 =back
228 =head2 Signal Handling
230 =over 4
232 =item C<static void sig_handler(int signum)>
234 Handle signal C<signum>.
236 TODO - Only C<SIGHUP> is handled at the moment for testing
238 =cut
242 static void
243 sig_handler(int signum)
245 ASSERT_ARGS(sig_handler)
246 switch (signum) {
247 case SIGINT:
248 sig_int = 1;
249 break;
250 case SIGHUP:
251 sig_hup = 1;
252 break;
253 default:
254 break;
260 =item C<static void Parrot_sigaction(int sig, void (*handler)(int))>
262 Signal handlers are shared by all threads, but signal block masks are
263 thread-specific, so we install one handler, block that signal, and then
264 unblock it only in the thread that is to receive it.
266 =cut
270 static void
271 Parrot_sigaction(int sig, ARGIN(void (*handler)(int)))
273 ASSERT_ARGS(Parrot_sigaction)
274 #ifdef PARROT_HAS_SIGACTION
275 struct sigaction action;
276 sigset_t block_mask;
278 /* install handler */
279 action.sa_handler = handler;
280 sigemptyset(&action.sa_mask);
281 action.sa_flags = 0;
282 sigaction(sig, &action, NULL);
284 /* block that signal */
285 sigemptyset(&block_mask);
286 sigaddset(&block_mask, sig);
287 sigprocmask(SIG_BLOCK, &block_mask, NULL);
288 #else
289 UNUSED(sig);
290 UNUSED(handler);
291 #endif
297 =item C<static void Parrot_unblock_signal(int sig)>
299 unblock a signal
301 =cut
305 static void
306 Parrot_unblock_signal(int sig)
308 ASSERT_ARGS(Parrot_unblock_signal)
309 #ifdef PARROT_HAS_SIGACTION
310 sigset_t block_mask;
312 sigemptyset(&block_mask);
313 sigaddset(&block_mask, sig);
314 sigprocmask(SIG_UNBLOCK, &block_mask, NULL);
315 #else
316 UNUSED(sig);
317 #endif
323 =item C<void Parrot_init_signals(void)>
325 Set up actions to handle signals.
326 Only SIGHUP handled at the moment.
328 =cut
332 PARROT_EXPORT
333 void
334 Parrot_init_signals(void)
336 ASSERT_ARGS(Parrot_init_signals)
338 * SIGFPE is architecture specific - some signal an error,
339 * some don't, so we have to use direct checks if we are dividing
340 * by zero.
342 Parrot_sigaction(SIGHUP, sig_handler);
347 =back
349 =head2 Initialization
351 =over 4
353 =item C<static void init_events_first(PARROT_INTERP)>
355 Init event system for first interpreter.
357 =cut
361 static void
362 init_events_first(PARROT_INTERP)
364 ASSERT_ARGS(init_events_first)
365 Parrot_thread ev_handle;
366 #ifndef WIN32
367 Parrot_thread io_handle;
368 #endif
371 * be sure all init is done only once
372 * we could use pthread_once for that too
374 if (event_queue)
375 PANIC(interp, "event queue already exists - missing parent_interp?");
377 * create event queue
379 event_queue = queue_init(TASK_PRIO);
381 * we use a message pipe to send IO related stuff to the
382 * IO thread
384 #ifndef WIN32
386 * pipes on WIN32 don't support select
387 * s. p6i: "event.c - of signals and pipes"
389 if (pipe(pipe_fds))
390 Parrot_ex_throw_from_c_args(interp, NULL, 1, "Couldn't create message pipe");
391 #endif
393 * now set some sig handlers before any thread is started, so
394 * that all threads inherit the signal block mask
396 #if INSTALL_EVENT_HANDLER
397 Parrot_init_signals();
398 #endif
400 * we start an event_handler thread
402 THREAD_CREATE_DETACHED(ev_handle, event_thread, event_queue);
404 * and a signal and IO handler thread
406 #ifndef WIN32
407 THREAD_CREATE_DETACHED(io_handle, io_thread, event_queue);
408 #endif
413 =item C<static void init_events_all(PARROT_INTERP)>
415 Init events for all interpreters.
417 =cut
421 static void
422 init_events_all(PARROT_INTERP)
424 ASSERT_ARGS(init_events_all)
426 * create per interpreter task queue
428 interp->task_queue = queue_init(0);
433 =item C<void Parrot_init_events(PARROT_INTERP)>
435 Initialize the event system.
437 =cut
441 PARROT_EXPORT
442 void
443 Parrot_init_events(PARROT_INTERP)
445 ASSERT_ARGS(Parrot_init_events)
446 if (!interp->parent_interpreter) {
447 /* add the very first interpreter to the list of interps. */
448 pt_add_to_interpreters(interp, NULL);
449 init_events_first(interp);
451 init_events_all(interp);
456 =back
458 =head2 Event Handler Functions
460 =over 4
462 =item C<void Parrot_schedule_event(PARROT_INTERP, parrot_event* ev)>
464 Create queue entry and insert event into task queue.
466 =cut
470 PARROT_EXPORT
471 void
472 Parrot_schedule_event(PARROT_INTERP, ARGMOD(parrot_event* ev))
474 ASSERT_ARGS(Parrot_schedule_event)
475 QUEUE_ENTRY * const entry = mem_allocate_typed(QUEUE_ENTRY);
476 entry->next = NULL;
477 ev->interp = interp;
478 entry->data = ev;
479 switch (ev->type) {
480 case EVENT_TYPE_TIMER:
481 case EVENT_TYPE_SLEEP:
482 entry->type = QUEUE_ENTRY_TYPE_TIMED_EVENT;
483 insert_entry(event_queue, entry);
484 break;
485 case EVENT_TYPE_CALL_BACK:
486 case EVENT_TYPE_SIGNAL:
487 case EVENT_TYPE_IO:
488 entry->type = QUEUE_ENTRY_TYPE_EVENT;
489 unshift_entry(event_queue, entry);
490 break;
491 default:
492 entry->type = QUEUE_ENTRY_TYPE_EVENT;
493 push_entry(event_queue, entry);
494 break;
500 =item C<static void schedule_signal_event(int signum)>
502 create and schedule a signal event
504 =cut
508 static void
509 schedule_signal_event(int signum)
511 ASSERT_ARGS(schedule_signal_event)
512 parrot_event* const ev = mem_allocate_typed(parrot_event);
513 QUEUE_ENTRY * const entry = mem_allocate_typed(QUEUE_ENTRY);
515 entry->next = NULL;
516 entry->type = QUEUE_ENTRY_TYPE_EVENT;
517 ev->type = EVENT_TYPE_SIGNAL;
518 ev->u.signal = signum;
519 entry->data = ev;
521 * deliver to all interpreters
523 Parrot_schedule_broadcast_qentry(entry);
528 =item C<void Parrot_new_timer_event(PARROT_INTERP, PMC *timer, FLOATVAL diff,
529 FLOATVAL interval, int repeat, PMC *sub, parrot_event_type_enum typ)>
531 Create a new timer event due at C<diff> from now, repeated at C<interval>
532 and running the passed C<sub>.
534 =cut
538 PARROT_EXPORT
539 void
540 Parrot_new_timer_event(PARROT_INTERP, ARGIN_NULLOK(PMC *timer), FLOATVAL diff,
541 FLOATVAL interval, int repeat, ARGIN_NULLOK(PMC *sub), parrot_event_type_enum typ)
543 ASSERT_ARGS(Parrot_new_timer_event)
544 parrot_event* const ev = mem_allocate_typed(parrot_event);
546 const FLOATVAL now = Parrot_floatval_time();
548 ev->type = typ;
549 ev->u.timer_event.timer = timer;
550 ev->u.timer_event.abs_time = now + diff;
551 ev->u.timer_event.interval = interval;
552 ev->u.timer_event.repeat = repeat;
553 ev->u.timer_event.sub = sub;
555 if (repeat && FLOAT_IS_ZERO(interval))
556 ev->u.timer_event.interval = diff;
558 Parrot_schedule_event(interp, ev);
563 =item C<void Parrot_new_cb_event(PARROT_INTERP, PMC *cbi, char *ext)>
565 Prepare and schedule a callback event.
567 =cut
571 PARROT_EXPORT
572 void
573 Parrot_new_cb_event(PARROT_INTERP, ARGIN(PMC *cbi), ARGIN(char *ext))
575 ASSERT_ARGS(Parrot_new_cb_event)
576 parrot_event* const ev = mem_allocate_typed(parrot_event);
577 QUEUE_ENTRY* const entry = mem_allocate_typed(QUEUE_ENTRY);
579 entry->next = NULL;
580 entry->data = ev;
581 ev->interp = interp;
582 ev->type = EVENT_TYPE_CALL_BACK;
583 ev->u.call_back.cbi = cbi;
584 ev->u.call_back.external_data = ext;
585 Parrot_schedule_interp_qentry(interp, entry);
590 =item C<void Parrot_del_timer_event(PARROT_INTERP, const PMC *timer)>
592 Deactivate the timer identified by C<timer>.
594 =cut
598 PARROT_EXPORT
599 void
600 Parrot_del_timer_event(PARROT_INTERP, ARGIN(const PMC *timer))
602 ASSERT_ARGS(Parrot_del_timer_event)
603 QUEUE_ENTRY *entry;
605 LOCK(event_queue->queue_mutex);
607 for (entry = event_queue->head; entry; entry = entry->next) {
608 if (entry->type == QUEUE_ENTRY_TYPE_TIMED_EVENT) {
610 parrot_event * const event = (parrot_event *)entry->data;
612 if (event->interp == interp
613 && event->u.timer_event.timer == timer) {
614 event->u.timer_event.interval = 0.0;
615 event->type = EVENT_TYPE_NONE;
616 break;
620 UNLOCK(event_queue->queue_mutex);
625 =item C<void Parrot_new_terminate_event(PARROT_INTERP)>
627 Create a terminate event, interpreter will leave the run-loop when this
628 event arrives.
630 =cut
634 PARROT_EXPORT
635 void
636 Parrot_new_terminate_event(PARROT_INTERP)
638 ASSERT_ARGS(Parrot_new_terminate_event)
639 parrot_event* const ev = mem_allocate_typed(parrot_event);
640 ev->type = EVENT_TYPE_TERMINATE;
641 Parrot_schedule_event(interp, ev);
646 =item C<void Parrot_new_suspend_for_gc_event(PARROT_INTERP)>
648 Create a suspend-for-GC event, interpreter will wait on a condition
649 variable for GC to finish when the event arrives.
651 =cut
655 PARROT_EXPORT
656 void
657 Parrot_new_suspend_for_gc_event(PARROT_INTERP)
659 ASSERT_ARGS(Parrot_new_suspend_for_gc_event)
660 QUEUE_ENTRY *qe;
661 parrot_event* const ev = mem_allocate_typed(parrot_event);
662 ev->type = EVENT_TYPE_SUSPEND_FOR_GC;
663 qe = mem_allocate_typed(QUEUE_ENTRY);
664 qe->next = NULL;
665 qe->data = ev;
666 qe->type = QUEUE_ENTRY_TYPE_EVENT;
667 /* we don't use schedule_event because we must modify its
668 * task queue immediately
670 Parrot_schedule_interp_qentry(interp, qe);
675 =item C<void Parrot_kill_event_loop(PARROT_INTERP)>
677 Schedule event-loop terminate event. This shuts down the event thread.
679 =cut
683 PARROT_EXPORT
684 void
685 Parrot_kill_event_loop(PARROT_INTERP)
687 ASSERT_ARGS(Parrot_kill_event_loop)
688 parrot_event* const ev = mem_allocate_typed(parrot_event);
689 ev->type = EVENT_TYPE_EVENT_TERMINATE;
690 Parrot_schedule_event(interp, ev);
695 =item C<void Parrot_schedule_interp_qentry(PARROT_INTERP, struct QUEUE_ENTRY
696 *entry)>
698 Put a queue entry into the interpreters task queue and enable event
699 checking for the interpreter.
701 =cut
705 PARROT_EXPORT
706 void
707 Parrot_schedule_interp_qentry(PARROT_INTERP, ARGIN(struct QUEUE_ENTRY *entry))
709 ASSERT_ARGS(Parrot_schedule_interp_qentry)
710 parrot_event * const event = (parrot_event *)entry->data;
712 * sleep checks events when it awakes
714 edebug((stderr, "got entry - schedule_inter_qentry %s\n", et(event)));
715 if (event->type != EVENT_TYPE_SLEEP)
716 enable_event_checking(interp);
718 * do push_entry last - this signales the queue condition so the
719 * interpreter might starting process that event immediately
721 * we should better use a priority for placing the event
722 * in front or at the end of the queue
724 switch (event->type) {
725 case EVENT_TYPE_CALL_BACK:
726 case EVENT_TYPE_SIGNAL:
727 unshift_entry(interp->task_queue, entry);
728 break;
729 default:
730 push_entry(interp->task_queue, entry);
731 break;
737 =item C<void Parrot_schedule_broadcast_qentry(struct QUEUE_ENTRY *entry)>
739 Broadcast an event.
741 =cut
745 void
746 Parrot_schedule_broadcast_qentry(ARGIN(struct QUEUE_ENTRY *entry))
748 ASSERT_ARGS(Parrot_schedule_broadcast_qentry)
749 parrot_event * const event = (parrot_event *)entry->data;
751 switch (event->type) {
752 case EVENT_TYPE_SIGNAL:
753 edebug((stderr, "broadcast signal\n"));
755 * we don't have special signal handlers in usercode yet
756 * e.g.:
757 * install handler like exception handler *and*
758 * set a interpreter flag, that a handler exists
759 * we then could examine that flag (after LOCKing it)
760 * and dispatch the exception to all interpreters that
761 * handle it
762 * Finally, we send the first (main) interpreter that signal
764 * For now just send to all.
767 switch (event->u.signal) {
768 case SIGHUP:
769 case SIGINT:
771 if (n_interpreters) {
772 size_t i;
773 LOCK(interpreter_array_mutex);
774 for (i = 1; i < n_interpreters; ++i) {
775 Interp *interp;
776 edebug((stderr, "deliver SIGINT to %d\n", i));
777 interp = interpreter_array[i];
778 if (interp)
779 Parrot_schedule_interp_qentry(interp,
780 dup_entry(entry));
782 UNLOCK(interpreter_array_mutex);
784 Parrot_schedule_interp_qentry(interpreter_array[0], entry);
785 edebug((stderr, "deliver SIGINT to 0\n"));
787 break;
788 default:
789 mem_sys_free(entry);
790 mem_sys_free(event);
792 break;
793 default:
794 mem_sys_free(entry);
795 mem_sys_free(event);
796 exit_fatal(1, "Unknown event to broadcast");
797 break;
803 =back
805 =head2 IO Thread Handling
807 =over 4
809 =cut
813 #ifndef WIN32
817 =item C<static void store_io_event(pending_io_events *ios, parrot_event *ev)>
819 Stores an event in the event stack. Allocates memory if necessary.
821 =cut
825 static void
826 store_io_event(ARGMOD(pending_io_events *ios), ARGIN(parrot_event *ev))
828 ASSERT_ARGS(store_io_event)
829 if (!ios->alloced) {
830 ios->alloced = 16;
831 ios->events = mem_allocate_n_zeroed_typed(ios->alloced, parrot_event *);
833 else if (ios->n >= ios->alloced) {
834 ios->alloced *= 2;
835 mem_realloc_n_typed(ios->events, ios->alloced, parrot_event *);
837 ios->events[ios->n++] = ev;
842 =item C<static void io_thread_ready_rd(pending_io_events *ios, int ready_rd)>
844 Takes a list of pending i/o events and a file descriptor.
845 If the fd is ready to read, the event is removed from the
846 "pending" list and moved to the "scheduled" task queue.
848 =cut
852 static void
853 io_thread_ready_rd(ARGMOD(pending_io_events *ios), int ready_rd)
855 ASSERT_ARGS(io_thread_ready_rd)
856 size_t i;
858 for (i = 0; i < ios->n; ++i) {
859 parrot_event * const ev = ios->events[i];
860 PMC * const pio = ev->u.io_event.pio;
861 const int fd = Parrot_io_getfd(ev->interp, pio);
863 if (fd == ready_rd) {
864 /* remove from event list */
865 --ios->n;
867 for (; i < ios->n; ++i)
868 ios->events[i] = ios->events[i+1];
870 Parrot_schedule_event(ev->interp, ev);
871 break;
878 =item C<static void* io_thread(void *data)>
880 The IO thread uses select/poll to handle IO events and signals.
882 It waits on input from the message pipe to insert file descriptors in
883 the wait sets.
885 =cut
889 PARROT_CAN_RETURN_NULL
890 static void*
891 io_thread(SHIM(void *data))
893 ASSERT_ARGS(io_thread)
894 fd_set act_rfds, act_wfds;
895 int n_highest, i;
896 int running = 1;
897 pending_io_events ios;
899 ios.n = 0;
900 ios.alloced = 0;
901 ios.events = 0;
902 /* remember pending io events */
904 FD_ZERO(&act_rfds);
905 FD_ZERO(&act_wfds);
907 * Watch the reader end of the pipe for messages
909 FD_SET(PIPE_READ_FD, &act_rfds);
910 n_highest = PIPE_READ_FD + 1;
912 * all signals that we shall handle here have to be unblocked
913 * in this and only in this thread
915 Parrot_unblock_signal(SIGHUP);
916 while (running) {
917 fd_set rfds = act_rfds;
918 fd_set wfds = act_wfds;
919 const int retval = select(n_highest, &rfds, &wfds, NULL, NULL);
921 switch (retval) {
922 case -1:
923 if (errno == EINTR) {
924 edebug((stderr, "select EINTR\n"));
925 if (sig_int) {
926 edebug((stderr, "int arrived\n"));
927 sig_int = 0;
929 * signal the event thread
931 schedule_signal_event(SIGINT);
933 if (sig_hup) {
934 edebug((stderr, "int arrived\n"));
935 sig_hup = 0;
937 * signal the event thread
939 schedule_signal_event(SIGHUP);
943 break;
944 case 0: /* timeout - can't happen */
945 break;
946 default:
947 edebug((stderr, "IO ready\n"));
948 for (i = 0; i < n_highest; ++i) {
949 if (FD_ISSET(i, &rfds)) {
950 if (i == PIPE_READ_FD) {
951 io_thread_msg buf;
953 * a command arrived
955 edebug((stderr, "msg arrived\n"));
956 if (read(PIPE_READ_FD, &buf, sizeof (buf)) != sizeof (buf))
957 exit_fatal(1,
958 "read error from msg pipe");
959 switch (buf.command) {
960 case IO_THR_MSG_TERMINATE:
961 running = 0;
962 break;
963 case IO_THR_MSG_ADD_SELECT_RD:
965 PMC * const pio = buf.ev->u.io_event.pio;
966 const int fd = Parrot_io_getfd(buf.ev->interp, pio);
967 if (FD_ISSET(fd, &act_rfds)) {
968 mem_sys_free(buf.ev);
969 break;
971 FD_SET(fd, &act_rfds);
972 if (fd >= n_highest)
973 n_highest = fd + 1;
974 store_io_event(&ios, buf.ev);
976 break;
977 default:
978 /* TODO */
979 exit_fatal(1, "unhandled msg in pipe");
980 break;
984 else {
986 * one of the io_event fds is ready
987 * remove from active set, as we don't
988 * want to fire again during io_handler
989 * invocation
991 FD_CLR(i, &act_rfds);
992 io_thread_ready_rd(&ios, i);
996 /* TODO check fds */
997 break;
1000 edebug((stderr, "IO thread terminated\n"));
1001 close(PIPE_READ_FD);
1002 close(PIPE_WRITE_FD);
1003 return NULL;
1005 #endif
1009 =item C<static void stop_io_thread(void)>
1011 Tell the IO thread to stop.
1013 =cut
1017 static void
1018 stop_io_thread(void)
1020 ASSERT_ARGS(stop_io_thread)
1021 #ifndef WIN32
1022 io_thread_msg buf;
1024 * tell IO thread to stop
1026 memset(&buf, 0, sizeof (buf));
1027 buf.command = IO_THR_MSG_TERMINATE;
1028 if (write(PIPE_WRITE_FD, &buf, sizeof (buf)) != sizeof (buf))
1029 exit_fatal(1, "msg pipe write failed");
1030 #endif
1035 =item C<void Parrot_event_add_io_event(PARROT_INTERP, PMC *pio, PMC *sub, PMC
1036 *data, INTVAL which)>
1038 Create new i/o event.
1040 =cut
1044 PARROT_EXPORT
1045 void
1046 Parrot_event_add_io_event(PARROT_INTERP,
1047 ARGIN_NULLOK(PMC *pio), ARGIN_NULLOK(PMC *sub), ARGIN_NULLOK(PMC *data), INTVAL which)
1049 ASSERT_ARGS(Parrot_event_add_io_event)
1050 io_thread_msg buf;
1051 parrot_event * const event = mem_allocate_typed(parrot_event);
1053 event->type = EVENT_TYPE_IO;
1054 event->interp = interp;
1056 * TODO gc_register these PMCs as long as the event system
1057 * owns these 3
1058 * unregister, when event is passed to interp again
1060 event->u.io_event.pio = pio;
1061 event->u.io_event.handler = sub;
1062 event->u.io_event.user_data = data;
1064 buf.command = which;
1065 buf.ev = event;
1066 /* XXX Why isn't this entire function inside an ifndef WIN32? */
1067 #ifndef WIN32
1068 if (write(PIPE_WRITE_FD, &buf, sizeof (buf)) != sizeof (buf))
1069 Parrot_ex_throw_from_c_args(interp, NULL, 1, "msg pipe write failed");
1070 #endif
1076 =back
1078 =head2 Event Handler Thread Functions
1080 =over 4
1082 =item C<static QUEUE_ENTRY* dup_entry(const QUEUE_ENTRY *entry)>
1084 Duplicate queue entry.
1086 =cut
1090 PARROT_MALLOC
1091 PARROT_CANNOT_RETURN_NULL
1092 static QUEUE_ENTRY*
1093 dup_entry(ARGIN(const QUEUE_ENTRY *entry))
1095 ASSERT_ARGS(dup_entry)
1096 QUEUE_ENTRY * const new_entry = mem_allocate_typed(QUEUE_ENTRY);
1098 new_entry->next = NULL;
1099 new_entry->type = entry->type;
1100 new_entry->data = mem_allocate_typed(parrot_event);
1102 mem_sys_memcopy(new_entry->data, entry->data, sizeof (parrot_event));
1103 return new_entry;
1108 =item C<static QUEUE_ENTRY* dup_entry_interval(QUEUE_ENTRY *entry, FLOATVAL
1109 now)>
1111 Duplicate timed entry and add interval to C<abs_time>.
1113 =cut
1117 PARROT_WARN_UNUSED_RESULT
1118 PARROT_CANNOT_RETURN_NULL
1119 static QUEUE_ENTRY*
1120 dup_entry_interval(ARGIN(QUEUE_ENTRY *entry), FLOATVAL now)
1122 ASSERT_ARGS(dup_entry_interval)
1123 QUEUE_ENTRY * const new_entry = dup_entry(entry);
1124 parrot_event * const event = (parrot_event *)new_entry->data;
1126 event->u.timer_event.abs_time = now + event->u.timer_event.interval;
1128 return new_entry;
1133 =item C<static int process_events(QUEUE *event_q)>
1135 Process the events that have arrived. The caller must hold the queue
1136 mutex. Returns 0 if the event thread should terminate, 1 otherwise.
1138 =cut
1142 static int
1143 process_events(ARGMOD(QUEUE *event_q))
1145 ASSERT_ARGS(process_events)
1146 FLOATVAL now;
1147 QUEUE_ENTRY *entry;
1149 while ((entry = peek_entry(event_q)) != NULL) {
1151 * one or more entries arrived - we hold the mutex again
1152 * so we have to use the nonsyc_pop_entry to pop off event entries
1154 parrot_event *event = NULL;
1156 switch (entry->type) {
1157 case QUEUE_ENTRY_TYPE_EVENT:
1158 entry = nosync_pop_entry(event_q);
1159 event = (parrot_event *)entry->data;
1160 break;
1162 case QUEUE_ENTRY_TYPE_TIMED_EVENT:
1163 event = (parrot_event *)entry->data;
1164 now = Parrot_floatval_time();
1167 * if the timer_event isn't due yet, ignore the event
1168 * (we were signalled on insert of the event)
1169 * wait until we get at it again when time has elapsed
1171 if (now < event->u.timer_event.abs_time)
1172 return 1;
1173 entry = nosync_pop_entry(event_q);
1175 /* if event is repeated dup and reinsert it */
1177 if (event->u.timer_event.interval) {
1178 if (event->u.timer_event.repeat) {
1179 if (event->u.timer_event.repeat != -1)
1180 event->u.timer_event.repeat--;
1181 nosync_insert_entry(event_q,
1182 dup_entry_interval(entry, now));
1185 break;
1186 default:
1187 exit_fatal(1, "Unknown queue entry");
1189 PARROT_ASSERT(event);
1190 if (event->type == EVENT_TYPE_NONE) {
1191 mem_sys_free(entry);
1192 mem_sys_free(event);
1193 continue;
1195 else if (event->type == EVENT_TYPE_EVENT_TERMINATE) {
1196 mem_sys_free(entry);
1197 mem_sys_free(event);
1199 return 0;
1202 * now insert entry in interpreter task queue
1204 if (event->interp) {
1205 Parrot_schedule_interp_qentry(event->interp, entry);
1207 else {
1208 Parrot_schedule_broadcast_qentry(entry);
1210 } /* while events */
1211 return 1;
1216 =item C<static void* event_thread(void *data)>
1218 The event thread is started by the first interpreter. It handles all
1219 events for all interpreters.
1221 =cut
1225 PARROT_WARN_UNUSED_RESULT
1226 PARROT_CAN_RETURN_NULL
1227 static void*
1228 event_thread(ARGMOD(void *data))
1230 ASSERT_ARGS(event_thread)
1231 QUEUE * const event_q = (QUEUE *) data;
1232 int running = 1;
1234 LOCK(event_q->queue_mutex);
1236 * we might already have an event in the queue
1238 if (peek_entry(event_q))
1239 running = process_events(event_q);
1240 while (running) {
1241 QUEUE_ENTRY * const entry = peek_entry(event_q);
1243 if (!entry) {
1244 /* wait infinite until entry arrives */
1245 queue_wait(event_q);
1247 else if (entry->type == QUEUE_ENTRY_TYPE_TIMED_EVENT) {
1248 /* do a_timedwait for entry */
1249 struct timespec abs_time;
1250 parrot_event * const event = (parrot_event*)entry->data;
1251 const FLOATVAL when = event->u.timer_event.abs_time;
1253 abs_time.tv_sec = (time_t) when;
1254 abs_time.tv_nsec = (long)((when - abs_time.tv_sec)*1000.0f)
1255 *1000L*1000L;
1256 queue_timedwait(event_q, &abs_time);
1258 else {
1259 /* we shouldn't get here probably
1261 exit_fatal(1, "Spurious event");
1265 * one or more entries arrived - we hold the mutex again
1266 * so we have to use the nonsync_pop_entry to pop off event entries
1268 running = process_events(event_q);
1269 } /* event loop */
1271 * the main interpreter is dying
1272 * TODO empty the queue
1274 UNLOCK(event_q->queue_mutex);
1275 queue_destroy(event_q);
1276 stop_io_thread();
1277 edebug((stderr, "event thread stopped\n"));
1278 return NULL;
1283 =back
1285 =head2 Sleep Handling
1287 =over 4
1289 =item C<static opcode_t * wait_for_wakeup(PARROT_INTERP, opcode_t *next)>
1291 Sleep on the event queue condition. If an event arrives, the event
1292 is processed. Terminate the loop if sleeping is finished.
1294 =cut
1298 PARROT_WARN_UNUSED_RESULT
1299 PARROT_CAN_RETURN_NULL
1300 static opcode_t *
1301 wait_for_wakeup(PARROT_INTERP, ARGIN_NULLOK(opcode_t *next))
1303 ASSERT_ARGS(wait_for_wakeup)
1304 QUEUE * const tq = interp->task_queue;
1306 interp->sleeping = 1;
1309 * event handler like callbacks or timers are run as normal code
1310 * so inside such an event handler function, another event might get
1311 * handled, which is good (higher priority events can interrupt
1312 * other event handler). OTOH we must ensure that all state changes
1313 * are done in do_event and we should probably suspend nested
1314 * event handlers sometimes
1316 * FIXME: the same is true for the *next param:
1317 * get rid of that, instead mangle the resume flags
1318 * and offset to stop the runloop
1322 while (interp->sleeping) {
1323 QUEUE_ENTRY * const entry = wait_for_entry(tq);
1324 parrot_event * const event = (parrot_event*)entry->data;
1326 mem_sys_free(entry);
1327 edebug((stderr, "got ev %s head : %p\n", et(event), tq->head));
1328 next = do_event(interp, event, next);
1331 edebug((stderr, "woke up\n"));
1332 return next;
1337 =item C<opcode_t * Parrot_sleep_on_event(PARROT_INTERP, FLOATVAL t, opcode_t
1338 *next)>
1340 Go to sleep. This is called from the C<sleep> opcode.
1342 =cut
1346 PARROT_EXPORT
1347 PARROT_WARN_UNUSED_RESULT
1348 PARROT_CAN_RETURN_NULL
1349 opcode_t *
1350 Parrot_sleep_on_event(PARROT_INTERP, FLOATVAL t, ARGIN_NULLOK(opcode_t *next))
1352 ASSERT_ARGS(Parrot_sleep_on_event)
1353 #ifdef PARROT_HAS_THREADS
1355 if (interp->sleeping)
1356 fprintf(stderr, "nested sleep might not work\n");
1358 * place the opcode_t* next arg in the event data, so that
1359 * we can identify this event in wakeup
1361 Parrot_new_timer_event(interp, (PMC *) next, t, 0.0, 0,
1362 NULL, EVENT_TYPE_SLEEP);
1363 next = wait_for_wakeup(interp, next);
1364 #else
1366 * TODO check for nanosleep or such
1368 Parrot_sleep((UINTVAL) ceil(t));
1369 #endif
1370 return next;
1375 =back
1377 =head2 Event Handling for Run-Loops
1379 =over 4
1381 =item C<opcode_t * Parrot_do_check_events(PARROT_INTERP, opcode_t *next)>
1383 Explicitly C<sync> called by the check_event opcode from run loops.
1385 =cut
1389 PARROT_EXPORT
1390 PARROT_WARN_UNUSED_RESULT
1391 PARROT_CAN_RETURN_NULL
1392 opcode_t *
1393 Parrot_do_check_events(PARROT_INTERP, ARGIN_NULLOK(opcode_t *next))
1395 ASSERT_ARGS(Parrot_do_check_events)
1396 if (peek_entry(interp->task_queue))
1397 return Parrot_do_handle_events(interp, 0, next);
1399 return next;
1404 =item C<static void event_to_exception(PARROT_INTERP, const parrot_event*
1405 event)>
1407 Convert event to exception and throw it.
1409 =cut
1413 static void
1414 event_to_exception(PARROT_INTERP, ARGIN(const parrot_event* event))
1416 ASSERT_ARGS(event_to_exception)
1417 const int exit_code = -event->u.signal;
1419 switch (event->u.signal) {
1420 case SIGINT:
1421 case SIGHUP:
1423 * SIGINT is silent, if no exception handler is
1424 * installed: set severity to EXCEPT_exit
1427 STRING * const message = CONST_STRING(interp, "Caught signal.");
1428 PMC *exception = Parrot_ex_build_exception(interp,
1429 EXCEPT_exit, exit_code, message);
1431 Parrot_ex_throw_from_c(interp, exception);
1433 break;
1434 default:
1435 Parrot_ex_throw_from_c_args(interp, NULL, exit_code,
1436 "Caught signal.");
1437 break;
1443 =item C<static opcode_t * do_event(PARROT_INTERP, parrot_event* event, opcode_t
1444 *next)>
1446 Run user code or such. The C<event> argument is freed after execution.
1448 TODO: Instrument with splint args so splint knows event gets released.
1450 =cut
1454 PARROT_WARN_UNUSED_RESULT
1455 PARROT_CAN_RETURN_NULL
1456 static opcode_t *
1457 do_event(PARROT_INTERP, ARGIN(parrot_event* event), ARGIN_NULLOK(opcode_t *next))
1459 ASSERT_ARGS(do_event)
1460 edebug((stderr, "do_event %s\n", et(event)));
1461 switch (event->type) {
1462 case EVENT_TYPE_TERMINATE:
1463 next = NULL; /* this will terminate the run loop */
1464 break;
1465 case EVENT_TYPE_SIGNAL:
1466 interp->sleeping = 0;
1467 /* generate exception */
1468 event_to_exception(interp, event);
1469 /* not reached - will longjmp */
1470 break;
1471 case EVENT_TYPE_TIMER:
1472 /* run ops, save registers */
1473 Parrot_pcc_invoke_sub_from_c_args(interp,
1474 event->u.timer_event.sub, "->");
1475 break;
1476 case EVENT_TYPE_CALL_BACK:
1477 edebug((stderr, "starting user cb\n"));
1478 Parrot_run_callback(interp, event->u.call_back.cbi,
1479 event->u.call_back.external_data);
1480 break;
1481 case EVENT_TYPE_IO:
1482 edebug((stderr, "starting io handler\n"));
1483 Parrot_pcc_invoke_sub_from_c_args(interp,
1484 event->u.io_event.handler,
1485 "PP->",
1486 event->u.io_event.pio,
1487 event->u.io_event.user_data);
1488 break;
1489 case EVENT_TYPE_SLEEP:
1490 interp->sleeping = 0;
1491 break;
1492 case EVENT_TYPE_SUSPEND_FOR_GC:
1493 edebug((stderr, "suspend for gc\n"));
1494 pt_suspend_self_for_gc(interp);
1495 break;
1496 default:
1497 fprintf(stderr, "Unhandled event type %d\n", (int)event->type);
1498 break;
1500 mem_sys_free(event);
1501 return next;
1506 =item C<opcode_t * Parrot_do_handle_events(PARROT_INTERP, int restore, opcode_t
1507 *next)>
1509 Called by the C<check_events__> opcode from run loops or from above. When
1510 called from the C<check_events__> opcode, we have to restore the
1511 C<op_func_table>.
1513 =cut
1517 PARROT_EXPORT
1518 PARROT_WARN_UNUSED_RESULT
1519 PARROT_CAN_RETURN_NULL
1520 opcode_t *
1521 Parrot_do_handle_events(PARROT_INTERP, int restore, ARGIN_NULLOK(opcode_t *next))
1523 ASSERT_ARGS(Parrot_do_handle_events)
1524 QUEUE * const tq = interp->task_queue;
1526 if (restore)
1527 disable_event_checking(interp);
1529 if (!peek_entry(tq))
1530 return next;
1532 while (peek_entry(tq)) {
1533 QUEUE_ENTRY * const entry = pop_entry(tq);
1534 parrot_event * const event = (parrot_event*)entry->data;
1536 mem_sys_free(entry);
1537 next = do_event(interp, event, next);
1540 return next;
1545 =back
1547 =head1 SEE ALSO
1549 F<include/parrot/events.h> and F<docs/dev/events.pod>.
1551 =cut
1557 * Local variables:
1558 * c-file-style: "parrot"
1559 * End:
1560 * vim: expandtab shiftwidth=4: