2 Copyright (C) 2007, The Perl Foundation.
7 src/scheduler.c - The core routines for the concurrency scheduler
11 Each interpreter has a concurrency scheduler element in its core struct. The
12 scheduler is responsible for receiveing, dispatching, and monitoring events,
13 exceptions, async I/O, and concurrent tasks (threads).
19 #include "parrot/parrot.h"
20 #include "parrot/scheduler_private.h"
22 #include "pmc/pmc_scheduler.h"
23 #include "pmc/pmc_task.h"
24 #include "pmc/pmc_timer.h"
26 #include "scheduler.str"
30 /* HEADERIZER HFILE: include/parrot/scheduler.h */
32 /* HEADERIZER BEGIN: static */
33 /* Don't modify between HEADERIZER BEGIN / HEADERIZER END. Your changes will be lost. */
35 static void scheduler_process_messages(PARROT_INTERP
,
36 ARGMOD(PMC
*scheduler
))
37 __attribute__nonnull__(1)
38 __attribute__nonnull__(2)
39 FUNC_MODIFIES(*scheduler
);
41 static void scheduler_process_wait_list(PARROT_INTERP
,
42 ARGMOD(PMC
*scheduler
))
43 __attribute__nonnull__(1)
44 __attribute__nonnull__(2)
45 FUNC_MODIFIES(*scheduler
);
47 /* Don't modify between HEADERIZER BEGIN / HEADERIZER END. Your changes will be lost. */
48 /* HEADERIZER END: static */
52 =head2 Scheduler Interface Functions
54 Functions to interface with the concurrency scheduler.
58 =item C<void Parrot_cx_init_scheduler>
60 Initalize the concurrency scheduler for the interpreter.
67 Parrot_cx_init_scheduler(PARROT_INTERP
)
69 if (!interp
->parent_interpreter
) {
72 /* Add the very first interpreter to the list of interps. */
73 pt_add_to_interpreters(interp
, NULL
);
75 scheduler
= pmc_new(interp
, enum_class_Scheduler
);
76 scheduler
= VTABLE_share_ro(interp
, scheduler
);
78 interp
->scheduler
= scheduler
;
85 =item C<void Parrot_cx_check_tasks>
87 If a wake request has been received, handle tasks.
94 Parrot_cx_check_tasks(PARROT_INTERP
, ARGMOD(PMC
*scheduler
))
96 if (SCHEDULER_wake_requested_TEST(scheduler
))
97 Parrot_cx_handle_tasks(interp
, interp
->scheduler
);
102 =item C<void Parrot_cx_handle_tasks>
104 Handle the pending tasks in the scheduler's task list. Returns when there are
105 no more pending tasks. Returns 0 to terminate the scheduler runloop, or 1 to
106 continue the runloop.
114 Parrot_cx_handle_tasks(PARROT_INTERP
, ARGMOD(PMC
*scheduler
))
116 SCHEDULER_wake_requested_CLEAR(scheduler
);
117 Parrot_cx_refresh_task_list(interp
, scheduler
);
119 while (VTABLE_get_integer(interp
, scheduler
) > 0) {
120 PMC
* const task
= VTABLE_pop_pmc(interp
, scheduler
);
121 if (!PMC_IS_NULL(task
)) {
122 PMC
*type_pmc
= VTABLE_get_attr_str(interp
, task
, CONST_STRING(interp
, "type"));
123 STRING
*type
= VTABLE_get_string(interp
, type_pmc
);
125 if (string_equal(interp
, type
, CONST_STRING(interp
, "callback")) == 0) {
126 Parrot_cx_invoke_callback(interp
, task
);
128 else if (string_equal(interp
, type
, CONST_STRING(interp
, "timer")) == 0) {
129 Parrot_cx_timer_invoke(interp
, task
);
131 else if (string_equal(interp
, type
, CONST_STRING(interp
, "event")) == 0) {
132 PMC
* const handler
= Parrot_cx_find_handler_for_task(interp
, task
);
133 if (!PMC_IS_NULL(handler
)) {
135 VTABLE_get_attr_str(interp
, handler
, CONST_STRING(interp
, "code"));
136 Parrot_runops_fromc_args_event(interp
, handler_sub
,
137 "vPP", handler
, task
);
141 real_exception(interp
, NULL
, INVALID_OPERATION
,
142 "Unknown task type '%Ss'.\n", type
);
145 Parrot_cx_delete_task(interp
, task
);
148 /* If the scheduler was flagged to terminate, make sure you process all
150 if (SCHEDULER_terminate_requested_TEST(scheduler
))
151 Parrot_cx_refresh_task_list(interp
, scheduler
);
153 } /* end of pending tasks */
158 =item C<void Parrot_cx_refresh_task_list>
160 Tell the scheduler to perform maintenance on its list of active tasks, checking
161 for completed timers or sleep events, sorting for priority, checking for
169 Parrot_cx_refresh_task_list(PARROT_INTERP
, ARGMOD(PMC
*scheduler
))
171 scheduler_process_wait_list(interp
, scheduler
);
172 scheduler_process_messages(interp
, scheduler
);
174 /* TODO: Sort the task list index */
176 SCHEDULER_cache_valid_SET(scheduler
);
182 =item C<void Parrot_cx_runloop_wake>
184 Wake a sleeping scheduler runloop (generally called when new tasks are added to
185 the scheduler's task list).
192 Parrot_cx_runloop_wake(PARROT_INTERP
, ARGMOD(PMC
*scheduler
))
194 enable_event_checking(interp
);
195 SCHEDULER_wake_requested_SET(scheduler
);
201 =item C<void Parrot_cx_runloop_end>
203 Schedule an event to terminate the scheduler runloop.
211 Parrot_cx_runloop_end(PARROT_INTERP
)
213 SCHEDULER_terminate_requested_SET(interp
->scheduler
);
214 Parrot_cx_handle_tasks(interp
, interp
->scheduler
);
219 =item C<void Parrot_cx_schedule_task>
221 Add a task to scheduler's task list. Cannot be called across
222 interpreters/threads, must be called from within the interpreter's runloop.
230 Parrot_cx_schedule_task(PARROT_INTERP
, ARGIN(PMC
*task
))
232 if (interp
->scheduler
)
233 VTABLE_push_pmc(interp
, interp
->scheduler
, task
);
235 real_exception(interp
, NULL
, INVALID_OPERATION
,
236 "Scheduler was not initialized for this interpreter.\n");
241 =item C<void Parrot_cx_schedule_timer>
243 Create a new timer event due at C<diff> from now, repeated at C<interval>
244 and running the passed C<sub>.
252 Parrot_cx_schedule_timer(PARROT_INTERP
,
253 ARGIN_NULLOK(STRING
*type
), FLOATVAL duration
, FLOATVAL interval
,
254 INTVAL repeat
, ARGIN_NULLOK(PMC
*sub
))
256 PMC
* const timer
= pmc_new(interp
, enum_class_Timer
);
258 VTABLE_set_number_keyed_int(interp
, timer
, PARROT_TIMER_NSEC
, duration
);
259 VTABLE_set_number_keyed_int(interp
, timer
, PARROT_TIMER_INTERVAL
, interval
);
260 VTABLE_set_integer_keyed_int(interp
, timer
, PARROT_TIMER_REPEAT
, repeat
);
262 if (!PMC_IS_NULL(sub
))
263 VTABLE_set_pmc_keyed_int(interp
, timer
, PARROT_TIMER_HANDLER
, sub
);
265 if (!STRING_IS_NULL(type
))
266 VTABLE_set_string_native(interp
, timer
, type
);
268 if (repeat
&& FLOAT_IS_ZERO(interval
))
269 VTABLE_set_number_keyed_int(interp
, timer
, PARROT_TIMER_INTERVAL
, duration
);
271 Parrot_cx_schedule_task(interp
, timer
);
276 =item C<void Parrot_cx_schedule_repeat>
278 Add a repeat task to scheduler's task list.
286 Parrot_cx_schedule_repeat(PARROT_INTERP
, ARGIN(PMC
*task
))
288 INTVAL repeat
= VTABLE_get_integer_keyed_int(interp
, task
,
289 PARROT_TIMER_REPEAT
);
290 FLOATVAL duration
= VTABLE_get_number_keyed_int(interp
, task
,
291 PARROT_TIMER_INTERVAL
);
293 PMC
*repeat_task
= VTABLE_clone(interp
, task
);
294 VTABLE_set_number_keyed_int(interp
, repeat_task
, PARROT_TIMER_NSEC
, duration
);
297 VTABLE_set_integer_keyed_int(interp
, repeat_task
,
298 PARROT_TIMER_REPEAT
, repeat
- 1);
300 Parrot_cx_schedule_task(interp
, repeat_task
);
306 =item C<void Parrot_cx_schedule_callback>
308 Create a new callback event, with an argument for the call.
316 Parrot_cx_schedule_callback(PARROT_INTERP
,
317 ARGIN(PMC
*user_data
), ARGIN(char *ext_data
))
319 PMC
*callback
= pmc_new(interp
, enum_class_Task
);
320 Parrot_Task
* const task_struct
= PARROT_TASK(callback
);
322 task_struct
->type
= CONST_STRING(interp
, "callback");
323 task_struct
->data
= user_data
;
324 task_struct
->cb_data
= ext_data
;
326 Parrot_cx_schedule_task(interp
, callback
);
331 =item C<void Parrot_cx_request_suspend_for_gc>
333 Tell the scheduler to suspend for GC at the next safe pause.
341 Parrot_cx_request_suspend_for_gc(PARROT_INTERP
)
344 fprintf(stderr
, "requesting gc suspend [interp=%p]\n", interp
);
346 Parrot_cx_send_message(interp
, CONST_STRING(interp
, "suspend_for_gc"), PMCNULL
);
351 =item C<void Parrot_cx_delete_task>
353 Remove a task from the scheduler's task list.
361 Parrot_cx_delete_task(PARROT_INTERP
, ARGIN(PMC
*task
))
363 if (interp
->scheduler
) {
364 const INTVAL tid
= VTABLE_get_integer(interp
, task
);
365 VTABLE_delete_keyed_int(interp
, interp
->scheduler
, tid
);
368 real_exception(interp
, NULL
, INVALID_OPERATION
,
369 "Scheduler was not initialized for this interpreter.\n");
375 =item C<PMC * Parrot_cx_delete_suspend_for_gc>
377 Remove a message that would suspend GC from the message queue. (Provided for
378 backward compatibility in the threads implementation.)
385 PARROT_CAN_RETURN_NULL
387 Parrot_cx_delete_suspend_for_gc(PARROT_INTERP
)
389 if (interp
->scheduler
) {
390 Parrot_Scheduler
* sched_struct
= PARROT_SCHEDULER(interp
->scheduler
);
391 INTVAL num_tasks
, index
;
394 fprintf(stderr
, "called delete_suspend_for_gc\n");
398 fprintf(stderr
, "locking msg_lock (delete) [interp=%p]\n", interp
);
400 LOCK(sched_struct
->msg_lock
);
401 /* Search the task index for GC suspend tasks */
402 num_tasks
= VTABLE_elements(interp
, sched_struct
->messages
);
403 for (index
= 0; index
< num_tasks
; index
++) {
404 PMC
*message
= VTABLE_get_pmc_keyed_int(interp
, sched_struct
->messages
, index
);
405 if (!PMC_IS_NULL(message
)
406 && string_equal(interp
, VTABLE_get_string(interp
, message
),
407 CONST_STRING(interp
, "suspend_for_gc")) == 0) {
408 VTABLE_delete_keyed_int(interp
, sched_struct
->messages
, index
);
409 UNLOCK(sched_struct
->msg_lock
);
414 fprintf(stderr
, "unlocking msg_lock (delete) [interp=%p]\n", interp
);
416 UNLOCK(sched_struct
->msg_lock
);
420 real_exception(interp
, NULL
, INVALID_OPERATION
,
421 "Scheduler was not initialized for this interpreter.\n");
428 =item C<void Parrot_cx_add_handler>
430 Add a task handler to scheduler's list of handlers.
438 Parrot_cx_add_handler(PARROT_INTERP
, ARGIN(PMC
*handler
))
440 if (interp
->scheduler
)
441 Parrot_PCCINVOKE(interp
, interp
->scheduler
,
442 CONST_STRING(interp
, "add_handler"), "P->", handler
);
444 real_exception(interp
, NULL
, INVALID_OPERATION
,
445 "Scheduler was not initialized for this interpreter.\n");
453 =head2 Scheduler Message Interface Functions
455 Functions that are used to interface with the message queue in the concurrency
460 =item C<void Parrot_cx_send_message>
462 Send a message to a scheduler in a different interpreter/thread.
470 Parrot_cx_send_message(PARROT_INTERP
, ARGIN(STRING
*messagetype
), ARGIN_NULLOK(PMC
*payload
))
472 if (interp
->scheduler
) {
473 Parrot_Scheduler
* sched_struct
= PARROT_SCHEDULER(interp
->scheduler
);
474 PMC
*message
= pmc_new(interp
, enum_class_SchedulerMessage
);
475 VTABLE_set_string_native(interp
, message
, messagetype
);
476 message
= VTABLE_share_ro(interp
, message
);
479 fprintf(stderr
, "sending message[interp=%p]\n", interp
);
483 fprintf(stderr
, "locking msg_lock (send) [interp=%p]\n", interp
);
485 LOCK(sched_struct
->msg_lock
);
486 VTABLE_push_pmc(interp
, sched_struct
->messages
, message
);
488 fprintf(stderr
, "unlocking msg_lock (send) [interp=%p]\n", interp
);
490 UNLOCK(sched_struct
->msg_lock
);
491 Parrot_cx_runloop_wake(interp
, interp
->scheduler
);
499 =item C<void Parrot_cx_broadcast_message>
501 Send a message to the schedulers in all interpreters/threads linked to this
510 Parrot_cx_broadcast_message(PARROT_INTERP
, ARGIN(STRING
*messagetype
), ARGIN_NULLOK(PMC
*data
))
513 LOCK(interpreter_array_mutex
);
514 for (i
= 0; i
< n_interpreters
; ++i
) {
515 Parrot_Interp other_interp
= interpreter_array
[i
];
516 if (interp
== other_interp
)
518 Parrot_cx_send_message(other_interp
, messagetype
, data
);
520 UNLOCK(interpreter_array_mutex
);
528 =head2 Task Interface Functions
530 Functions that are used to interface with a specific task in the concurrency scheduler.
534 =item C<PMC * Parrot_cx_find_handler_for_task>
536 Retrieve a handler appropriate to a given task. If the scheduler has no
537 appropriate handler, returns PMCNULL.
544 PARROT_CAN_RETURN_NULL
546 Parrot_cx_find_handler_for_task(PARROT_INTERP
, ARGIN(PMC
*task
))
548 PMC
*handler
= PMCNULL
;
550 fprintf(stderr
, "searching for handler\n");
553 if (interp
->scheduler
)
554 Parrot_PCCINVOKE(interp
, interp
->scheduler
,
555 CONST_STRING(interp
, "find_handler"), "P->P", task
, &handler
);
557 real_exception(interp
, NULL
, INVALID_OPERATION
,
558 "Scheduler was not initialized for this interpreter.\n");
561 fprintf(stderr
, "done searching for handler\n");
568 =item C<void Parrot_cx_timer_invoke>
570 Run the associated code block for a timer event, when the timer fires.
577 Parrot_cx_timer_invoke(PARROT_INTERP
, ARGIN(PMC
*timer
))
579 Parrot_Timer
* const timer_struct
= PARROT_TIMER(timer
);
581 fprintf(stderr
, "current timer time: %f, %f\n",
582 timer_struct
->birthtime
+ timer_struct
->duration
,
583 Parrot_floatval_time());
585 if (!PMC_IS_NULL(timer_struct
->codeblock
)) {
586 Parrot_runops_fromc_args_event(interp
,
587 timer_struct
->codeblock
, "v");
593 =item C<void Parrot_cx_invoke_callback>
595 Run the associated code block for a callback event.
602 Parrot_cx_invoke_callback(PARROT_INTERP
, ARGIN(PMC
*callback
))
604 Parrot_Task
* const task_struct
= PARROT_TASK(callback
);
605 if (!PMC_IS_NULL(task_struct
->data
)) {
606 Parrot_run_callback(interp
, task_struct
->data
,
607 task_struct
->cb_data
);
615 =head2 Opcode Functions
617 Functions that are called from within opcodes, that take and return an
618 opcode_t* to allow for changing the code flow.
623 =item C<opcode_t * Parrot_cx_schedule_sleep>
625 Add a sleep timer to the scheduler. This function is called by the C<sleep>
633 PARROT_WARN_UNUSED_RESULT
634 PARROT_CAN_RETURN_NULL
636 Parrot_cx_schedule_sleep(PARROT_INTERP
, FLOATVAL time
, ARGIN_NULLOK(opcode_t
*next
))
638 #if PARROT_HAS_THREADS
639 Parrot_cond condition
;
641 FLOATVAL timer_end
= time
+ Parrot_floatval_time();
642 struct timespec time_struct
;
644 /* Tell the scheduler runloop to wake, this is a good time to process
646 Parrot_cx_runloop_wake(interp
, interp
->scheduler
);
648 /* Tell this thread to sleep for the requested time. */
649 COND_INIT(condition
);
652 time_struct
.tv_sec
= (time_t) timer_end
;
653 time_struct
.tv_nsec
= (long)((timer_end
- time_struct
.tv_sec
)*1000.0f
) *1000L*1000L;
654 COND_TIMED_WAIT(condition
, lock
, &time_struct
);
656 COND_DESTROY(condition
);
659 /* A more primitive, platform-specific, non-threaded form of sleep. */
660 Parrot_sleep((UINTVAL
) ceil(time
));
670 =head2 Internal Functions
672 Functions that are only used within the scheduler.
676 =item C<static void scheduler_process_wait_list>
678 Scheduler maintenance, scan the list of waiting tasks to see if any are ready
679 to become active tasks.
686 scheduler_process_wait_list(PARROT_INTERP
, ARGMOD(PMC
*scheduler
))
688 Parrot_Scheduler
* sched_struct
= PARROT_SCHEDULER(scheduler
);
689 INTVAL num_tasks
, index
;
691 /* Sweep the wait list for completed timers */
692 num_tasks
= VTABLE_elements(interp
, sched_struct
->wait_index
);
693 for (index
= 0; index
< num_tasks
; index
++) {
694 INTVAL tid
= VTABLE_get_integer_keyed_int(interp
, sched_struct
->wait_index
, index
);
696 PMC
*task
= VTABLE_get_pmc_keyed_int(interp
, sched_struct
->task_list
, tid
);
697 if (PMC_IS_NULL(task
)) {
698 /* Cleanup expired tasks. */
699 VTABLE_set_integer_keyed_int(interp
, sched_struct
->wait_index
, index
, 0);
702 /* Move the timer to the active task list if the timer has
704 FLOATVAL timer_end_time
= VTABLE_get_number_keyed_int(interp
,
705 task
, PARROT_TIMER_NSEC
);
706 if (timer_end_time
<= Parrot_floatval_time()) {
707 VTABLE_push_integer(interp
, sched_struct
->task_index
, tid
);
708 VTABLE_set_integer_keyed_int(interp
, sched_struct
->wait_index
, index
, 0);
709 Parrot_cx_schedule_repeat(interp
, task
);
710 SCHEDULER_cache_valid_CLEAR(scheduler
);
721 =item C<static void scheduler_process_messages>
723 Scheduler maintenance, scan the list of messages sent from other schedulers and
724 take appropriate action on any received.
731 scheduler_process_messages(PARROT_INTERP
, ARGMOD(PMC
*scheduler
))
733 Parrot_Scheduler
* sched_struct
= PARROT_SCHEDULER(scheduler
);
737 fprintf(stderr
, "processing messages [interp=%p]\n", interp
);
740 while (VTABLE_elements(interp
, sched_struct
->messages
) > 0) {
742 fprintf(stderr
, "locking msg_lock (process) [interp=%p]\n", interp
);
744 LOCK(sched_struct
->msg_lock
);
745 message
= VTABLE_pop_pmc(interp
, sched_struct
->messages
);
747 fprintf(stderr
, "unlocking msg_lock (process) [interp=%p]\n", interp
);
749 UNLOCK(sched_struct
->msg_lock
);
750 if (!PMC_IS_NULL(message
)
751 && string_equal(interp
, VTABLE_get_string(interp
, message
),
752 CONST_STRING(interp
, "suspend_for_gc")) == 0) {
754 fprintf(stderr
, "found a suspend, suspending [interp=%p]\n", interp
);
756 pt_suspend_self_for_gc(interp
);
773 * c-file-style: "parrot"
775 * vim: expandtab shiftwidth=4: