tagged release 0.6.4
[parrot.git] / src / scheduler.c
blobab8e58e771dcb4da363a26aed4c5a7d66c666fe8
1 /*
2 Copyright (C) 2007, The Perl Foundation.
3 $Id$
5 =head1 NAME
7 src/scheduler.c - The core routines for the concurrency scheduler
9 =head1 DESCRIPTION
11 Each interpreter has a concurrency scheduler element in its core struct. The
12 scheduler is responsible for receiveing, dispatching, and monitoring events,
13 exceptions, async I/O, and concurrent tasks (threads).
15 =cut
19 #include "parrot/parrot.h"
20 #include "parrot/scheduler_private.h"
22 #include "pmc/pmc_scheduler.h"
23 #include "pmc/pmc_task.h"
24 #include "pmc/pmc_timer.h"
26 #include "scheduler.str"
28 #define CX_DEBUG 0
30 /* HEADERIZER HFILE: include/parrot/scheduler.h */
32 /* HEADERIZER BEGIN: static */
33 /* Don't modify between HEADERIZER BEGIN / HEADERIZER END. Your changes will be lost. */
35 static void scheduler_process_messages(PARROT_INTERP,
36 ARGMOD(PMC *scheduler))
37 __attribute__nonnull__(1)
38 __attribute__nonnull__(2)
39 FUNC_MODIFIES(*scheduler);
41 static void scheduler_process_wait_list(PARROT_INTERP,
42 ARGMOD(PMC *scheduler))
43 __attribute__nonnull__(1)
44 __attribute__nonnull__(2)
45 FUNC_MODIFIES(*scheduler);
47 /* Don't modify between HEADERIZER BEGIN / HEADERIZER END. Your changes will be lost. */
48 /* HEADERIZER END: static */
52 =head2 Scheduler Interface Functions
54 Functions to interface with the concurrency scheduler.
56 =over 4
58 =item C<void Parrot_cx_init_scheduler>
60 Initalize the concurrency scheduler for the interpreter.
62 =cut
66 void
67 Parrot_cx_init_scheduler(PARROT_INTERP)
69 if (!interp->parent_interpreter) {
70 PMC *scheduler;
72 /* Add the very first interpreter to the list of interps. */
73 pt_add_to_interpreters(interp, NULL);
75 scheduler = pmc_new(interp, enum_class_Scheduler);
76 scheduler = VTABLE_share_ro(interp, scheduler);
78 interp->scheduler = scheduler;
85 =item C<void Parrot_cx_check_tasks>
87 If a wake request has been received, handle tasks.
89 =cut
93 void
94 Parrot_cx_check_tasks(PARROT_INTERP, ARGMOD(PMC *scheduler))
96 if (SCHEDULER_wake_requested_TEST(scheduler))
97 Parrot_cx_handle_tasks(interp, interp->scheduler);
102 =item C<void Parrot_cx_handle_tasks>
104 Handle the pending tasks in the scheduler's task list. Returns when there are
105 no more pending tasks. Returns 0 to terminate the scheduler runloop, or 1 to
106 continue the runloop.
108 =cut
112 PARROT_API
113 void
114 Parrot_cx_handle_tasks(PARROT_INTERP, ARGMOD(PMC *scheduler))
116 SCHEDULER_wake_requested_CLEAR(scheduler);
117 Parrot_cx_refresh_task_list(interp, scheduler);
119 while (VTABLE_get_integer(interp, scheduler) > 0) {
120 PMC * const task = VTABLE_pop_pmc(interp, scheduler);
121 if (!PMC_IS_NULL(task)) {
122 PMC *type_pmc = VTABLE_get_attr_str(interp, task, CONST_STRING(interp, "type"));
123 STRING *type = VTABLE_get_string(interp, type_pmc);
125 if (string_equal(interp, type, CONST_STRING(interp, "callback")) == 0) {
126 Parrot_cx_invoke_callback(interp, task);
128 else if (string_equal(interp, type, CONST_STRING(interp, "timer")) == 0) {
129 Parrot_cx_timer_invoke(interp, task);
131 else if (string_equal(interp, type, CONST_STRING(interp, "event")) == 0) {
132 PMC * const handler = Parrot_cx_find_handler_for_task(interp, task);
133 if (!PMC_IS_NULL(handler)) {
134 PMC * handler_sub =
135 VTABLE_get_attr_str(interp, handler, CONST_STRING(interp, "code"));
136 Parrot_runops_fromc_args_event(interp, handler_sub,
137 "vPP", handler, task);
140 else {
141 real_exception(interp, NULL, INVALID_OPERATION,
142 "Unknown task type '%Ss'.\n", type);
145 Parrot_cx_delete_task(interp, task);
148 /* If the scheduler was flagged to terminate, make sure you process all
149 * tasks. */
150 if (SCHEDULER_terminate_requested_TEST(scheduler))
151 Parrot_cx_refresh_task_list(interp, scheduler);
153 } /* end of pending tasks */
158 =item C<void Parrot_cx_refresh_task_list>
160 Tell the scheduler to perform maintenance on its list of active tasks, checking
161 for completed timers or sleep events, sorting for priority, checking for
162 messages, etc.
164 =cut
168 void
169 Parrot_cx_refresh_task_list(PARROT_INTERP, ARGMOD(PMC *scheduler))
171 scheduler_process_wait_list(interp, scheduler);
172 scheduler_process_messages(interp, scheduler);
174 /* TODO: Sort the task list index */
176 SCHEDULER_cache_valid_SET(scheduler);
177 return;
182 =item C<void Parrot_cx_runloop_wake>
184 Wake a sleeping scheduler runloop (generally called when new tasks are added to
185 the scheduler's task list).
187 =cut
191 void
192 Parrot_cx_runloop_wake(PARROT_INTERP, ARGMOD(PMC *scheduler))
194 enable_event_checking(interp);
195 SCHEDULER_wake_requested_SET(scheduler);
201 =item C<void Parrot_cx_runloop_end>
203 Schedule an event to terminate the scheduler runloop.
205 =cut
209 PARROT_API
210 void
211 Parrot_cx_runloop_end(PARROT_INTERP)
213 SCHEDULER_terminate_requested_SET(interp->scheduler);
214 Parrot_cx_handle_tasks(interp, interp->scheduler);
219 =item C<void Parrot_cx_schedule_task>
221 Add a task to scheduler's task list. Cannot be called across
222 interpreters/threads, must be called from within the interpreter's runloop.
224 =cut
228 PARROT_API
229 void
230 Parrot_cx_schedule_task(PARROT_INTERP, ARGIN(PMC *task))
232 if (interp->scheduler)
233 VTABLE_push_pmc(interp, interp->scheduler, task);
234 else
235 real_exception(interp, NULL, INVALID_OPERATION,
236 "Scheduler was not initialized for this interpreter.\n");
241 =item C<void Parrot_cx_schedule_timer>
243 Create a new timer event due at C<diff> from now, repeated at C<interval>
244 and running the passed C<sub>.
246 =cut
250 PARROT_API
251 void
252 Parrot_cx_schedule_timer(PARROT_INTERP,
253 ARGIN_NULLOK(STRING *type), FLOATVAL duration, FLOATVAL interval,
254 INTVAL repeat, ARGIN_NULLOK(PMC *sub))
256 PMC * const timer = pmc_new(interp, enum_class_Timer);
258 VTABLE_set_number_keyed_int(interp, timer, PARROT_TIMER_NSEC, duration);
259 VTABLE_set_number_keyed_int(interp, timer, PARROT_TIMER_INTERVAL, interval);
260 VTABLE_set_integer_keyed_int(interp, timer, PARROT_TIMER_REPEAT, repeat);
262 if (!PMC_IS_NULL(sub))
263 VTABLE_set_pmc_keyed_int(interp, timer, PARROT_TIMER_HANDLER, sub);
265 if (!STRING_IS_NULL(type))
266 VTABLE_set_string_native(interp, timer, type);
268 if (repeat && FLOAT_IS_ZERO(interval))
269 VTABLE_set_number_keyed_int(interp, timer, PARROT_TIMER_INTERVAL, duration);
271 Parrot_cx_schedule_task(interp, timer);
276 =item C<void Parrot_cx_schedule_repeat>
278 Add a repeat task to scheduler's task list.
280 =cut
284 PARROT_API
285 void
286 Parrot_cx_schedule_repeat(PARROT_INTERP, ARGIN(PMC *task))
288 INTVAL repeat = VTABLE_get_integer_keyed_int(interp, task,
289 PARROT_TIMER_REPEAT);
290 FLOATVAL duration = VTABLE_get_number_keyed_int(interp, task,
291 PARROT_TIMER_INTERVAL);
292 if (repeat != 0) {
293 PMC *repeat_task = VTABLE_clone(interp, task);
294 VTABLE_set_number_keyed_int(interp, repeat_task, PARROT_TIMER_NSEC, duration);
296 if (repeat > 0)
297 VTABLE_set_integer_keyed_int(interp, repeat_task,
298 PARROT_TIMER_REPEAT, repeat - 1);
300 Parrot_cx_schedule_task(interp, repeat_task);
306 =item C<void Parrot_cx_schedule_callback>
308 Create a new callback event, with an argument for the call.
310 =cut
314 PARROT_API
315 void
316 Parrot_cx_schedule_callback(PARROT_INTERP,
317 ARGIN(PMC *user_data), ARGIN(char *ext_data))
319 PMC *callback = pmc_new(interp, enum_class_Task);
320 Parrot_Task * const task_struct = PARROT_TASK(callback);
322 task_struct->type = CONST_STRING(interp, "callback");
323 task_struct->data = user_data;
324 task_struct->cb_data = ext_data;
326 Parrot_cx_schedule_task(interp, callback);
331 =item C<void Parrot_cx_request_suspend_for_gc>
333 Tell the scheduler to suspend for GC at the next safe pause.
335 =cut
339 PARROT_API
340 void
341 Parrot_cx_request_suspend_for_gc(PARROT_INTERP)
343 #if CX_DEBUG
344 fprintf(stderr, "requesting gc suspend [interp=%p]\n", interp);
345 #endif
346 Parrot_cx_send_message(interp, CONST_STRING(interp, "suspend_for_gc"), PMCNULL);
351 =item C<void Parrot_cx_delete_task>
353 Remove a task from the scheduler's task list.
355 =cut
359 PARROT_API
360 void
361 Parrot_cx_delete_task(PARROT_INTERP, ARGIN(PMC *task))
363 if (interp->scheduler) {
364 const INTVAL tid = VTABLE_get_integer(interp, task);
365 VTABLE_delete_keyed_int(interp, interp->scheduler, tid);
367 else
368 real_exception(interp, NULL, INVALID_OPERATION,
369 "Scheduler was not initialized for this interpreter.\n");
375 =item C<PMC * Parrot_cx_delete_suspend_for_gc>
377 Remove a message that would suspend GC from the message queue. (Provided for
378 backward compatibility in the threads implementation.)
380 =cut
384 PARROT_API
385 PARROT_CAN_RETURN_NULL
386 PMC *
387 Parrot_cx_delete_suspend_for_gc(PARROT_INTERP)
389 if (interp->scheduler) {
390 Parrot_Scheduler * sched_struct = PARROT_SCHEDULER(interp->scheduler);
391 INTVAL num_tasks, index;
393 #if CX_DEBUG
394 fprintf(stderr, "called delete_suspend_for_gc\n");
395 #endif
397 #if CX_DEBUG
398 fprintf(stderr, "locking msg_lock (delete) [interp=%p]\n", interp);
399 #endif
400 LOCK(sched_struct->msg_lock);
401 /* Search the task index for GC suspend tasks */
402 num_tasks = VTABLE_elements(interp, sched_struct->messages);
403 for (index = 0; index < num_tasks; index++) {
404 PMC *message = VTABLE_get_pmc_keyed_int(interp, sched_struct->messages, index);
405 if (!PMC_IS_NULL(message)
406 && string_equal(interp, VTABLE_get_string(interp, message),
407 CONST_STRING(interp, "suspend_for_gc")) == 0) {
408 VTABLE_delete_keyed_int(interp, sched_struct->messages, index);
409 UNLOCK(sched_struct->msg_lock);
410 return message;
413 #if CX_DEBUG
414 fprintf(stderr, "unlocking msg_lock (delete) [interp=%p]\n", interp);
415 #endif
416 UNLOCK(sched_struct->msg_lock);
419 else
420 real_exception(interp, NULL, INVALID_OPERATION,
421 "Scheduler was not initialized for this interpreter.\n");
423 return PMCNULL;
428 =item C<void Parrot_cx_add_handler>
430 Add a task handler to scheduler's list of handlers.
432 =cut
436 PARROT_API
437 void
438 Parrot_cx_add_handler(PARROT_INTERP, ARGIN(PMC *handler))
440 if (interp->scheduler)
441 Parrot_PCCINVOKE(interp, interp->scheduler,
442 CONST_STRING(interp, "add_handler"), "P->", handler);
443 else
444 real_exception(interp, NULL, INVALID_OPERATION,
445 "Scheduler was not initialized for this interpreter.\n");
446 return;
451 =back
453 =head2 Scheduler Message Interface Functions
455 Functions that are used to interface with the message queue in the concurrency
456 scheduler.
458 =over 4
460 =item C<void Parrot_cx_send_message>
462 Send a message to a scheduler in a different interpreter/thread.
464 =cut
468 PARROT_API
469 void
470 Parrot_cx_send_message(PARROT_INTERP, ARGIN(STRING *messagetype), ARGIN_NULLOK(PMC *payload))
472 if (interp->scheduler) {
473 Parrot_Scheduler * sched_struct = PARROT_SCHEDULER(interp->scheduler);
474 PMC *message = pmc_new(interp, enum_class_SchedulerMessage);
475 VTABLE_set_string_native(interp, message, messagetype);
476 message = VTABLE_share_ro(interp, message);
478 #if CX_DEBUG
479 fprintf(stderr, "sending message[interp=%p]\n", interp);
480 #endif
482 #if CX_DEBUG
483 fprintf(stderr, "locking msg_lock (send) [interp=%p]\n", interp);
484 #endif
485 LOCK(sched_struct->msg_lock);
486 VTABLE_push_pmc(interp, sched_struct->messages, message);
487 #if CX_DEBUG
488 fprintf(stderr, "unlocking msg_lock (send) [interp=%p]\n", interp);
489 #endif
490 UNLOCK(sched_struct->msg_lock);
491 Parrot_cx_runloop_wake(interp, interp->scheduler);
499 =item C<void Parrot_cx_broadcast_message>
501 Send a message to the schedulers in all interpreters/threads linked to this
502 one.
504 =cut
508 PARROT_API
509 void
510 Parrot_cx_broadcast_message(PARROT_INTERP, ARGIN(STRING *messagetype), ARGIN_NULLOK(PMC *data))
512 UINTVAL i;
513 LOCK(interpreter_array_mutex);
514 for (i = 0; i < n_interpreters; ++i) {
515 Parrot_Interp other_interp = interpreter_array[i];
516 if (interp == other_interp)
517 continue;
518 Parrot_cx_send_message(other_interp, messagetype, data);
520 UNLOCK(interpreter_array_mutex);
526 =back
528 =head2 Task Interface Functions
530 Functions that are used to interface with a specific task in the concurrency scheduler.
532 =over 4
534 =item C<PMC * Parrot_cx_find_handler_for_task>
536 Retrieve a handler appropriate to a given task. If the scheduler has no
537 appropriate handler, returns PMCNULL.
539 =cut
543 PARROT_API
544 PARROT_CAN_RETURN_NULL
545 PMC *
546 Parrot_cx_find_handler_for_task(PARROT_INTERP, ARGIN(PMC *task))
548 PMC *handler = PMCNULL;
549 #if CX_DEBUG
550 fprintf(stderr, "searching for handler\n");
551 #endif
553 if (interp->scheduler)
554 Parrot_PCCINVOKE(interp, interp->scheduler,
555 CONST_STRING(interp, "find_handler"), "P->P", task, &handler);
556 else
557 real_exception(interp, NULL, INVALID_OPERATION,
558 "Scheduler was not initialized for this interpreter.\n");
560 #if CX_DEBUG
561 fprintf(stderr, "done searching for handler\n");
562 #endif
563 return handler;
568 =item C<void Parrot_cx_timer_invoke>
570 Run the associated code block for a timer event, when the timer fires.
572 =cut
576 void
577 Parrot_cx_timer_invoke(PARROT_INTERP, ARGIN(PMC *timer))
579 Parrot_Timer * const timer_struct = PARROT_TIMER(timer);
580 #if CX_DEBUG
581 fprintf(stderr, "current timer time: %f, %f\n",
582 timer_struct->birthtime + timer_struct->duration,
583 Parrot_floatval_time());
584 #endif
585 if (!PMC_IS_NULL(timer_struct->codeblock)) {
586 Parrot_runops_fromc_args_event(interp,
587 timer_struct->codeblock, "v");
593 =item C<void Parrot_cx_invoke_callback>
595 Run the associated code block for a callback event.
597 =cut
601 void
602 Parrot_cx_invoke_callback(PARROT_INTERP, ARGIN(PMC *callback))
604 Parrot_Task * const task_struct = PARROT_TASK(callback);
605 if (!PMC_IS_NULL(task_struct->data)) {
606 Parrot_run_callback(interp, task_struct->data,
607 task_struct->cb_data);
613 =back
615 =head2 Opcode Functions
617 Functions that are called from within opcodes, that take and return an
618 opcode_t* to allow for changing the code flow.
620 =over 4
623 =item C<opcode_t * Parrot_cx_schedule_sleep>
625 Add a sleep timer to the scheduler. This function is called by the C<sleep>
626 opcode.
628 =cut
632 PARROT_API
633 PARROT_WARN_UNUSED_RESULT
634 PARROT_CAN_RETURN_NULL
635 opcode_t *
636 Parrot_cx_schedule_sleep(PARROT_INTERP, FLOATVAL time, ARGIN_NULLOK(opcode_t *next))
638 #if PARROT_HAS_THREADS
639 Parrot_cond condition;
640 Parrot_mutex lock;
641 FLOATVAL timer_end = time + Parrot_floatval_time();
642 struct timespec time_struct;
644 /* Tell the scheduler runloop to wake, this is a good time to process
645 * pending tasks. */
646 Parrot_cx_runloop_wake(interp, interp->scheduler);
648 /* Tell this thread to sleep for the requested time. */
649 COND_INIT(condition);
650 MUTEX_INIT(lock);
651 LOCK(lock);
652 time_struct.tv_sec = (time_t) timer_end;
653 time_struct.tv_nsec = (long)((timer_end - time_struct.tv_sec)*1000.0f) *1000L*1000L;
654 COND_TIMED_WAIT(condition, lock, &time_struct);
655 UNLOCK(lock);
656 COND_DESTROY(condition);
657 MUTEX_DESTROY(lock);
658 #else
659 /* A more primitive, platform-specific, non-threaded form of sleep. */
660 Parrot_sleep((UINTVAL) ceil(time));
661 #endif
662 return next;
668 =back
670 =head2 Internal Functions
672 Functions that are only used within the scheduler.
674 =over 4
676 =item C<static void scheduler_process_wait_list>
678 Scheduler maintenance, scan the list of waiting tasks to see if any are ready
679 to become active tasks.
681 =cut
685 static void
686 scheduler_process_wait_list(PARROT_INTERP, ARGMOD(PMC *scheduler))
688 Parrot_Scheduler * sched_struct = PARROT_SCHEDULER(scheduler);
689 INTVAL num_tasks, index;
691 /* Sweep the wait list for completed timers */
692 num_tasks = VTABLE_elements(interp, sched_struct->wait_index);
693 for (index = 0; index < num_tasks; index++) {
694 INTVAL tid = VTABLE_get_integer_keyed_int(interp, sched_struct->wait_index, index);
695 if (tid > 0) {
696 PMC *task = VTABLE_get_pmc_keyed_int(interp, sched_struct->task_list, tid);
697 if (PMC_IS_NULL(task)) {
698 /* Cleanup expired tasks. */
699 VTABLE_set_integer_keyed_int(interp, sched_struct->wait_index, index, 0);
701 else {
702 /* Move the timer to the active task list if the timer has
703 * completed. */
704 FLOATVAL timer_end_time = VTABLE_get_number_keyed_int(interp,
705 task, PARROT_TIMER_NSEC);
706 if (timer_end_time <= Parrot_floatval_time()) {
707 VTABLE_push_integer(interp, sched_struct->task_index, tid);
708 VTABLE_set_integer_keyed_int(interp, sched_struct->wait_index, index, 0);
709 Parrot_cx_schedule_repeat(interp, task);
710 SCHEDULER_cache_valid_CLEAR(scheduler);
719 =over 4
721 =item C<static void scheduler_process_messages>
723 Scheduler maintenance, scan the list of messages sent from other schedulers and
724 take appropriate action on any received.
726 =cut
730 static void
731 scheduler_process_messages(PARROT_INTERP, ARGMOD(PMC *scheduler))
733 Parrot_Scheduler * sched_struct = PARROT_SCHEDULER(scheduler);
734 PMC *message;
736 #if CX_DEBUG
737 fprintf(stderr, "processing messages [interp=%p]\n", interp);
738 #endif
740 while (VTABLE_elements(interp, sched_struct->messages) > 0) {
741 #if CX_DEBUG
742 fprintf(stderr, "locking msg_lock (process) [interp=%p]\n", interp);
743 #endif
744 LOCK(sched_struct->msg_lock);
745 message = VTABLE_pop_pmc(interp, sched_struct->messages);
746 #if CX_DEBUG
747 fprintf(stderr, "unlocking msg_lock (process) [interp=%p]\n", interp);
748 #endif
749 UNLOCK(sched_struct->msg_lock);
750 if (!PMC_IS_NULL(message)
751 && string_equal(interp, VTABLE_get_string(interp, message),
752 CONST_STRING(interp, "suspend_for_gc")) == 0) {
753 #if CX_DEBUG
754 fprintf(stderr, "found a suspend, suspending [interp=%p]\n", interp);
755 #endif
756 pt_suspend_self_for_gc(interp);
764 =back
766 =cut
772 * Local variables:
773 * c-file-style: "parrot"
774 * End:
775 * vim: expandtab shiftwidth=4: