[TT #871] Add rand as a dynop, with tests
[parrot.git] / src / events.c
blob78d6c0b5cd4330250ec4e36580c257d54dcb010f
1 /*
2 Copyright (C) 2001-2008, Parrot Foundation.
3 $Id$
5 =head1 NAME
7 src/events.c - Event handling stuff
9 =head1 DESCRIPTION
11 An event_thread handles async events for all interpreters. When events
12 are due, they are placed in per interpreter task_queues, where they are
13 handled then by the C<check_event*> opcodes.
15 IO events and signals are caught in the io_thread, which again
16 dispatches these to one or all interpreters.
18 =over 4
20 =cut
24 #include "parrot/parrot.h"
25 #include "parrot/events.h"
26 #include "events.str"
28 typedef struct pending_io_events {
29 parrot_event **events;
30 size_t n;
31 size_t alloced;
32 } pending_io_events;
34 /* HEADERIZER HFILE: include/parrot/events.h */
35 /* HEADERIZER BEGIN: static */
36 /* Don't modify between HEADERIZER BEGIN / HEADERIZER END. Your changes will be lost. */
38 PARROT_WARN_UNUSED_RESULT
39 PARROT_CAN_RETURN_NULL
40 static opcode_t * do_event(PARROT_INTERP,
41 ARGIN(parrot_event* event),
42 ARGIN_NULLOK(opcode_t *next))
43 __attribute__nonnull__(1)
44 __attribute__nonnull__(2);
46 PARROT_MALLOC
47 PARROT_CANNOT_RETURN_NULL
48 static QUEUE_ENTRY* dup_entry(ARGIN(const QUEUE_ENTRY *entry))
49 __attribute__nonnull__(1);
51 PARROT_WARN_UNUSED_RESULT
52 PARROT_CANNOT_RETURN_NULL
53 static QUEUE_ENTRY* dup_entry_interval(
54 ARGIN(QUEUE_ENTRY *entry),
55 FLOATVAL now)
56 __attribute__nonnull__(1);
58 PARROT_WARN_UNUSED_RESULT
59 PARROT_CAN_RETURN_NULL
60 static void* event_thread(ARGMOD(void *data))
61 __attribute__nonnull__(1)
62 FUNC_MODIFIES(*data);
64 static void event_to_exception(PARROT_INTERP,
65 ARGIN(const parrot_event* event))
66 __attribute__nonnull__(1)
67 __attribute__nonnull__(2);
69 static void init_events_all(PARROT_INTERP)
70 __attribute__nonnull__(1);
72 static void init_events_first(PARROT_INTERP)
73 __attribute__nonnull__(1);
75 PARROT_CAN_RETURN_NULL
76 static void* io_thread(SHIM(void *data));
78 static void io_thread_ready_rd(ARGMOD(pending_io_events *ios), int ready_rd)
79 __attribute__nonnull__(1)
80 FUNC_MODIFIES(*ios);
82 static void Parrot_sigaction(int sig, ARGIN(void (*handler)(int)))
83 __attribute__nonnull__(2);
85 static void Parrot_unblock_signal(int sig);
86 static int process_events(ARGMOD(QUEUE *event_q))
87 __attribute__nonnull__(1)
88 FUNC_MODIFIES(*event_q);
90 static void schedule_signal_event(int signum);
91 static void sig_handler(int signum);
92 static void stop_io_thread(void);
93 static void store_io_event(
94 ARGMOD(pending_io_events *ios),
95 ARGIN(parrot_event *ev))
96 __attribute__nonnull__(1)
97 __attribute__nonnull__(2)
98 FUNC_MODIFIES(*ios);
100 PARROT_WARN_UNUSED_RESULT
101 PARROT_CAN_RETURN_NULL
102 static opcode_t * wait_for_wakeup(PARROT_INTERP,
103 ARGIN_NULLOK(opcode_t *next))
104 __attribute__nonnull__(1);
106 #define ASSERT_ARGS_do_event __attribute__unused__ int _ASSERT_ARGS_CHECK = \
107 PARROT_ASSERT_ARG(interp) \
108 || PARROT_ASSERT_ARG(event)
109 #define ASSERT_ARGS_dup_entry __attribute__unused__ int _ASSERT_ARGS_CHECK = \
110 PARROT_ASSERT_ARG(entry)
111 #define ASSERT_ARGS_dup_entry_interval __attribute__unused__ int _ASSERT_ARGS_CHECK = \
112 PARROT_ASSERT_ARG(entry)
113 #define ASSERT_ARGS_event_thread __attribute__unused__ int _ASSERT_ARGS_CHECK = \
114 PARROT_ASSERT_ARG(data)
115 #define ASSERT_ARGS_event_to_exception __attribute__unused__ int _ASSERT_ARGS_CHECK = \
116 PARROT_ASSERT_ARG(interp) \
117 || PARROT_ASSERT_ARG(event)
118 #define ASSERT_ARGS_init_events_all __attribute__unused__ int _ASSERT_ARGS_CHECK = \
119 PARROT_ASSERT_ARG(interp)
120 #define ASSERT_ARGS_init_events_first __attribute__unused__ int _ASSERT_ARGS_CHECK = \
121 PARROT_ASSERT_ARG(interp)
122 #define ASSERT_ARGS_io_thread __attribute__unused__ int _ASSERT_ARGS_CHECK = 0
123 #define ASSERT_ARGS_io_thread_ready_rd __attribute__unused__ int _ASSERT_ARGS_CHECK = \
124 PARROT_ASSERT_ARG(ios)
125 #define ASSERT_ARGS_Parrot_sigaction __attribute__unused__ int _ASSERT_ARGS_CHECK = \
126 PARROT_ASSERT_ARG(handler)
127 #define ASSERT_ARGS_Parrot_unblock_signal __attribute__unused__ int _ASSERT_ARGS_CHECK = 0
128 #define ASSERT_ARGS_process_events __attribute__unused__ int _ASSERT_ARGS_CHECK = \
129 PARROT_ASSERT_ARG(event_q)
130 #define ASSERT_ARGS_schedule_signal_event __attribute__unused__ int _ASSERT_ARGS_CHECK = 0
131 #define ASSERT_ARGS_sig_handler __attribute__unused__ int _ASSERT_ARGS_CHECK = 0
132 #define ASSERT_ARGS_stop_io_thread __attribute__unused__ int _ASSERT_ARGS_CHECK = 0
133 #define ASSERT_ARGS_store_io_event __attribute__unused__ int _ASSERT_ARGS_CHECK = \
134 PARROT_ASSERT_ARG(ios) \
135 || PARROT_ASSERT_ARG(ev)
136 #define ASSERT_ARGS_wait_for_wakeup __attribute__unused__ int _ASSERT_ARGS_CHECK = \
137 PARROT_ASSERT_ARG(interp)
138 /* Don't modify between HEADERIZER BEGIN / HEADERIZER END. Your changes will be lost. */
139 /* HEADERIZER END: static */
142 * event debugging stuff - turn it off before running tests
144 #define EVENT_DEBUG 0
146 * not yet - need to sort out platform code and fix exceptions first
147 * TODO get some config for POSIX compliant
148 * TODO create API for extenders like ponie - events disabled for now
150 #if defined(linux) || defined(darwin)
151 # define INSTALL_EVENT_HANDLER 0
152 #else
153 # define INSTALL_EVENT_HANDLER 0
154 #endif
156 #if EVENT_DEBUG
157 # define edebug(x) fprintf (x)
158 static const char *ev_names[] = {
159 "EVENT_TYPE_NONE",
160 "EVENT_TYPE_EVENT",
161 "EVENT_TYPE_IO",
162 "EVENT_TYPE_MSG",
163 "EVENT_TYPE_TIMER",
164 "EVENT_TYPE_CALL_BACK",
165 "EVENT_TYPE_SLEEP",
166 "EVENT_TYPE_TERMINATE",
167 "EVENT_TYPE_EVENT_TERMINATE",
168 "EVENT_TYPE_CLASS_CHANGED",
169 "EVENT_TYPE_SIGNAL",
170 "EVENT_TYPE_SUSPEND_FOR_GC"
172 static const char*
173 et(const parrot_event* const e)
175 return ev_names[e->type];
178 #else
179 # define edebug(x)
180 #endif
183 /* forward defs */
186 * we have exactly one global event_queue
187 * TODO task prio handling
189 static QUEUE *event_queue;
190 #define TASK_PRIO 10
193 * user accessible signals like SIGINT
195 #ifndef SIGINT
196 # define SIGINT -4711
197 #endif
198 #ifndef SIGHUP
199 # define SIGHUP -4712
200 #endif
203 * XXX need a configure test
204 * should be sig_atomic_t
206 static int sig_int, sig_hup;
209 * a pipe is used to send messages to the IO thread
211 static int pipe_fds[2];
212 #define PIPE_READ_FD pipe_fds[0]
213 #define PIPE_WRITE_FD pipe_fds[1]
216 * a structure to communicate with the io_thread
218 typedef struct io_thread_msg {
219 INTVAL command;
220 parrot_event *ev;
221 } io_thread_msg;
226 =back
228 =head2 Signal Handling
230 =over 4
232 =item C<static void sig_handler(int signum)>
234 Handle signal C<signum>.
236 TODO - Only C<SIGHUP> is handled at the moment for testing
238 =cut
242 static void
243 sig_handler(int signum)
245 ASSERT_ARGS(sig_handler)
246 switch (signum) {
247 case SIGINT:
248 sig_int = 1;
249 break;
250 case SIGHUP:
251 sig_hup = 1;
252 break;
253 default:
254 break;
260 =item C<static void Parrot_sigaction(int sig, void (*handler(int)))>
262 Signal handlers are common to all threads, signal block masks are
263 specific, so we install one handler then block that signal and unblock
264 it in the thread, that will receive that signal.
266 =cut
270 static void
271 Parrot_sigaction(int sig, ARGIN(void (*handler)(int)))
273 ASSERT_ARGS(Parrot_sigaction)
274 #ifdef PARROT_HAS_SIGACTION
275 struct sigaction action;
276 sigset_t block_mask;
278 /* install handler */
279 action.sa_handler = handler;
280 sigemptyset(&action.sa_mask);
281 action.sa_flags = 0;
282 sigaction(sig, &action, NULL);
284 /* block that signal */
285 sigemptyset(&block_mask);
286 sigaddset(&block_mask, sig);
287 sigprocmask(SIG_BLOCK, &block_mask, NULL);
288 #else
289 UNUSED(sig);
290 UNUSED(handler);
291 #endif
297 =item C<static void Parrot_unblock_signal(int sig)>
299 unblock a signal
301 =cut
305 static void
306 Parrot_unblock_signal(int sig)
308 ASSERT_ARGS(Parrot_unblock_signal)
309 #ifdef PARROT_HAS_SIGACTION
310 sigset_t block_mask;
312 sigemptyset(&block_mask);
313 sigaddset(&block_mask, sig);
314 sigprocmask(SIG_UNBLOCK, &block_mask, NULL);
315 #else
316 UNUSED(sig);
317 #endif
323 =item C<void Parrot_init_signals(void)>
325 Set up actions to handle signals.
326 Only SIGHUP handled at the moment.
328 =cut
332 PARROT_EXPORT
333 void
334 Parrot_init_signals(void)
336 ASSERT_ARGS(Parrot_init_signals)
338 * SIGFPE is architecture specific - some signal an error,
339 * some don't, so we have to use direct checks if we are dividing
340 * by zero.
342 Parrot_sigaction(SIGHUP, sig_handler);
347 =back
349 =head2 Initialization
351 =over 4
353 =item C<static void init_events_first(PARROT_INTERP)>
355 Init event system for first interpreter.
357 =cut
361 static void
362 init_events_first(PARROT_INTERP)
364 ASSERT_ARGS(init_events_first)
365 Parrot_thread ev_handle;
366 #ifndef WIN32
367 Parrot_thread io_handle;
368 #endif
371 * be sure all init is done only once
372 * we could use pthread_once for that too
374 if (event_queue)
375 PANIC(interp, "event queue already exists - missing parent_interp?");
377 * create event queue
379 event_queue = queue_init(TASK_PRIO);
381 * we use a message pipe to send IO related stuff to the
382 * IO thread
384 #ifndef WIN32
386 * pipes on WIN32 don't support select
387 * s. p6i: "event.c - of signals and pipes"
389 if (pipe(pipe_fds))
390 Parrot_ex_throw_from_c_args(interp, NULL, 1, "Couldn't create message pipe");
391 #endif
393 * now set some sig handlers before any thread is started, so
394 * that all threads inherit the signal block mask
396 #if INSTALL_EVENT_HANDLER
397 Parrot_init_signals();
398 #endif
400 * we start an event_handler thread
402 THREAD_CREATE_DETACHED(ev_handle, event_thread, event_queue);
404 * and a signal and IO handler thread
406 #ifndef WIN32
407 THREAD_CREATE_DETACHED(io_handle, io_thread, event_queue);
408 #endif
413 =item C<static void init_events_all(PARROT_INTERP)>
415 Init events for all interpreters.
417 =cut
421 static void
422 init_events_all(PARROT_INTERP)
424 ASSERT_ARGS(init_events_all)
426 * create per interpreter task queue
428 interp->task_queue = queue_init(0);
433 =item C<void Parrot_init_events(PARROT_INTERP)>
435 Initialize the event system.
437 =cut
441 PARROT_EXPORT
442 void
443 Parrot_init_events(PARROT_INTERP)
445 ASSERT_ARGS(Parrot_init_events)
446 if (!interp->parent_interpreter) {
447 /* add the very first interpreter to the list of interps. */
448 pt_add_to_interpreters(interp, NULL);
449 init_events_first(interp);
451 init_events_all(interp);
456 =back
458 =head2 Event Handler Functions
460 =over 4
462 =item C<void Parrot_schedule_event(PARROT_INTERP, parrot_event* ev)>
464 Create queue entry and insert event into task queue.
466 =cut
470 PARROT_EXPORT
471 void
472 Parrot_schedule_event(PARROT_INTERP, ARGMOD(parrot_event* ev))
474 ASSERT_ARGS(Parrot_schedule_event)
475 QUEUE_ENTRY * const entry = mem_allocate_typed(QUEUE_ENTRY);
476 entry->next = NULL;
477 ev->interp = interp;
478 entry->data = ev;
479 switch (ev->type) {
480 case EVENT_TYPE_TIMER:
481 case EVENT_TYPE_SLEEP:
482 entry->type = QUEUE_ENTRY_TYPE_TIMED_EVENT;
483 insert_entry(event_queue, entry);
484 break;
485 case EVENT_TYPE_CALL_BACK:
486 case EVENT_TYPE_SIGNAL:
487 case EVENT_TYPE_IO:
488 entry->type = QUEUE_ENTRY_TYPE_EVENT;
489 unshift_entry(event_queue, entry);
490 break;
491 default:
492 entry->type = QUEUE_ENTRY_TYPE_EVENT;
493 push_entry(event_queue, entry);
494 break;
500 =item C<static void schedule_signal_event(int signum)>
502 create and schedule a signal event
504 =cut
508 static void
509 schedule_signal_event(int signum)
511 ASSERT_ARGS(schedule_signal_event)
512 parrot_event* const ev = mem_allocate_typed(parrot_event);
513 QUEUE_ENTRY * const entry = mem_allocate_typed(QUEUE_ENTRY);
515 entry->next = NULL;
516 entry->type = QUEUE_ENTRY_TYPE_EVENT;
517 ev->type = EVENT_TYPE_SIGNAL;
518 ev->u.signal = signum;
519 entry->data = ev;
521 * deliver to all interpreters
523 Parrot_schedule_broadcast_qentry(entry);
528 =item C<void Parrot_new_timer_event(PARROT_INTERP, PMC *timer, FLOATVAL diff,
529 FLOATVAL interval, int repeat, PMC *sub, parrot_event_type_enum typ)>
531 Create a new timer event due at C<diff> from now, repeated at C<interval>
532 and running the passed C<sub>.
534 =cut
538 PARROT_EXPORT
539 void
540 Parrot_new_timer_event(PARROT_INTERP, ARGIN_NULLOK(PMC *timer), FLOATVAL diff,
541 FLOATVAL interval, int repeat, ARGIN_NULLOK(PMC *sub), parrot_event_type_enum typ)
543 ASSERT_ARGS(Parrot_new_timer_event)
544 parrot_event* const ev = mem_allocate_typed(parrot_event);
546 const FLOATVAL now = Parrot_floatval_time();
548 ev->type = typ;
549 ev->u.timer_event.timer = timer;
550 ev->u.timer_event.abs_time = now + diff;
551 ev->u.timer_event.interval = interval;
552 ev->u.timer_event.repeat = repeat;
553 ev->u.timer_event.sub = sub;
555 if (repeat && FLOAT_IS_ZERO(interval))
556 ev->u.timer_event.interval = diff;
558 Parrot_schedule_event(interp, ev);
563 =item C<void Parrot_new_cb_event(PARROT_INTERP, PMC *cbi, char *ext)>
565 Prepare and schedule a callback event.
567 =cut
571 PARROT_EXPORT
572 void
573 Parrot_new_cb_event(PARROT_INTERP, ARGIN(PMC *cbi), ARGIN(char *ext))
575 ASSERT_ARGS(Parrot_new_cb_event)
576 parrot_event* const ev = mem_allocate_typed(parrot_event);
577 QUEUE_ENTRY* const entry = mem_allocate_typed(QUEUE_ENTRY);
579 entry->next = NULL;
580 entry->data = ev;
581 ev->interp = interp;
582 ev->type = EVENT_TYPE_CALL_BACK;
583 ev->u.call_back.cbi = cbi;
584 ev->u.call_back.external_data = ext;
585 Parrot_schedule_interp_qentry(interp, entry);
590 =item C<void Parrot_del_timer_event(PARROT_INTERP, const PMC *timer)>
592 Deactivate the timer identified by C<timer>.
594 =cut
598 PARROT_EXPORT
599 void
600 Parrot_del_timer_event(PARROT_INTERP, ARGIN(const PMC *timer))
602 ASSERT_ARGS(Parrot_del_timer_event)
603 QUEUE_ENTRY *entry;
605 LOCK(event_queue->queue_mutex);
607 for (entry = event_queue->head; entry; entry = entry->next) {
608 if (entry->type == QUEUE_ENTRY_TYPE_TIMED_EVENT) {
610 parrot_event * const event = (parrot_event *)entry->data;
612 if (event->interp == interp
613 && event->u.timer_event.timer == timer) {
614 event->u.timer_event.interval = 0.0;
615 event->type = EVENT_TYPE_NONE;
616 break;
620 UNLOCK(event_queue->queue_mutex);
625 =item C<void Parrot_new_terminate_event(PARROT_INTERP)>
627 Create a terminate event, interpreter will leave the run-loop when this
628 event arrives.
630 =cut
634 PARROT_EXPORT
635 void
636 Parrot_new_terminate_event(PARROT_INTERP)
638 ASSERT_ARGS(Parrot_new_terminate_event)
639 parrot_event* const ev = mem_allocate_typed(parrot_event);
640 ev->type = EVENT_TYPE_TERMINATE;
641 Parrot_schedule_event(interp, ev);
646 =item C<void Parrot_new_suspend_for_gc_event(PARROT_INTERP)>
648 Create a suspend-for-GC event, interpreter will wait on a condition
649 variable for GC to finish when the event arrives.
651 =cut
655 PARROT_EXPORT
656 void
657 Parrot_new_suspend_for_gc_event(PARROT_INTERP)
659 ASSERT_ARGS(Parrot_new_suspend_for_gc_event)
660 QUEUE_ENTRY *qe;
661 parrot_event* const ev = mem_allocate_typed(parrot_event);
662 ev->type = EVENT_TYPE_SUSPEND_FOR_GC;
663 qe = mem_allocate_typed(QUEUE_ENTRY);
664 qe->next = NULL;
665 qe->data = ev;
666 qe->type = QUEUE_ENTRY_TYPE_EVENT;
667 /* we don't use schedule_event because we must modify its
668 * task queue immediately
670 Parrot_schedule_interp_qentry(interp, qe);
675 =item C<void Parrot_kill_event_loop(PARROT_INTERP)>
677 Schedule event-loop terminate event. This shuts down the event thread.
679 =cut
683 PARROT_EXPORT
684 void
685 Parrot_kill_event_loop(PARROT_INTERP)
687 ASSERT_ARGS(Parrot_kill_event_loop)
688 parrot_event* const ev = mem_allocate_typed(parrot_event);
689 ev->type = EVENT_TYPE_EVENT_TERMINATE;
690 Parrot_schedule_event(interp, ev);
695 =item C<void Parrot_schedule_interp_qentry(PARROT_INTERP, struct QUEUE_ENTRY
696 *entry)>
698 Put a queue entry into the interpreters task queue and enable event
699 checking for the interpreter.
701 =cut
705 PARROT_EXPORT
706 void
707 Parrot_schedule_interp_qentry(PARROT_INTERP, ARGIN(struct QUEUE_ENTRY *entry))
709 ASSERT_ARGS(Parrot_schedule_interp_qentry)
710 parrot_event * const event = (parrot_event *)entry->data;
712 * sleep checks events when it awakes
714 edebug((stderr, "got entry - schedule_inter_qentry %s\n", et(event)));
715 if (event->type != EVENT_TYPE_SLEEP)
716 enable_event_checking(interp);
718 * do push_entry last - this signales the queue condition so the
719 * interpreter might starting process that event immediately
721 * we should better use a priority for placing the event
722 * in front or at the end of the queue
724 switch (event->type) {
725 case EVENT_TYPE_CALL_BACK:
726 case EVENT_TYPE_SIGNAL:
727 unshift_entry(interp->task_queue, entry);
728 break;
729 default:
730 push_entry(interp->task_queue, entry);
731 break;
737 =item C<void Parrot_schedule_broadcast_qentry(struct QUEUE_ENTRY *entry)>
739 Broadcast an event.
741 =cut
745 void
746 Parrot_schedule_broadcast_qentry(ARGIN(struct QUEUE_ENTRY *entry))
748 ASSERT_ARGS(Parrot_schedule_broadcast_qentry)
749 parrot_event * const event = (parrot_event *)entry->data;
751 switch (event->type) {
752 case EVENT_TYPE_SIGNAL:
753 edebug((stderr, "broadcast signal\n"));
755 * we don't have special signal handlers in usercode yet
756 * e.g.:
757 * install handler like exception handler *and*
758 * set a interpreter flag, that a handler exists
759 * we then could examine that flag (after LOCKing it)
760 * and dispatch the exception to all interpreters that
761 * handle it
762 * Finally, we send the first (main) interpreter that signal
764 * For now just send to all.
767 switch (event->u.signal) {
768 case SIGHUP:
769 case SIGINT:
771 if (n_interpreters) {
772 size_t i;
773 LOCK(interpreter_array_mutex);
774 for (i = 1; i < n_interpreters; ++i) {
775 Interp *interp;
776 edebug((stderr, "deliver SIGINT to %d\n", i));
777 interp = interpreter_array[i];
778 if (interp)
779 Parrot_schedule_interp_qentry(interp,
780 dup_entry(entry));
782 UNLOCK(interpreter_array_mutex);
784 Parrot_schedule_interp_qentry(interpreter_array[0], entry);
785 edebug((stderr, "deliver SIGINT to 0\n"));
787 break;
788 default:
789 mem_sys_free(entry);
790 mem_sys_free(event);
792 break;
793 default:
794 mem_sys_free(entry);
795 mem_sys_free(event);
796 exit_fatal(1, "Unknown event to broadcast");
797 break;
803 =back
805 =head2 IO Thread Handling
807 =over 4
809 =cut
813 #ifndef WIN32
817 =item C<static void store_io_event(pending_io_events *ios, parrot_event *ev)>
819 Stores an event in the event stack. Allocates memory if necessary.
821 =cut
825 static void
826 store_io_event(ARGMOD(pending_io_events *ios), ARGIN(parrot_event *ev))
828 ASSERT_ARGS(store_io_event)
829 if (!ios->alloced) {
830 ios->alloced = 16;
831 ios->events = mem_allocate_n_zeroed_typed(ios->alloced, parrot_event *);
833 else if (ios->n >= ios->alloced) {
834 ios->alloced *= 2;
835 mem_realloc_n_typed(ios->events, ios->alloced, parrot_event *);
837 ios->events[ios->n++] = ev;
842 =item C<static void io_thread_ready_rd(pending_io_events *ios, int ready_rd)>
844 Takes a list of pending i/o events and a file descriptor.
845 If the fd is ready to read, the event is removed from the
846 "pending" list and moved to the "scheduled" task queue.
848 =cut
852 static void
853 io_thread_ready_rd(ARGMOD(pending_io_events *ios), int ready_rd)
855 ASSERT_ARGS(io_thread_ready_rd)
856 size_t i;
858 for (i = 0; i < ios->n; ++i) {
859 parrot_event * const ev = ios->events[i];
860 PMC * const pio = ev->u.io_event.pio;
861 const int fd = Parrot_io_getfd(ev->interp, pio);
863 if (fd == ready_rd) {
864 /* remove from event list */
865 --ios->n;
867 for (; i < ios->n; ++i)
868 ios->events[i] = ios->events[i+1];
870 Parrot_schedule_event(ev->interp, ev);
871 break;
878 =item C<static void* io_thread(void *data)>
880 The IO thread uses select/poll to handle IO events and signals.
882 It waits on input from the message pipe to insert file descriptors in
883 the wait sets.
885 =cut
889 PARROT_CAN_RETURN_NULL
890 static void*
891 io_thread(SHIM(void *data))
893 ASSERT_ARGS(io_thread)
894 fd_set act_rfds, act_wfds;
895 int n_highest, i;
896 int running = 1;
897 pending_io_events ios;
899 ios.n = 0;
900 ios.alloced = 0;
901 ios.events = 0;
902 /* remember pending io events */
904 FD_ZERO(&act_rfds);
905 FD_ZERO(&act_wfds);
907 * Watch the reader end of the pipe for messages
909 FD_SET(PIPE_READ_FD, &act_rfds);
910 n_highest = PIPE_READ_FD + 1;
912 * all signals that we shall handle here have to be unblocked
913 * in this and only in this thread
915 Parrot_unblock_signal(SIGHUP);
916 while (running) {
917 fd_set rfds = act_rfds;
918 fd_set wfds = act_wfds;
919 const int retval = select(n_highest, &rfds, &wfds, NULL, NULL);
921 switch (retval) {
922 case -1:
923 if (errno == EINTR) {
924 edebug((stderr, "select EINTR\n"));
925 if (sig_int) {
926 edebug((stderr, "int arrived\n"));
927 sig_int = 0;
929 * signal the event thread
931 schedule_signal_event(SIGINT);
933 if (sig_hup) {
934 edebug((stderr, "int arrived\n"));
935 sig_hup = 0;
937 * signal the event thread
939 schedule_signal_event(SIGHUP);
943 break;
944 case 0: /* timeout - can't happen */
945 break;
946 default:
947 edebug((stderr, "IO ready\n"));
948 for (i = 0; i < n_highest; ++i) {
949 if (FD_ISSET(i, &rfds)) {
950 if (i == PIPE_READ_FD) {
951 io_thread_msg buf;
953 * a command arrived
955 edebug((stderr, "msg arrived\n"));
956 if (read(PIPE_READ_FD, &buf, sizeof (buf)) != sizeof (buf))
957 exit_fatal(1,
958 "read error from msg pipe");
959 switch (buf.command) {
960 case IO_THR_MSG_TERMINATE:
961 running = 0;
962 break;
963 case IO_THR_MSG_ADD_SELECT_RD:
965 PMC * const pio = buf.ev->u.io_event.pio;
966 const int fd = Parrot_io_getfd(buf.ev->interp, pio);
967 if (FD_ISSET(fd, &act_rfds)) {
968 mem_sys_free(buf.ev);
969 break;
971 FD_SET(fd, &act_rfds);
972 if (fd >= n_highest)
973 n_highest = fd + 1;
974 store_io_event(&ios, buf.ev);
976 break;
977 /* TODO */
978 default:
979 exit_fatal(1,
980 "unhandled msg in pipe");
981 break;
985 else {
987 * one of the io_event fds is ready
988 * remove from active set, as we don't
989 * want to fire again during io_handler
990 * invocation
992 FD_CLR(i, &act_rfds);
993 io_thread_ready_rd(&ios, i);
997 /* TODO check fds */
998 break;
1001 edebug((stderr, "IO thread terminated\n"));
1002 close(PIPE_READ_FD);
1003 close(PIPE_WRITE_FD);
1004 return NULL;
1006 #endif
1010 =item C<static void stop_io_thread(void)>
1012 Tell the IO thread to stop.
1014 =cut
1018 static void
1019 stop_io_thread(void)
1021 ASSERT_ARGS(stop_io_thread)
1022 #ifndef WIN32
1023 io_thread_msg buf;
1025 * tell IO thread to stop
1027 memset(&buf, 0, sizeof (buf));
1028 buf.command = IO_THR_MSG_TERMINATE;
1029 if (write(PIPE_WRITE_FD, &buf, sizeof (buf)) != sizeof (buf))
1030 exit_fatal(1, "msg pipe write failed");
1031 #endif
1036 =item C<void Parrot_event_add_io_event(PARROT_INTERP, PMC *pio, PMC *sub, PMC
1037 *data, INTVAL which)>
1039 Create new i/o event.
1041 =cut
1045 PARROT_EXPORT
1046 void
1047 Parrot_event_add_io_event(PARROT_INTERP,
1048 ARGIN_NULLOK(PMC *pio), ARGIN_NULLOK(PMC *sub), ARGIN_NULLOK(PMC *data), INTVAL which)
1050 ASSERT_ARGS(Parrot_event_add_io_event)
1051 io_thread_msg buf;
1052 parrot_event * const event = mem_allocate_typed(parrot_event);
1054 event->type = EVENT_TYPE_IO;
1055 event->interp = interp;
1057 * TODO gc_register these PMCs as long as the event system
1058 * owns these 3
1059 * unregister, when event is passed to interp again
1061 event->u.io_event.pio = pio;
1062 event->u.io_event.handler = sub;
1063 event->u.io_event.user_data = data;
1065 buf.command = which;
1066 buf.ev = event;
1067 /* XXX Why isn't this entire function inside an ifndef WIN32? */
1068 #ifndef WIN32
1069 if (write(PIPE_WRITE_FD, &buf, sizeof (buf)) != sizeof (buf))
1070 Parrot_ex_throw_from_c_args(interp, NULL, 1, "msg pipe write failed");
1071 #endif
1077 =back
1079 =head2 Event Handler Thread Functions
1081 =over 4
1083 =item C<static QUEUE_ENTRY* dup_entry(const QUEUE_ENTRY *entry)>
1085 Duplicate queue entry.
1087 =cut
1091 PARROT_MALLOC
1092 PARROT_CANNOT_RETURN_NULL
1093 static QUEUE_ENTRY*
1094 dup_entry(ARGIN(const QUEUE_ENTRY *entry))
1096 ASSERT_ARGS(dup_entry)
1097 QUEUE_ENTRY * const new_entry = mem_allocate_typed(QUEUE_ENTRY);
1099 new_entry->next = NULL;
1100 new_entry->type = entry->type;
1101 new_entry->data = mem_allocate_typed(parrot_event);
1103 mem_sys_memcopy(new_entry->data, entry->data, sizeof (parrot_event));
1104 return new_entry;
1109 =item C<static QUEUE_ENTRY* dup_entry_interval(QUEUE_ENTRY *entry, FLOATVAL
1110 now)>
1112 Duplicate timed entry and add interval to C<abs_time>.
1114 =cut
1118 PARROT_WARN_UNUSED_RESULT
1119 PARROT_CANNOT_RETURN_NULL
1120 static QUEUE_ENTRY*
1121 dup_entry_interval(ARGIN(QUEUE_ENTRY *entry), FLOATVAL now)
1123 ASSERT_ARGS(dup_entry_interval)
1124 QUEUE_ENTRY * const new_entry = dup_entry(entry);
1125 parrot_event * const event = (parrot_event *)new_entry->data;
1127 event->u.timer_event.abs_time = now + event->u.timer_event.interval;
1129 return new_entry;
1134 =item C<static int process_events(QUEUE *event_q)>
1136 Do something, when an event arrived caller has locked the mutex returns
1137 0 if event thread terminates.
1139 =cut
1143 static int
1144 process_events(ARGMOD(QUEUE *event_q))
1146 ASSERT_ARGS(process_events)
1147 FLOATVAL now;
1148 QUEUE_ENTRY *entry;
1150 while ((entry = peek_entry(event_q)) != NULL) {
1152 * one or more entries arrived - we hold the mutex again
1153 * so we have to use the nonsyc_pop_entry to pop off event entries
1155 parrot_event *event = NULL;
1157 switch (entry->type) {
1158 case QUEUE_ENTRY_TYPE_EVENT:
1159 entry = nosync_pop_entry(event_q);
1160 event = (parrot_event *)entry->data;
1161 break;
1163 case QUEUE_ENTRY_TYPE_TIMED_EVENT:
1164 event = (parrot_event *)entry->data;
1165 now = Parrot_floatval_time();
1168 * if the timer_event isn't due yet, ignore the event
1169 * (we were signalled on insert of the event)
1170 * wait until we get at it again when time has elapsed
1172 if (now < event->u.timer_event.abs_time)
1173 return 1;
1174 entry = nosync_pop_entry(event_q);
1176 /* if event is repeated dup and reinsert it */
1178 if (event->u.timer_event.interval) {
1179 if (event->u.timer_event.repeat) {
1180 if (event->u.timer_event.repeat != -1)
1181 event->u.timer_event.repeat--;
1182 nosync_insert_entry(event_q,
1183 dup_entry_interval(entry, now));
1186 break;
1187 default:
1188 exit_fatal(1, "Unknown queue entry");
1190 PARROT_ASSERT(event);
1191 if (event->type == EVENT_TYPE_NONE) {
1192 mem_sys_free(entry);
1193 mem_sys_free(event);
1194 continue;
1196 else if (event->type == EVENT_TYPE_EVENT_TERMINATE) {
1197 mem_sys_free(entry);
1198 mem_sys_free(event);
1200 return 0;
1203 * now insert entry in interpreter task queue
1205 if (event->interp) {
1206 Parrot_schedule_interp_qentry(event->interp, entry);
1208 else {
1209 Parrot_schedule_broadcast_qentry(entry);
1211 } /* while events */
1212 return 1;
1217 =item C<static void* event_thread(void *data)>
1219 The event thread is started by the first interpreter. It handles all
1220 events for all interpreters.
1222 =cut
1226 PARROT_WARN_UNUSED_RESULT
1227 PARROT_CAN_RETURN_NULL
1228 static void*
1229 event_thread(ARGMOD(void *data))
1231 ASSERT_ARGS(event_thread)
1232 QUEUE * const event_q = (QUEUE *) data;
1233 int running = 1;
1235 LOCK(event_q->queue_mutex);
1237 * we might already have an event in the queue
1239 if (peek_entry(event_q))
1240 running = process_events(event_q);
1241 while (running) {
1242 QUEUE_ENTRY * const entry = peek_entry(event_q);
1244 if (!entry) {
1245 /* wait infinite until entry arrives */
1246 queue_wait(event_q);
1248 else if (entry->type == QUEUE_ENTRY_TYPE_TIMED_EVENT) {
1249 /* do a_timedwait for entry */
1250 struct timespec abs_time;
1251 parrot_event * const event = (parrot_event*)entry->data;
1252 const FLOATVAL when = event->u.timer_event.abs_time;
1254 abs_time.tv_sec = (time_t) when;
1255 abs_time.tv_nsec = (long)((when - abs_time.tv_sec)*1000.0f)
1256 *1000L*1000L;
1257 queue_timedwait(event_q, &abs_time);
1259 else {
1260 /* we shouldn't get here probably
1262 exit_fatal(1, "Spurious event");
1266 * one or more entries arrived - we hold the mutex again
1267 * so we have to use the nonsync_pop_entry to pop off event entries
1269 running = process_events(event_q);
1270 } /* event loop */
1272 * the main interpreter is dying
1273 * TODO empty the queue
1275 UNLOCK(event_q->queue_mutex);
1276 queue_destroy(event_q);
1277 stop_io_thread();
1278 edebug((stderr, "event thread stopped\n"));
1279 return NULL;
1284 =back
1286 =head2 Sleep Handling
1288 =over 4
1290 =item C<static opcode_t * wait_for_wakeup(PARROT_INTERP, opcode_t *next)>
1292 Sleep on the event queue condition. If an event arrives, the event
1293 is processed. Terminate the loop if sleeping is finished.
1295 =cut
1299 PARROT_WARN_UNUSED_RESULT
1300 PARROT_CAN_RETURN_NULL
1301 static opcode_t *
1302 wait_for_wakeup(PARROT_INTERP, ARGIN_NULLOK(opcode_t *next))
1304 ASSERT_ARGS(wait_for_wakeup)
1305 QUEUE * const tq = interp->task_queue;
1307 interp->sleeping = 1;
1310 * event handler like callbacks or timers are run as normal code
1311 * so inside such an event handler function, another event might get
1312 * handled, which is good (higher priority events can interrupt
1313 * other event handler). OTOH we must ensure that all state changes
1314 * are done in do_event and we should probably suspend nested
1315 * event handlers sometimes
1317 * FIXME: the same is true for the *next param:
1318 * get rid of that, instead mangle the resume flags
1319 * and offset to stop the runloop
1323 while (interp->sleeping) {
1324 QUEUE_ENTRY * const entry = wait_for_entry(tq);
1325 parrot_event * const event = (parrot_event*)entry->data;
1327 mem_sys_free(entry);
1328 edebug((stderr, "got ev %s head : %p\n", et(event), tq->head));
1329 next = do_event(interp, event, next);
1332 edebug((stderr, "woke up\n"));
1333 return next;
1338 =item C<opcode_t * Parrot_sleep_on_event(PARROT_INTERP, FLOATVAL t, opcode_t
1339 *next)>
1341 Go to sleep. This is called from the C<sleep> opcode.
1343 =cut
1347 PARROT_EXPORT
1348 PARROT_WARN_UNUSED_RESULT
1349 PARROT_CAN_RETURN_NULL
1350 opcode_t *
1351 Parrot_sleep_on_event(PARROT_INTERP, FLOATVAL t, ARGIN_NULLOK(opcode_t *next))
1353 ASSERT_ARGS(Parrot_sleep_on_event)
1354 #ifdef PARROT_HAS_THREADS
1356 if (interp->sleeping)
1357 fprintf(stderr, "nested sleep might not work\n");
1359 * place the opcode_t* next arg in the event data, so that
1360 * we can identify this event in wakeup
1362 Parrot_new_timer_event(interp, (PMC *) next, t, 0.0, 0,
1363 NULL, EVENT_TYPE_SLEEP);
1364 next = wait_for_wakeup(interp, next);
1365 #else
1367 * TODO check for nanosleep or such
1369 Parrot_sleep((UINTVAL) ceil(t));
1370 #endif
1371 return next;
1376 =back
1378 =head2 Event Handling for Run-Loops
1380 =over 4
1382 =item C<opcode_t * Parrot_do_check_events(PARROT_INTERP, opcode_t *next)>
1384 Explicitly C<sync> called by the check_event opcode from run loops.
1386 =cut
1390 PARROT_EXPORT
1391 PARROT_WARN_UNUSED_RESULT
1392 PARROT_CAN_RETURN_NULL
1393 opcode_t *
1394 Parrot_do_check_events(PARROT_INTERP, ARGIN_NULLOK(opcode_t *next))
1396 ASSERT_ARGS(Parrot_do_check_events)
1397 if (peek_entry(interp->task_queue))
1398 return Parrot_do_handle_events(interp, 0, next);
1400 return next;
1405 =item C<static void event_to_exception(PARROT_INTERP, const parrot_event*
1406 event)>
1408 Convert event to exception and throw it.
1410 =cut
1414 static void
1415 event_to_exception(PARROT_INTERP, ARGIN(const parrot_event* event))
1417 ASSERT_ARGS(event_to_exception)
1418 const int exit_code = -event->u.signal;
1420 switch (event->u.signal) {
1421 case SIGINT:
1422 case SIGHUP:
1424 * SIGINT is silent, if no exception handler is
1425 * installed: set severity to EXCEPT_exit
1428 STRING * const message = CONST_STRING(interp, "Caught signal.");
1429 PMC *exception = Parrot_ex_build_exception(interp,
1430 EXCEPT_exit, exit_code, message);
1432 Parrot_ex_throw_from_c(interp, exception);
1434 break;
1435 default:
1436 Parrot_ex_throw_from_c_args(interp, NULL, exit_code,
1437 "Caught signal.");
1438 break;
1444 =item C<static opcode_t * do_event(PARROT_INTERP, parrot_event* event, opcode_t
1445 *next)>
1447 Run user code or such. The C<event> argument is freed after execution.
1449 TODO: Instrument with splint args so splint knows event gets released.
1451 =cut
1455 PARROT_WARN_UNUSED_RESULT
1456 PARROT_CAN_RETURN_NULL
1457 static opcode_t *
1458 do_event(PARROT_INTERP, ARGIN(parrot_event* event), ARGIN_NULLOK(opcode_t *next))
1460 ASSERT_ARGS(do_event)
1461 edebug((stderr, "do_event %s\n", et(event)));
1462 switch (event->type) {
1463 case EVENT_TYPE_TERMINATE:
1464 next = NULL; /* this will terminate the run loop */
1465 break;
1466 case EVENT_TYPE_SIGNAL:
1467 interp->sleeping = 0;
1468 /* generate exception */
1469 event_to_exception(interp, event);
1470 /* not reached - will longjmp */
1471 break;
1472 case EVENT_TYPE_TIMER:
1473 /* run ops, save registers */
1474 Parrot_runops_fromc_args_event(interp,
1475 event->u.timer_event.sub, "v");
1476 break;
1477 case EVENT_TYPE_CALL_BACK:
1478 edebug((stderr, "starting user cb\n"));
1479 Parrot_run_callback(interp, event->u.call_back.cbi,
1480 event->u.call_back.external_data);
1481 break;
1482 case EVENT_TYPE_IO:
1483 edebug((stderr, "starting io handler\n"));
1484 Parrot_runops_fromc_args_event(interp,
1485 event->u.io_event.handler,
1486 "vPP",
1487 event->u.io_event.pio,
1488 event->u.io_event.user_data);
1489 break;
1490 case EVENT_TYPE_SLEEP:
1491 interp->sleeping = 0;
1492 break;
1493 case EVENT_TYPE_SUSPEND_FOR_GC:
1494 edebug((stderr, "suspend for gc\n"));
1495 pt_suspend_self_for_gc(interp);
1496 break;
1497 default:
1498 fprintf(stderr, "Unhandled event type %d\n", (int)event->type);
1499 break;
1501 mem_sys_free(event);
1502 return next;
1507 =item C<opcode_t * Parrot_do_handle_events(PARROT_INTERP, int restore, opcode_t
1508 *next)>
1510 Called by the C<check_event__> opcode from run loops or from above. When
1511 called from the C<check_events__> opcode, we have to restore the
1512 C<op_func_table>.
1514 =cut
1518 PARROT_EXPORT
1519 PARROT_WARN_UNUSED_RESULT
1520 PARROT_CAN_RETURN_NULL
1521 opcode_t *
1522 Parrot_do_handle_events(PARROT_INTERP, int restore, ARGIN_NULLOK(opcode_t *next))
1524 ASSERT_ARGS(Parrot_do_handle_events)
1525 QUEUE * const tq = interp->task_queue;
1527 if (restore)
1528 disable_event_checking(interp);
1530 if (!peek_entry(tq))
1531 return next;
1533 while (peek_entry(tq)) {
1534 QUEUE_ENTRY * const entry = pop_entry(tq);
1535 parrot_event * const event = (parrot_event*)entry->data;
1537 mem_sys_free(entry);
1538 next = do_event(interp, event, next);
1541 return next;
1546 =back
1548 =head1 SEE ALSO
1550 F<include/parrot/events.h> and F<docs/dev/events.pod>.
1552 =cut
1558 * Local variables:
1559 * c-file-style: "parrot"
1560 * End:
1561 * vim: expandtab shiftwidth=4: