/**********************************************************************

  thread.c -

  $Author$

  Copyright (C) 2004-2007 Koichi Sasada

**********************************************************************/
/*
  YARV Thread Design

  model 1: Userlevel Thread
    Same as traditional ruby thread.

  model 2: Native Thread with Global VM lock
    Using pthread (or Windows threads); Ruby threads run concurrently.

  model 3: Native Thread with fine grain lock
    Using pthread; Ruby threads run concurrently or in parallel.

  model 4: M:N User:Native threads with Global VM lock
    Combination of models 1 and 2.

  model 5: M:N User:Native threads with fine grain lock
    Combination of models 1 and 3.

------------------------------------------------------------------------

  model 2:
    Only a thread that holds the mutex (GVL: Global VM Lock, or Giant
    VM Lock) can run.  On thread scheduling, the running thread releases
    the GVL.  If the running thread attempts a blocking operation, it
    must release the GVL so that another thread can continue.  After the
    blocking operation completes, the thread must check for interrupts
    (RUBY_VM_CHECK_INTS).

    Every VM can run in parallel.

    Ruby threads are scheduled by the OS thread scheduler.

------------------------------------------------------------------------

  model 3:
    All threads run concurrently or in parallel, and exclusive access
    control is needed to touch shared objects.  For example, a fine
    grain lock must be taken every time a String or Array object is
    accessed.
 */
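/* A minimal model-2 sketch from a C extension's point of view (illustrative
 * only; `do_blocking_io' and `some_io_operation' are hypothetical).  The
 * extension releases the GVL around a blocking call, and the VM checks
 * interrupts once the lock is reacquired:
 *
 *   static void *
 *   do_blocking_io(void *arg)
 *   {
 *       // runs without the GVL; must not touch Ruby objects
 *       return some_io_operation(arg);   // hypothetical blocking call
 *   }
 *
 *   // from GVL-holding code:
 *   //   rb_thread_call_without_gvl(do_blocking_io, arg, RUBY_UBF_IO, 0);
 *   // rb_thread_call_without_gvl() checks interrupts after reacquiring
 *   // the GVL, which is the RUBY_VM_CHECK_INTS step described above.
 */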
/*
 * FD_SET, FD_CLR and FD_ISSET have a small sanity check when using glibc
 * 2.15 or later and setting _FORTIFY_SOURCE > 0.
 * However, the implementation is wrong.  Even though Linux's select(2)
 * supports large fd sizes (>FD_SETSIZE), it wrongly assumes fd is always
 * less than FD_SETSIZE (i.e. 1024).  And then, when HAVE_RB_FD_INIT is
 * enabled, it doesn't work correctly and makes the program abort.
 * Therefore we need to disable _FORTIFY_SOURCE until glibc fixes it.
 */
#undef _FORTIFY_SOURCE
#undef __USE_FORTIFY_LEVEL
#define __USE_FORTIFY_LEVEL 0
/* for model 2 */

#include "ruby/internal/config.h"

#ifdef __linux__
// Normally, gcc(1) translates calls to alloca() with inlined code.  This is
// not done when either the -ansi, -std=c89, -std=c99, or the -std=c11 option
// is given and the header <alloca.h> is not included.
# include <alloca.h>
#endif

#define TH_SCHED(th) (&(th)->ractor->threads.sched)
#include "eval_intern.h"
#include "hrtime.h"
#include "internal.h"
#include "internal/class.h"
#include "internal/cont.h"
#include "internal/error.h"
#include "internal/gc.h"
#include "internal/hash.h"
#include "internal/io.h"
#include "internal/object.h"
#include "internal/proc.h"
#include "ruby/fiber/scheduler.h"
#include "internal/signal.h"
#include "internal/thread.h"
#include "internal/time.h"
#include "internal/warnings.h"
#include "iseq.h"
#include "rjit.h"
#include "ruby/debug.h"
#include "ruby/io.h"
#include "ruby/thread.h"
#include "ruby/thread_native.h"
#include "timev.h"
#include "vm_core.h"
#include "ractor_core.h"
#include "vm_debug.h"
#include "vm_sync.h"
#if USE_RJIT && defined(HAVE_SYS_WAIT_H)
#include <sys/wait.h>
#endif

#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
#define RUBY_THREAD_PRIORITY_MAX 3
#define RUBY_THREAD_PRIORITY_MIN -3
#endif

static VALUE rb_cThreadShield;

static VALUE sym_immediate;
static VALUE sym_on_blocking;
static VALUE sym_never;

#define THREAD_LOCAL_STORAGE_INITIALISED FL_USER13
#define THREAD_LOCAL_STORAGE_INITIALISED_P(th) RB_FL_TEST_RAW((th), THREAD_LOCAL_STORAGE_INITIALISED)
static inline VALUE
rb_thread_local_storage(VALUE thread)
{
    if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
        rb_ivar_set(thread, idLocals, rb_hash_new());
        RB_FL_SET_RAW(thread, THREAD_LOCAL_STORAGE_INITIALISED);
    }
    return rb_ivar_get(thread, idLocals);
}
enum SLEEP_FLAGS {
    SLEEP_DEADLOCKABLE   = 0x01,
    SLEEP_SPURIOUS_CHECK = 0x02,
    SLEEP_ALLOW_SPURIOUS = 0x04,
    SLEEP_NO_CHECKINTS   = 0x08,
};

static void sleep_forever(rb_thread_t *th, unsigned int fl);
static int sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);

static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end);
static int rb_threadptr_dead(rb_thread_t *th);
static void rb_check_deadlock(rb_ractor_t *r);
static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
static const char *thread_status_name(rb_thread_t *th, int detail);
static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
MAYBE_UNUSED(static int consume_communication_pipe(int fd));

static volatile int system_working = 1;
static rb_internal_thread_specific_key_t specific_key_count;
struct waiting_fd {
    struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */
    rb_thread_t *th;
    int fd;
    struct rb_io_close_wait_list *busy;
};

/********************************************************************************/

#define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION

struct rb_blocking_region_buffer {
    enum rb_thread_status prev_status;
};
static int unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg, int fail_if_interrupted);
static void unblock_function_clear(rb_thread_t *th);

static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
                                        rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted);
static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region);

#define THREAD_BLOCKING_BEGIN(th) do { \
  struct rb_thread_sched * const sched = TH_SCHED(th); \
  RB_VM_SAVE_MACHINE_CONTEXT(th); \
  thread_sched_to_waiting((sched), (th));

#define THREAD_BLOCKING_END(th) \
  thread_sched_to_running((sched), (th)); \
  rb_ractor_thread_switch(th->ractor, th); \
} while(0)

#ifdef __GNUC__
#ifdef HAVE_BUILTIN___BUILTIN_CHOOSE_EXPR_CONSTANT_P
#define only_if_constant(expr, notconst) __builtin_choose_expr(__builtin_constant_p(expr), (expr), (notconst))
#else
#define only_if_constant(expr, notconst) (__builtin_constant_p(expr) ? (expr) : (notconst))
#endif
#else
#define only_if_constant(expr, notconst) notconst
#endif
#define BLOCKING_REGION(th, exec, ubf, ubfarg, fail_if_interrupted) do { \
    struct rb_blocking_region_buffer __region; \
    if (blocking_region_begin(th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \
        /* always return true unless fail_if_interrupted */ \
        !only_if_constant(fail_if_interrupted, TRUE)) { \
        /* Important that this is inlined into the macro, and not part of \
         * blocking_region_begin - see bug #20493 */ \
        RB_VM_SAVE_MACHINE_CONTEXT(th); \
        thread_sched_to_waiting(TH_SCHED(th), th); \
        exec; \
        blocking_region_end(th, &__region); \
    }; \
} while(0)
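/* A usage sketch of BLOCKING_REGION (illustrative only; `blocking_func' and
 * `unblock_func' are hypothetical).  `exec' runs with the GVL released, the
 * ubf is armed for the duration, and errno must be captured inside the
 * region, mirroring how rb_nogvl() below uses this macro:
 *
 *   void *val = 0;
 *   int saved_errno = 0;
 *   BLOCKING_REGION(th, {
 *       val = blocking_func(data);
 *       saved_errno = errno;
 *   }, unblock_func, data, FALSE);
 */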
/*
 * returns true if this thread was spuriously interrupted, false otherwise
 * (e.g. hit by Thread#run or ran a Ruby-level Signal.trap handler)
 */
#define RUBY_VM_CHECK_INTS_BLOCKING(ec) vm_check_ints_blocking(ec)
static inline int
vm_check_ints_blocking(rb_execution_context_t *ec)
{
    rb_thread_t *th = rb_ec_thread_ptr(ec);

    if (LIKELY(rb_threadptr_pending_interrupt_empty_p(th))) {
        if (LIKELY(!RUBY_VM_INTERRUPTED_ANY(ec))) return FALSE;
    }
    else {
        th->pending_interrupt_queue_checked = 0;
        RUBY_VM_SET_INTERRUPT(ec);
    }
    return rb_threadptr_execute_interrupts(th, 1);
}

int
rb_vm_check_ints_blocking(rb_execution_context_t *ec)
{
    return vm_check_ints_blocking(ec);
}
/*
 * poll() is supported by many OSes, but so far Linux is the only
 * one we know of that supports using poll() in all places select()
 * would work.
 */
#if defined(HAVE_POLL)
#  if defined(__linux__)
#    define USE_POLL
#  endif
#  if defined(__FreeBSD_version) && __FreeBSD_version >= 1100000
#    define USE_POLL
     /* FreeBSD does not set POLLOUT when POLLHUP happens */
#    define POLLERR_SET (POLLHUP | POLLERR)
#  endif
#endif
static void
timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end,
                const struct timeval *timeout)
{
    if (timeout) {
        *rel = rb_timeval2hrtime(timeout);
        *end = rb_hrtime_add(rb_hrtime_now(), *rel);
        *to = rel;
    }
    else {
        *to = 0;
    }
}
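/* Call pattern for timeout_prepare() (an illustrative sketch only): `to'
 * ends up pointing at the relative sleep time when a timeout was given, or
 * NULL for "wait forever", and `end' holds the absolute deadline used with
 * hrtime_update_expire() to re-arm after spurious wakeups:
 *
 *   rb_hrtime_t rel, end, *to;
 *   timeout_prepare(&to, &rel, &end, timeout);   // timeout may be NULL
 *   // ... sleep using `to'; after a wakeup:
 *   //   if (hrtime_update_expire(&rel, end)) { deadline reached }
 */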
MAYBE_UNUSED(NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start)));
MAYBE_UNUSED(static bool th_has_dedicated_nt(const rb_thread_t *th));
MAYBE_UNUSED(static int waitfd_to_waiting_flag(int wfd_event));

#include THREAD_IMPL_SRC

/*
 * TODO: somebody with win32 knowledge should be able to get rid of
 * the timer-thread by busy-waiting on signals.  And it should be possible
 * to make the GVL in thread_pthread.c platform-independent.
 */
#ifndef BUSY_WAIT_SIGNALS
#  define BUSY_WAIT_SIGNALS (0)
#endif

#ifndef USE_EVENTFD
#  define USE_EVENTFD (0)
#endif

#include "thread_sync.c"
void
rb_nativethread_lock_initialize(rb_nativethread_lock_t *lock)
{
    rb_native_mutex_initialize(lock);
}

void
rb_nativethread_lock_destroy(rb_nativethread_lock_t *lock)
{
    rb_native_mutex_destroy(lock);
}

void
rb_nativethread_lock_lock(rb_nativethread_lock_t *lock)
{
    rb_native_mutex_lock(lock);
}

void
rb_nativethread_lock_unlock(rb_nativethread_lock_t *lock)
{
    rb_native_mutex_unlock(lock);
}
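/* These wrappers form the public rb_nativethread_lock_* API for C extensions
 * that need a raw native mutex independent of the GVL.  A minimal sketch
 * (illustrative only):
 *
 *   rb_nativethread_lock_t lock;
 *   rb_nativethread_lock_initialize(&lock);
 *   rb_nativethread_lock_lock(&lock);
 *   // ... critical section; avoid calling into the Ruby VM here ...
 *   rb_nativethread_lock_unlock(&lock);
 *   rb_nativethread_lock_destroy(&lock);
 */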
static int
unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg, int fail_if_interrupted)
{
    do {
        if (fail_if_interrupted) {
            if (RUBY_VM_INTERRUPTED_ANY(th->ec)) {
                return FALSE;
            }
        }
        else {
            RUBY_VM_CHECK_INTS(th->ec);
        }

        rb_native_mutex_lock(&th->interrupt_lock);
    } while (!th->ec->raised_flag && RUBY_VM_INTERRUPTED_ANY(th->ec) &&
             (rb_native_mutex_unlock(&th->interrupt_lock), TRUE));

    VM_ASSERT(th->unblock.func == NULL);

    th->unblock.func = func;
    th->unblock.arg = arg;
    rb_native_mutex_unlock(&th->interrupt_lock);

    return TRUE;
}

static void
unblock_function_clear(rb_thread_t *th)
{
    rb_native_mutex_lock(&th->interrupt_lock);
    th->unblock.func = 0;
    rb_native_mutex_unlock(&th->interrupt_lock);
}
static void
rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
{
    RUBY_DEBUG_LOG("th:%u trap:%d", rb_th_serial(th), trap);

    rb_native_mutex_lock(&th->interrupt_lock);

    if (trap) {
        RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
    }
    else {
        RUBY_VM_SET_INTERRUPT(th->ec);
    }

    if (th->unblock.func != NULL) {
        (th->unblock.func)(th->unblock.arg);
    }
    else {
        /* none */
    }

    rb_native_mutex_unlock(&th->interrupt_lock);
}

void
rb_threadptr_interrupt(rb_thread_t *th)
{
    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
    rb_threadptr_interrupt_common(th, 0);
}

static void
threadptr_trap_interrupt(rb_thread_t *th)
{
    rb_threadptr_interrupt_common(th, 1);
}
static void
terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
{
    rb_thread_t *th = 0;

    ccan_list_for_each(&r->threads.set, th, lt_node) {
        if (th != main_thread) {
            RUBY_DEBUG_LOG("terminate start th:%u status:%s", rb_th_serial(th), thread_status_name(th, TRUE));

            rb_threadptr_pending_interrupt_enque(th, RUBY_FATAL_THREAD_TERMINATED);
            rb_threadptr_interrupt(th);

            RUBY_DEBUG_LOG("terminate done th:%u status:%s", rb_th_serial(th), thread_status_name(th, TRUE));
        }
        else {
            RUBY_DEBUG_LOG("main thread th:%u", rb_th_serial(th));
        }
    }
}
static void
rb_threadptr_join_list_wakeup(rb_thread_t *thread)
{
    while (thread->join_list) {
        struct rb_waiting_list *join_list = thread->join_list;

        // Consume the entry from the join list:
        thread->join_list = join_list->next;

        rb_thread_t *target_thread = join_list->thread;

        if (target_thread->scheduler != Qnil && join_list->fiber) {
            rb_fiber_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber));
        }
        else {
            rb_threadptr_interrupt(target_thread);

            switch (target_thread->status) {
              case THREAD_STOPPED:
              case THREAD_STOPPED_FOREVER:
                target_thread->status = THREAD_RUNNABLE;
                break;
              default:
                break;
            }
        }
    }
}
void
rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
{
    while (th->keeping_mutexes) {
        rb_mutex_t *mutex = th->keeping_mutexes;
        th->keeping_mutexes = mutex->next_mutex;

        // rb_warn("mutex #<%p> was not unlocked by thread #<%p>", (void *)mutex, (void*)th);

        const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber);
        if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message);
    }
}
void
rb_thread_terminate_all(rb_thread_t *th)
{
    rb_ractor_t *cr = th->ractor;
    rb_execution_context_t * volatile ec = th->ec;
    volatile int sleeping = 0;

    if (cr->threads.main != th) {
        rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)",
               (void *)cr->threads.main, (void *)th);
    }

    /* unlock all locking mutexes */
    rb_threadptr_unlock_all_locking_mutexes(th);

    EC_PUSH_TAG(ec);
    if (EC_EXEC_TAG() == TAG_NONE) {
      retry:
        RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));

        terminate_all(cr, th);

        while (rb_ractor_living_thread_num(cr) > 1) {
            rb_hrtime_t rel = RB_HRTIME_PER_SEC;
            /*
             * The thread-exiting routine in thread_start_func_2 notifies
             * us when the last sub-thread exits.
             */
            sleeping = 1;
            native_sleep(th, &rel);
            RUBY_VM_CHECK_INTS_BLOCKING(ec);
            sleeping = 0;
        }
    }
    else {
        /*
         * When an exception is caught (e.g. Ctrl+C), broadcast the kill
         * request again to ensure that all threads are killed even if
         * they are blocked on sleep, a mutex, etc.
         */
        if (sleeping) {
            sleeping = 0;
            goto retry;
        }
    }
    EC_POP_TAG();
}
void rb_threadptr_root_fiber_terminate(rb_thread_t *th);

static void
thread_cleanup_func_before_exec(void *th_ptr)
{
    rb_thread_t *th = th_ptr;
    th->status = THREAD_KILLED;

    // The thread stack doesn't exist in the forked process:
    th->ec->machine.stack_start = th->ec->machine.stack_end = NULL;

    rb_threadptr_root_fiber_terminate(th);
}

static void
thread_cleanup_func(void *th_ptr, int atfork)
{
    rb_thread_t *th = th_ptr;

    th->locking_mutex = Qfalse;
    thread_cleanup_func_before_exec(th_ptr);

    /*
     * Unfortunately, we can't release native threading resources at fork
     * because libc may be in an unstable locking state, and therefore
     * touching a threading resource may cause a deadlock.
     */
    if (atfork) {
        th->nt = NULL;
        return;
    }

    rb_native_mutex_destroy(&th->interrupt_lock);
}
static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *);
static VALUE rb_thread_to_s(VALUE thread);

void
ruby_thread_init_stack(rb_thread_t *th, void *local_in_parent_frame)
{
    native_thread_init_stack(th, local_in_parent_frame);
}

const VALUE *
rb_vm_proc_local_ep(VALUE proc)
{
    const VALUE *ep = vm_proc_ep(proc);

    if (ep) {
        return rb_vm_ep_local_ep(ep);
    }
    else {
        return NULL;
    }
}

// for ractor, defined in vm.c
VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self,
                                  int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler);
static VALUE
thread_do_start_proc(rb_thread_t *th)
{
    VALUE args = th->invoke_arg.proc.args;
    const VALUE *args_ptr;
    int args_len;
    VALUE procval = th->invoke_arg.proc.proc;
    rb_proc_t *proc;
    GetProcPtr(procval, proc);

    th->ec->errinfo = Qnil;
    th->ec->root_lep = rb_vm_proc_local_ep(procval);
    th->ec->root_svar = Qfalse;

    vm_check_ints_blocking(th->ec);

    if (th->invoke_type == thread_invoke_type_ractor_proc) {
        VALUE self = rb_ractor_self(th->ractor);
        VM_ASSERT(FIXNUM_P(args));
        args_len = FIX2INT(args);
        args_ptr = ALLOCA_N(VALUE, args_len);
        rb_ractor_receive_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr);
        vm_check_ints_blocking(th->ec);

        return rb_vm_invoke_proc_with_self(
            th->ec, proc, self,
            args_len, args_ptr,
            th->invoke_arg.proc.kw_splat,
            VM_BLOCK_HANDLER_NONE
        );
    }
    else {
        args_len = RARRAY_LENINT(args);
        if (args_len < 8) {
            /* free proc.args if the length is small enough */
            args_ptr = ALLOCA_N(VALUE, args_len);
            MEMCPY((VALUE *)args_ptr, RARRAY_CONST_PTR(args), VALUE, args_len);
            th->invoke_arg.proc.args = Qnil;
        }
        else {
            args_ptr = RARRAY_CONST_PTR(args);
        }

        vm_check_ints_blocking(th->ec);

        return rb_vm_invoke_proc(
            th->ec, proc,
            args_len, args_ptr,
            th->invoke_arg.proc.kw_splat,
            VM_BLOCK_HANDLER_NONE
        );
    }
}
static VALUE
thread_do_start(rb_thread_t *th)
{
    native_set_thread_name(th);
    VALUE result = Qundef;

    switch (th->invoke_type) {
      case thread_invoke_type_proc:
        result = thread_do_start_proc(th);
        break;

      case thread_invoke_type_ractor_proc:
        result = thread_do_start_proc(th);
        rb_ractor_atexit(th->ec, result);
        break;

      case thread_invoke_type_func:
        result = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
        break;

      case thread_invoke_type_none:
        rb_bug("unreachable");
    }

    return result;
}
void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);

static int
thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
{
    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
    VM_ASSERT(th != th->vm->ractor.main_thread);

    enum ruby_tag_type state;
    VALUE errinfo = Qnil;
    rb_thread_t *ractor_main_th = th->ractor->threads.main;

    // setup ractor
    if (rb_ractor_status_p(th->ractor, ractor_blocking)) {
        RB_VM_LOCK();
        {
            rb_vm_ractor_blocking_cnt_dec(th->vm, th->ractor, __FILE__, __LINE__);
            rb_ractor_t *r = th->ractor;
            r->r_stdin = rb_io_prep_stdin();
            r->r_stdout = rb_io_prep_stdout();
            r->r_stderr = rb_io_prep_stderr();
        }
        RB_VM_UNLOCK();
    }

    // Ensure that we are not joinable.
    VM_ASSERT(UNDEF_P(th->value));

    int fiber_scheduler_closed = 0, event_thread_end_hooked = 0;
    VALUE result = Qundef;

    EC_PUSH_TAG(th->ec);

    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
        EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);

        result = thread_do_start(th);
    }

    if (!fiber_scheduler_closed) {
        fiber_scheduler_closed = 1;
        rb_fiber_scheduler_set(Qnil);
    }

    if (!event_thread_end_hooked) {
        event_thread_end_hooked = 1;
        EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
    }

    if (state == TAG_NONE) {
        // This must be set AFTER doing all user-level code. At this point, the thread is effectively finished and calls to `Thread#join` will succeed.
        th->value = result;
    } else {
        errinfo = th->ec->errinfo;

        VALUE exc = rb_vm_make_jump_tag_but_local_jump(state, Qundef);
        if (!NIL_P(exc)) errinfo = exc;

        if (state == TAG_FATAL) {
            if (th->invoke_type == thread_invoke_type_ractor_proc) {
                rb_ractor_atexit(th->ec, Qnil);
            }
            /* fatal error within this thread, need to stop whole script */
        }
        else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
            /* exit on main_thread. */
        }
        else {
            if (th->report_on_exception) {
                VALUE mesg = rb_thread_to_s(th->self);
                rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n");
                rb_write_error_str(mesg);
                rb_ec_error_print(th->ec, errinfo);
            }

            if (th->invoke_type == thread_invoke_type_ractor_proc) {
                rb_ractor_atexit_exception(th->ec);
            }

            if (th->vm->thread_abort_on_exception ||
                th->abort_on_exception || RTEST(ruby_debug)) {
                /* exit on main_thread */
            }
            else {
                errinfo = Qnil;
            }
        }
        th->value = Qnil;
    }

    // The thread is effectively finished and can be joined.
    VM_ASSERT(!UNDEF_P(th->value));

    rb_threadptr_join_list_wakeup(th);
    rb_threadptr_unlock_all_locking_mutexes(th);

    if (th->invoke_type == thread_invoke_type_ractor_proc) {
        rb_thread_terminate_all(th);
        rb_ractor_teardown(th->ec);
    }

    th->status = THREAD_KILLED;
    RUBY_DEBUG_LOG("killed th:%u", rb_th_serial(th));

    if (th->vm->ractor.main_thread == th) {
        ruby_stop(0);
    }

    if (RB_TYPE_P(errinfo, T_OBJECT)) {
        /* treat with normal error object */
        rb_threadptr_raise(ractor_main_th, 1, &errinfo);
    }

    EC_POP_TAG();

    rb_ec_clear_current_thread_trace_func(th->ec);

    /* locking_mutex must be Qfalse */
    if (th->locking_mutex != Qfalse) {
        rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")",
               (void *)th, th->locking_mutex);
    }

    if (ractor_main_th->status == THREAD_KILLED &&
        th->ractor->threads.cnt <= 2 /* main thread and this thread */) {
        /* I'm the last thread.  Wake up the main thread from rb_thread_terminate_all. */
        rb_threadptr_interrupt(ractor_main_th);
    }

    rb_check_deadlock(th->ractor);

    rb_fiber_close(th->ec->fiber_ptr);

    thread_cleanup_func(th, FALSE);
    VM_ASSERT(th->ec->vm_stack == NULL);

    if (th->invoke_type == thread_invoke_type_ractor_proc) {
        // after rb_ractor_living_threads_remove()
        // GC will happen anytime and this ractor can be collected (and destroy GVL).
        // So gvl_release() should be before it.
        thread_sched_to_dead(TH_SCHED(th), th);
        rb_ractor_living_threads_remove(th->ractor, th);
    }
    else {
        rb_ractor_living_threads_remove(th->ractor, th);
        thread_sched_to_dead(TH_SCHED(th), th);
    }

    return 0;
}
struct thread_create_params {
    enum thread_invoke_type type;

    // for normal proc thread
    VALUE args;
    VALUE proc;

    // for ractor
    rb_ractor_t *g;

    // for func
    VALUE (*fn)(void *);
};

static void thread_specific_storage_alloc(rb_thread_t *th);
static VALUE
thread_create_core(VALUE thval, struct thread_create_params *params)
{
    rb_execution_context_t *ec = GET_EC();
    rb_thread_t *th = rb_thread_ptr(thval), *current_th = rb_ec_thread_ptr(ec);
    int err;

    thread_specific_storage_alloc(th);

    if (OBJ_FROZEN(current_th->thgroup)) {
        rb_raise(rb_eThreadError,
                 "can't start a new thread (frozen ThreadGroup)");
    }

    rb_fiber_inherit_storage(ec, th->ec->fiber_ptr);

    switch (params->type) {
      case thread_invoke_type_proc:
        th->invoke_type = thread_invoke_type_proc;
        th->invoke_arg.proc.args = params->args;
        th->invoke_arg.proc.proc = params->proc;
        th->invoke_arg.proc.kw_splat = rb_keyword_given_p();
        break;

      case thread_invoke_type_ractor_proc:
#if RACTOR_CHECK_MODE > 0
        rb_ractor_setup_belonging_to(thval, rb_ractor_id(params->g));
#endif
        th->invoke_type = thread_invoke_type_ractor_proc;
        th->ractor = params->g;
        th->ractor->threads.main = th;
        th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc);
        th->invoke_arg.proc.args = INT2FIX(RARRAY_LENINT(params->args));
        th->invoke_arg.proc.kw_splat = rb_keyword_given_p();
        rb_ractor_send_parameters(ec, params->g, params->args);
        break;

      case thread_invoke_type_func:
        th->invoke_type = thread_invoke_type_func;
        th->invoke_arg.func.func = params->fn;
        th->invoke_arg.func.arg = (void *)params->args;
        break;

      default:
        rb_bug("unreachable");
    }

    th->priority = current_th->priority;
    th->thgroup = current_th->thgroup;

    th->pending_interrupt_queue = rb_ary_hidden_new(0);
    th->pending_interrupt_queue_checked = 0;
    th->pending_interrupt_mask_stack = rb_ary_dup(current_th->pending_interrupt_mask_stack);
    RBASIC_CLEAR_CLASS(th->pending_interrupt_mask_stack);

    rb_native_mutex_initialize(&th->interrupt_lock);

    RUBY_DEBUG_LOG("r:%u th:%u", rb_ractor_id(th->ractor), rb_th_serial(th));

    rb_ractor_living_threads_insert(th->ractor, th);

    /* kick thread */
    err = native_thread_create(th);
    if (err) {
        th->status = THREAD_KILLED;
        rb_ractor_living_threads_remove(th->ractor, th);
        rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err));
    }
    return thval;
}
#define threadptr_initialized(th) ((th)->invoke_type != thread_invoke_type_none)

/*
 * call-seq:
 *    Thread.new { ... }                  -> thread
 *    Thread.new(*args, &proc)            -> thread
 *    Thread.new(*args) { |args| ... }    -> thread
 *
 *  Creates a new thread executing the given block.
 *
 *  Any +args+ given to ::new will be passed to the block:
 *
 *      arr = []
 *      a, b, c = 1, 2, 3
 *      Thread.new(a,b,c) { |d,e,f| arr << d << e << f }.join
 *      arr #=> [1, 2, 3]
 *
 *  A ThreadError exception is raised if ::new is called without a block.
 *
 *  If you're going to subclass Thread, be sure to call super in your
 *  +initialize+ method, otherwise a ThreadError will be raised.
 */
static VALUE
thread_s_new(int argc, VALUE *argv, VALUE klass)
{
    rb_thread_t *th;
    VALUE thread = rb_thread_alloc(klass);

    if (GET_RACTOR()->threads.main->status == THREAD_KILLED) {
        rb_raise(rb_eThreadError, "can't alloc thread");
    }

    rb_obj_call_init_kw(thread, argc, argv, RB_PASS_CALLED_KEYWORDS);
    th = rb_thread_ptr(thread);
    if (!threadptr_initialized(th)) {
        rb_raise(rb_eThreadError, "uninitialized thread - check '%"PRIsVALUE"#initialize'",
                 klass);
    }
    return thread;
}
/*
 * call-seq:
 *    Thread.start([args]*) {|args| block }   -> thread
 *    Thread.fork([args]*) {|args| block }    -> thread
 *
 *  Basically the same as ::new.  However, if class Thread is subclassed, then
 *  calling +start+ in that subclass will not invoke the subclass's
 *  +initialize+ method.
 */

static VALUE
thread_start(VALUE klass, VALUE args)
{
    struct thread_create_params params = {
        .type = thread_invoke_type_proc,
        .args = args,
        .proc = rb_block_proc(),
    };
    return thread_create_core(rb_thread_alloc(klass), &params);
}
static VALUE
threadptr_invoke_proc_location(rb_thread_t *th)
{
    if (th->invoke_type == thread_invoke_type_proc) {
        return rb_proc_location(th->invoke_arg.proc.proc);
    }
    else {
        return Qnil;
    }
}

/* :nodoc: */
static VALUE
thread_initialize(VALUE thread, VALUE args)
{
    rb_thread_t *th = rb_thread_ptr(thread);

    if (!rb_block_given_p()) {
        rb_raise(rb_eThreadError, "must be called with a block");
    }
    else if (th->invoke_type != thread_invoke_type_none) {
        VALUE loc = threadptr_invoke_proc_location(th);
        if (!NIL_P(loc)) {
            rb_raise(rb_eThreadError,
                     "already initialized thread - %"PRIsVALUE":%"PRIsVALUE,
                     RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1));
        }
        else {
            rb_raise(rb_eThreadError, "already initialized thread");
        }
    }
    else {
        struct thread_create_params params = {
            .type = thread_invoke_type_proc,
            .args = args,
            .proc = rb_block_proc(),
        };
        return thread_create_core(thread, &params);
    }
}
VALUE
rb_thread_create(VALUE (*fn)(void *), void *arg)
{
    struct thread_create_params params = {
        .type = thread_invoke_type_func,
        .fn = fn,
        .args = (VALUE)arg,
    };
    return thread_create_core(rb_thread_alloc(rb_cThread), &params);
}
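/* rb_thread_create() is the C-level entry point for spawning a Ruby thread
 * that runs a C function.  A minimal sketch (illustrative only; `worker'
 * and its argument are hypothetical):
 *
 *   static VALUE
 *   worker(void *arg)
 *   {
 *       // runs in the new Ruby thread, with the GVL held
 *       return rb_funcall((VALUE)arg, rb_intern("call"), 0);
 *   }
 *
 *   // VALUE th = rb_thread_create(worker, (void *)callable);
 *   // rb_funcall(th, rb_intern("join"), 0);
 */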
VALUE
rb_thread_create_ractor(rb_ractor_t *r, VALUE args, VALUE proc)
{
    struct thread_create_params params = {
        .type = thread_invoke_type_ractor_proc,
        .g = r,
        .args = args,
        .proc = proc,
    };
    return thread_create_core(rb_thread_alloc(rb_cThread), &params);
}

struct join_arg {
    struct rb_waiting_list *waiter;
    rb_thread_t *target;
    VALUE timeout;
    rb_hrtime_t *limit;
};
static VALUE
remove_from_join_list(VALUE arg)
{
    struct join_arg *p = (struct join_arg *)arg;
    rb_thread_t *target_thread = p->target;

    if (target_thread->status != THREAD_KILLED) {
        struct rb_waiting_list **join_list = &target_thread->join_list;

        while (*join_list) {
            if (*join_list == p->waiter) {
                *join_list = (*join_list)->next;
                break;
            }

            join_list = &(*join_list)->next;
        }
    }

    return Qnil;
}

static int
thread_finished(rb_thread_t *th)
{
    return th->status == THREAD_KILLED || !UNDEF_P(th->value);
}
static VALUE
thread_join_sleep(VALUE arg)
{
    struct join_arg *p = (struct join_arg *)arg;
    rb_thread_t *target_th = p->target, *th = p->waiter->thread;
    rb_hrtime_t end = 0, *limit = p->limit;

    if (limit) {
        end = rb_hrtime_add(*limit, rb_hrtime_now());
    }

    while (!thread_finished(target_th)) {
        VALUE scheduler = rb_fiber_scheduler_current();

        if (scheduler != Qnil) {
            rb_fiber_scheduler_block(scheduler, target_th->self, p->timeout);
            // Check if the target thread is finished after blocking:
            if (thread_finished(target_th)) break;
            // Otherwise, a timeout occurred:
            else return Qfalse;
        }
        else if (!limit) {
            sleep_forever(th, SLEEP_DEADLOCKABLE | SLEEP_ALLOW_SPURIOUS | SLEEP_NO_CHECKINTS);
        }
        else {
            if (hrtime_update_expire(limit, end)) {
                RUBY_DEBUG_LOG("timeout target_th:%u", rb_th_serial(target_th));
                return Qfalse;
            }
            th->status = THREAD_STOPPED;
            native_sleep(th, limit);
        }
        RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
        th->status = THREAD_RUNNABLE;

        RUBY_DEBUG_LOG("interrupted target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));
    }

    return Qtrue;
}
static VALUE
thread_join(rb_thread_t *target_th, VALUE timeout, rb_hrtime_t *limit)
{
    rb_execution_context_t *ec = GET_EC();
    rb_thread_t *th = ec->thread_ptr;
    rb_fiber_t *fiber = ec->fiber_ptr;

    if (th == target_th) {
        rb_raise(rb_eThreadError, "Target thread must not be current thread");
    }

    if (th->ractor->threads.main == target_th) {
        rb_raise(rb_eThreadError, "Target thread must not be main thread");
    }

    RUBY_DEBUG_LOG("target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));

    if (target_th->status != THREAD_KILLED) {
        struct rb_waiting_list waiter;
        waiter.next = target_th->join_list;
        waiter.thread = th;
        waiter.fiber = rb_fiberptr_blocking(fiber) ? NULL : fiber;
        target_th->join_list = &waiter;

        struct join_arg arg;
        arg.waiter = &waiter;
        arg.target = target_th;
        arg.timeout = timeout;
        arg.limit = limit;

        if (!rb_ensure(thread_join_sleep, (VALUE)&arg, remove_from_join_list, (VALUE)&arg)) {
            return Qnil;
        }
    }

    RUBY_DEBUG_LOG("success target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));

    if (target_th->ec->errinfo != Qnil) {
        VALUE err = target_th->ec->errinfo;

        if (FIXNUM_P(err)) {
            switch (err) {
              case INT2FIX(TAG_FATAL):
                RUBY_DEBUG_LOG("terminated target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));

                /* OK. killed. */
                break;
              default:
                rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err));
            }
        }
        else if (THROW_DATA_P(target_th->ec->errinfo)) {
            rb_bug("thread_join: THROW_DATA should not reach here.");
        }
        else {
            /* normal exception */
            rb_exc_raise(err);
        }
    }
    return target_th->self;
}
/*
 * call-seq:
 *    thr.join          -> thr
 *    thr.join(limit)   -> thr
 *
 *  The calling thread will suspend execution and run this +thr+.
 *
 *  Does not return until +thr+ exits or until the given +limit+ seconds have
 *  passed.
 *
 *  If the time limit expires, +nil+ will be returned, otherwise +thr+ is
 *  returned.
 *
 *  Any threads not joined will be killed when the main program exits.
 *
 *  If +thr+ had previously raised an exception and the ::abort_on_exception or
 *  $DEBUG flags are not set, (so the exception has not yet been processed), it
 *  will be processed at this time.
 *
 *     a = Thread.new { print "a"; sleep(10); print "b"; print "c" }
 *     x = Thread.new { print "x"; Thread.pass; print "y"; print "z" }
 *     x.join # Let thread x finish, thread a will be killed on exit.
 *     #=> "axyz"
 *
 *  The following example illustrates the +limit+ parameter.
 *
 *     y = Thread.new { 4.times { sleep 0.1; puts 'tick... ' }}
 *     puts "Waiting" until y.join(0.15)
 *
 *  This will produce:
 *
 *     tick...
 *     Waiting
 *     tick...
 *     Waiting
 *     tick...
 *     tick...
 */
static VALUE
thread_join_m(int argc, VALUE *argv, VALUE self)
{
    VALUE timeout = Qnil;
    rb_hrtime_t rel = 0, *limit = 0;

    if (rb_check_arity(argc, 0, 1)) {
        timeout = argv[0];
    }

    // Convert the timeout eagerly, so it's always converted and deterministic
    /*
     * This supports INFINITY and negative values, so we can't use
     * rb_time_interval right now...
     */
    if (NIL_P(timeout)) {
        /* unlimited */
    }
    else if (FIXNUM_P(timeout)) {
        rel = rb_sec2hrtime(NUM2TIMET(timeout));
        limit = &rel;
    }
    else {
        limit = double2hrtime(&rel, rb_num2dbl(timeout));
    }

    return thread_join(rb_thread_ptr(self), timeout, limit);
}
/*
 * call-seq:
 *    thr.value   -> obj
 *
 *  Waits for +thr+ to complete, using #join, and returns its value or raises
 *  the exception which terminated the thread.
 *
 *     a = Thread.new { 2 + 2 }
 *     a.value   #=> 4
 *
 *     b = Thread.new { raise 'something went wrong' }
 *     b.value   #=> RuntimeError: something went wrong
 */

static VALUE
thread_value(VALUE self)
{
    rb_thread_t *th = rb_thread_ptr(self);
    thread_join(th, Qnil, 0);
    if (UNDEF_P(th->value)) {
        // If the thread is dead because we forked, th->value is still Qundef.
        return Qnil;
    }
    return th->value;
}
/*
 * Thread Scheduling
 */

static void
getclockofday(struct timespec *ts)
{
#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
    if (clock_gettime(CLOCK_MONOTONIC, ts) == 0)
        return;
#endif
    rb_timespec_now(ts);
}

/*
 * Don't inline this, since the library call is already time consuming
 * and we don't want "struct timespec" on the stack too long for GC.
 */
NOINLINE(rb_hrtime_t rb_hrtime_now(void));
rb_hrtime_t
rb_hrtime_now(void)
{
    struct timespec ts;

    getclockofday(&ts);
    return rb_timespec2hrtime(&ts);
}
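/* A typical deadline computation with these helpers (an illustrative sketch
 * only), mirroring sleep_hrtime() below: take the monotonic "now", add a
 * relative interval, then shrink the interval after each (possibly spurious)
 * wakeup until the deadline passes:
 *
 *   rb_hrtime_t rel = RB_HRTIME_PER_SEC;                  // 1 second
 *   rb_hrtime_t end = rb_hrtime_add(rb_hrtime_now(), rel);
 *   // after a wakeup:
 *   //   if (hrtime_update_expire(&rel, end)) { deadline reached }
 *   //   else `rel' now holds the remaining time to sleep
 */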
/*
 * At least gcc 7.2 and 7.3 complain about "rb_hrtime_t end"
 * being uninitialized; maybe other versions, too.
 */
COMPILER_WARNING_PUSH
#if defined(__GNUC__) && __GNUC__ == 7 && __GNUC_MINOR__ <= 3
COMPILER_WARNING_IGNORED(-Wmaybe-uninitialized)
#endif
#ifndef PRIu64
#define PRIu64 PRI_64_PREFIX "u"
#endif
/*
 * @end is the absolute time when @ts is set to expire.
 * Returns true if @end has passed;
 * updates @ts and returns false otherwise.
 */
static int
hrtime_update_expire(rb_hrtime_t *timeout, const rb_hrtime_t end)
{
    rb_hrtime_t now = rb_hrtime_now();

    if (now > end) return 1;

    RUBY_DEBUG_LOG("%"PRIu64" > %"PRIu64"", (uint64_t)end, (uint64_t)now);

    *timeout = end - now;
    return 0;
}
COMPILER_WARNING_POP
static int
sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl)
{
    enum rb_thread_status prev_status = th->status;
    int woke;
    rb_hrtime_t end = rb_hrtime_add(rb_hrtime_now(), rel);

    th->status = THREAD_STOPPED;
    RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
    while (th->status == THREAD_STOPPED) {
        native_sleep(th, &rel);
        woke = vm_check_ints_blocking(th->ec);
        if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
            break;
        if (hrtime_update_expire(&rel, end))
            break;
        woke = 1;
    }
    th->status = prev_status;
    return woke;
}

static int
sleep_hrtime_until(rb_thread_t *th, rb_hrtime_t end, unsigned int fl)
{
    enum rb_thread_status prev_status = th->status;
    int woke;
    rb_hrtime_t rel = rb_hrtime_sub(end, rb_hrtime_now());

    th->status = THREAD_STOPPED;
    RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
    while (th->status == THREAD_STOPPED) {
        native_sleep(th, &rel);
        woke = vm_check_ints_blocking(th->ec);
        if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
            break;
        if (hrtime_update_expire(&rel, end))
            break;
        woke = 1;
    }
    th->status = prev_status;
    return woke;
}
static void
sleep_forever(rb_thread_t *th, unsigned int fl)
{
    enum rb_thread_status prev_status = th->status;
    enum rb_thread_status status;
    int woke;

    status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
    th->status = status;

    if (!(fl & SLEEP_NO_CHECKINTS)) RUBY_VM_CHECK_INTS_BLOCKING(th->ec);

    while (th->status == status) {
        if (fl & SLEEP_DEADLOCKABLE) {
            rb_ractor_sleeper_threads_inc(th->ractor);
            rb_check_deadlock(th->ractor);
        }

        native_sleep(th, 0);

        if (fl & SLEEP_DEADLOCKABLE) {
            rb_ractor_sleeper_threads_dec(th->ractor);
        }
        if (fl & SLEEP_ALLOW_SPURIOUS) {
            break;
        }

        woke = vm_check_ints_blocking(th->ec);

        if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) {
            break;
        }
    }

    th->status = prev_status;
}
void
rb_thread_sleep_forever(void)
{
    RUBY_DEBUG_LOG("forever");
    sleep_forever(GET_THREAD(), SLEEP_SPURIOUS_CHECK);
}

void
rb_thread_sleep_deadly(void)
{
    RUBY_DEBUG_LOG("deadly");
    sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK);
}

static void
rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end)
{
    VALUE scheduler = rb_fiber_scheduler_current();
    if (scheduler != Qnil) {
        rb_fiber_scheduler_block(scheduler, blocker, timeout);
    }
    else {
        RUBY_DEBUG_LOG("...");
        if (end) {
            sleep_hrtime_until(GET_THREAD(), end, SLEEP_SPURIOUS_CHECK);
        }
        else {
            sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
        }
    }
}
void
rb_thread_wait_for(struct timeval time)
{
    rb_thread_t *th = GET_THREAD();

    sleep_hrtime(th, rb_timeval2hrtime(&time), SLEEP_SPURIOUS_CHECK);
}

/*
 * CAUTION: This function causes thread switching.
 *          rb_thread_check_ints() checks Ruby's interrupts;
 *          some interrupts require thread switching, invoking
 *          handlers, and so on.
 */

void
rb_thread_check_ints(void)
{
    RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
}

/*
 * Hidden API for the tcl/tk wrapper.
 * There is no guarantee that it will be perpetuated.
 */
int
rb_thread_check_trap_pending(void)
{
    return rb_signal_buff_size() != 0;
}

/* This function can be called in a blocking region. */
int
rb_thread_interrupted(VALUE thval)
{
    return (int)RUBY_VM_INTERRUPTED(rb_thread_ptr(thval)->ec);
}

void
rb_thread_sleep(int sec)
{
    rb_thread_wait_for(rb_time_timeval(INT2FIX(sec)));
}
static void
rb_thread_schedule_limits(uint32_t limits_us)
{
    if (!rb_thread_alone()) {
        rb_thread_t *th = GET_THREAD();
        RUBY_DEBUG_LOG("us:%u", (unsigned int)limits_us);

        if (th->running_time_us >= limits_us) {
            RUBY_DEBUG_LOG("switch %s", "start");

            RB_VM_SAVE_MACHINE_CONTEXT(th);
            thread_sched_yield(TH_SCHED(th), th);
            rb_ractor_thread_switch(th->ractor, th);

            RUBY_DEBUG_LOG("switch %s", "done");
        }
    }
}

void
rb_thread_schedule(void)
{
    rb_thread_schedule_limits(0);
    RUBY_VM_CHECK_INTS(GET_EC());
}

/* blocking region */
static inline int
blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
                      rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted)
{
#ifdef RUBY_VM_CRITICAL_SECTION
    VM_ASSERT(ruby_assert_critical_section_entered == 0);
#endif
    VM_ASSERT(th == GET_THREAD());

    region->prev_status = th->status;
    if (unblock_function_set(th, ubf, arg, fail_if_interrupted)) {
        th->blocking_region_buffer = region;
        th->status = THREAD_STOPPED;
        rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);

        RUBY_DEBUG_LOG("thread_id:%p", (void *)th->nt->thread_id);
        return TRUE;
    }
    else {
        return FALSE;
    }
}

static inline void
blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
{
    /* entry to ubf_list still permitted at this point, make it impossible: */
    unblock_function_clear(th);
    /* entry to ubf_list impossible at this point, so unregister is safe: */
    unregister_ubf_list(th);

    thread_sched_to_running(TH_SCHED(th), th);
    rb_ractor_thread_switch(th->ractor, th);

    th->blocking_region_buffer = 0;
    rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__);
    if (th->status == THREAD_STOPPED) {
        th->status = region->prev_status;
    }

    RUBY_DEBUG_LOG("end");

#ifndef _WIN32
    // GET_THREAD() clears WSAGetLastError()
    VM_ASSERT(th == GET_THREAD());
#endif
}
void *
rb_nogvl(void *(*func)(void *), void *data1,
         rb_unblock_function_t *ubf, void *data2,
         int flags)
{
    void *val = 0;
    rb_execution_context_t *ec = GET_EC();
    rb_thread_t *th = rb_ec_thread_ptr(ec);
    rb_vm_t *vm = rb_ec_vm_ptr(ec);
    bool is_main_thread = vm->ractor.main_thread == th;
    int saved_errno = 0;
    VALUE ubf_th = Qfalse;

    if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) {
        ubf = ubf_select;
        data2 = th;
    }
    else if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) {
        if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
            vm->ubf_async_safe = 1;
        }
    }

    rb_vm_t *volatile saved_vm = vm;
    BLOCKING_REGION(th, {
        val = func(data1);
        saved_errno = rb_errno();
    }, ubf, data2, flags & RB_NOGVL_INTR_FAIL);
    vm = saved_vm;

    if (is_main_thread) vm->ubf_async_safe = 0;

    if ((flags & RB_NOGVL_INTR_FAIL) == 0) {
        RUBY_VM_CHECK_INTS_BLOCKING(ec);
    }

    if (ubf_th != Qfalse) {
        thread_value(rb_thread_kill(ubf_th));
    }

    rb_errno_set(saved_errno);

    return val;
}
/*
 * rb_thread_call_without_gvl - permit concurrent/parallel execution.
 * rb_thread_call_without_gvl2 - permit concurrent/parallel execution
 *                               without interrupt processing.
 *
 * rb_thread_call_without_gvl() does:
 *   (1) Check interrupts.
 *   (2) release GVL.
 *       Other Ruby threads may run in parallel.
 *   (3) call func with data1
 *   (4) acquire GVL.
 *       Other Ruby threads can not run in parallel any more.
 *   (5) Check interrupts.
 *
 * rb_thread_call_without_gvl2() does:
 *   (1) Check interrupt and return if interrupted.
 *   (2) release GVL.
 *   (3) call func with data1 and a pointer to the flags.
 *   (4) acquire GVL.
 *
 * If another thread interrupts this thread (Thread#kill, signal delivery,
 * VM-shutdown request, and so on), `ubf()' is called (`ubf()' means
 * "un-blocking function").  `ubf()' should interrupt `func()' execution by
 * toggling a cancellation flag, canceling the invocation of a call inside
 * `func()' or similar.  Note that `ubf()' may not be called with the GVL.
 *
 * There are built-in ubfs and you can specify these ubfs:
 *
 * * RUBY_UBF_IO: ubf for IO operation
 * * RUBY_UBF_PROCESS: ubf for process operation
 *
 * However, we can not guarantee our built-in ubfs interrupt your `func()'
 * correctly.  Be careful when using rb_thread_call_without_gvl().  If you
 * don't provide a proper ubf(), your program will not stop for Control+C
 * or other shutdown events.
 *
 * "Check interrupts" on the above list means checking asynchronous
 * interrupt events (such as Thread#kill, signal delivery, VM-shutdown
 * request, and so on) and calling corresponding procedures
 * (such as `trap' for signals, raising an exception for Thread#raise).
 * If `func()' finished and interrupts were received, you may want to skip
 * interrupt checking.  For example, assume the following func() reads data
 * from a file.
 *
 *   read_func(...) {
 *                   // (a) before read
 *     read(buffer); // (b) reading
 *                   // (c) after read
 *   }
 *
 * If an interrupt occurs at (a) or (b), then `ubf()' cancels this
 * `read_func()' and interrupts are checked.  However, if an interrupt occurs
 * at (c), after the *read* operation is completed, checking interrupts is
 * harmful because it causes an irrevocable side effect: the read data will
 * vanish.  To avoid such a problem, the `read_func()' should be used with
 * `rb_thread_call_without_gvl2()'.
 *
 * If `rb_thread_call_without_gvl2()' detects an interrupt, it returns
 * immediately.  This function does not show when the execution was
 * interrupted.  For example, there are 4 possible timings: (a), (b), (c),
 * and before calling read_func().  You need to record the progress of
 * read_func() and check it after `rb_thread_call_without_gvl2()'.  You may
 * need to call `rb_thread_check_ints()' correctly, or your program can not
 * process events such as `trap' properly.
 *
 * NOTE: You can not execute most of the Ruby C API nor touch Ruby
 *       objects in `func()' and `ubf()', including raising an
 *       exception, because the current thread doesn't hold the GVL
 *       (it causes synchronization problems).  If you need to
 *       call Ruby functions, either use rb_thread_call_with_gvl()
 *       or read the source code of the C APIs and confirm their safety
 *       yourself.
 *
 * NOTE: In short, this API is difficult to use safely.  I recommend you
 *       use other ways if you have them.  We lack experience using this
 *       API.  Please report any problems related to it.
 *
 * NOTE: Releasing and re-acquiring the GVL are expensive operations
 *       for a short-running `func()'.  Be sure to benchmark, and use this
 *       mechanism only when `func()' consumes enough time.
 *
 * Safe C API:
 * * rb_thread_interrupted() - check interrupt flag
 * * ruby_xmalloc(), ruby_xrealloc(), ruby_xfree() -
 *   they will work without GVL, and may acquire GVL when GC is needed.
 */
void *
rb_thread_call_without_gvl2(void *(*func)(void *), void *data1,
                            rb_unblock_function_t *ubf, void *data2)
{
    return rb_nogvl(func, data1, ubf, data2, RB_NOGVL_INTR_FAIL);
}

void *
rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
                           rb_unblock_function_t *ubf, void *data2)
{
    return rb_nogvl(func, data1, ubf, data2, 0);
}
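/* A fuller sketch of the read_func() pattern from the comment above
 * (illustrative only; the struct and function names are hypothetical).
 * The ubf sets a cancellation flag so a blocked read can be abandoned:
 *
 *   struct read_arg { int fd; char *buf; size_t len; volatile int cancelled; };
 *
 *   static void *
 *   read_func(void *p)
 *   {
 *       struct read_arg *a = p;
 *       // no Ruby objects may be touched here; the GVL is released
 *       return (void *)(long)read(a->fd, a->buf, a->len);
 *   }
 *
 *   static void
 *   read_ubf(void *p)
 *   {
 *       struct read_arg *a = p;
 *       a->cancelled = 1;  // and e.g. write to a self-pipe to wake read(2)
 *   }
 *
 *   // ssize_t n = (ssize_t)(long)
 *   //     rb_thread_call_without_gvl(read_func, &arg, read_ubf, &arg);
 */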
static int
waitfd_to_waiting_flag(int wfd_event)
{
    return wfd_event << 1;
}

static void
thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
{
    wfd->fd = fd;
    wfd->th = th;
    wfd->busy = NULL;

    RB_VM_LOCK_ENTER();
    {
        ccan_list_add(&th->vm->waiting_fds, &wfd->wfd_node);
    }
    RB_VM_LOCK_LEAVE();
}

static void
thread_io_wake_pending_closer(struct waiting_fd *wfd)
{
    bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex);
    if (has_waiter) {
        rb_mutex_lock(wfd->busy->wakeup_mutex);
    }

    /* Needs to be protected with RB_VM_LOCK because we don't know if
       wfd is on the global list of pending FD ops or if it's on a
       struct rb_io_close_wait_list close-waiter. */
    RB_VM_LOCK_ENTER();
    ccan_list_del(&wfd->wfd_node);
    RB_VM_LOCK_LEAVE();

    if (has_waiter) {
        rb_thread_wakeup(wfd->busy->closing_thread);
        rb_mutex_unlock(wfd->busy->wakeup_mutex);
    }
}
static bool
thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
{
#if defined(USE_MN_THREADS) && USE_MN_THREADS
    return !th_has_dedicated_nt(th) && (events || timeout) && th->blocking;
#else
    return false;
#endif
}

// true if need retry
static bool
thread_io_wait_events(rb_thread_t *th, int fd, int events, const struct timeval *timeout)
{
#if defined(USE_MN_THREADS) && USE_MN_THREADS
    if (thread_io_mn_schedulable(th, events, timeout)) {
        rb_hrtime_t rel, *prel;

        if (timeout) {
            rel = rb_timeval2hrtime(timeout);
            prel = &rel;
        }
        else {
            prel = NULL;
        }

        VM_ASSERT(prel || (events & (RB_WAITFD_IN | RB_WAITFD_OUT)));

        if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) {
            // timeout
            return false;
        }
        else {
            return true;
        }
    }
#endif // defined(USE_MN_THREADS) && USE_MN_THREADS
    return false;
}
// assume read/write
static bool
blocking_call_retryable_p(int r, int eno)
{
    if (r != -1) return false;

    switch (eno) {
      case EAGAIN:
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
      case EWOULDBLOCK:
#endif
        return true;
      default:
        return false;
    }
}

bool
rb_thread_mn_schedulable(VALUE thval)
{
    rb_thread_t *th = rb_thread_ptr(thval);
    return th->mn_schedulable;
}
VALUE
rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, int events)
{
    rb_execution_context_t *volatile ec = GET_EC();
    rb_thread_t *th = rb_ec_thread_ptr(ec);

    RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), fd, events);

    struct waiting_fd waiting_fd;
    volatile VALUE val = Qundef; /* shouldn't be used */
    volatile int saved_errno = 0;
    enum ruby_tag_type state;
    bool prev_mn_schedulable = th->mn_schedulable;
    th->mn_schedulable = thread_io_mn_schedulable(th, events, NULL);

    // `errno` is only valid when there is an actual error - but we can't
    // extract that from the return value of `func` alone, so we clear any
    // prior `errno` value here so that we can later check if it was set by
    // `func` or not (as opposed to some previously set value).
    errno = 0;

    thread_io_setup_wfd(th, fd, &waiting_fd);

    EC_PUSH_TAG(ec);
    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
        volatile enum ruby_tag_type saved_state = state; /* for BLOCKING_REGION */
      retry:
        BLOCKING_REGION(waiting_fd.th, {
            val = func(data1);
            saved_errno = errno;
        }, ubf_select, waiting_fd.th, FALSE);

        th = rb_ec_thread_ptr(ec);
        if (events &&
            blocking_call_retryable_p((int)val, saved_errno) &&
            thread_io_wait_events(th, fd, events, NULL)) {
            RUBY_VM_CHECK_INTS_BLOCKING(ec);
            goto retry;
        }
        state = saved_state;
    }
    EC_POP_TAG();

    th = rb_ec_thread_ptr(ec);
    th->mn_schedulable = prev_mn_schedulable;

    /*
     * must be deleted before jump
     * this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list
     */
    thread_io_wake_pending_closer(&waiting_fd);

    if (state) {
        EC_JUMP_TAG(ec, state);
    }
    /* TODO: check func() */
    RUBY_VM_CHECK_INTS_BLOCKING(ec);

    // If the error was a timeout, we raise a specific exception for that:
    if (saved_errno == ETIMEDOUT) {
        rb_raise(rb_eIOTimeoutError, "Blocking operation timed out!");
    }

    errno = saved_errno;

    return val;
}
VALUE
rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
{
    return rb_thread_io_blocking_call(func, data1, fd, 0);
}
/*
 * rb_thread_call_with_gvl - re-enter the Ruby world after GVL release.
 *
 * After releasing the GVL using rb_thread_call_without_gvl() you can not
 * access Ruby values or invoke methods.  If you need to access Ruby, you
 * must use this function rb_thread_call_with_gvl().
 *
 * This function rb_thread_call_with_gvl() does:
 * (1) acquire GVL.
 * (2) call passed function `func'.
 * (3) release GVL.
 * (4) return the value which is returned at (2).
 *
 * NOTE: You should not return a Ruby object at (2) because such an object
 *       will not be marked.
 *
 * NOTE: If an exception is raised in `func', this function DOES NOT
 *       protect (catch) the exception.  If you have any resources
 *       which should be freed before throwing an exception, you need to
 *       use rb_protect() in `func' and return a value which represents
 *       that an exception was raised.
 *
 * NOTE: This function should not be called by a thread which was not
 *       created as a Ruby thread (created by Thread.new or so).  In other
 *       words, this function *DOES NOT* associate or convert a NON-Ruby
 *       thread to a Ruby thread.
 */
void *
rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
{
    rb_thread_t *th = ruby_thread_from_native();
    struct rb_blocking_region_buffer *brb;
    struct rb_unblock_callback prev_unblock;
    void *r;

    if (th == 0) {
        /* Error has occurred, but we can't use rb_bug()
         * because this thread is not Ruby's thread.
         * What should we do?
         */
        bp();
        fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n");
        exit(EXIT_FAILURE);
    }

    brb = (struct rb_blocking_region_buffer *)th->blocking_region_buffer;
    prev_unblock = th->unblock;

    if (brb == 0) {
        rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL.");
    }

    blocking_region_end(th, brb);
    /* enter to Ruby world: You can access Ruby values, methods and so on. */
    r = (*func)(data1);
    /* leave from Ruby world: You can not access Ruby values, etc. */
    int released = blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE);
    RUBY_ASSERT_ALWAYS(released);
    RB_VM_SAVE_MACHINE_CONTEXT(th);
    thread_sched_to_waiting(TH_SCHED(th), th);
    return r;
}
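/* A minimal pairing of the two APIs (an illustrative sketch; the function
 * names are hypothetical).  From code running without the GVL, briefly
 * re-enter the Ruby world to touch Ruby objects, then drop the GVL again:
 *
 *   static void *
 *   with_gvl_log(void *msg)
 *   {
 *       rb_funcall(rb_stderr, rb_intern("puts"), 1,
 *                  rb_str_new_cstr((const char *)msg));
 *       return NULL;
 *   }
 *
 *   static void *
 *   nogvl_body(void *arg)
 *   {
 *       // ... long computation without the GVL ...
 *       rb_thread_call_with_gvl(with_gvl_log, (void *)"halfway done");
 *       // ... more computation ...
 *       return NULL;
 *   }
 *
 *   // rb_thread_call_without_gvl(nogvl_body, arg, RUBY_UBF_IO, 0);
 */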
/*
 * ruby_thread_has_gvl_p - check if the current native thread has the GVL.
 *
 * ***
 * *** This API is EXPERIMENTAL!
 * *** We do not guarantee that this API remains in ruby 1.9.2 or later.
 * ***
 */

int
ruby_thread_has_gvl_p(void)
{
    rb_thread_t *th = ruby_thread_from_native();

    if (th && th->blocking_region_buffer == 0) {
        return 1;
    }
    else {
        return 0;
    }
}

/*
 * call-seq:
 *    Thread.pass   -> nil
 *
 *  Give the thread scheduler a hint to pass execution to another thread.
 *  A running thread may or may not switch; it depends on the OS and
 *  processor.
 */

static VALUE
thread_s_pass(VALUE klass)
{
    rb_thread_schedule();
    return Qnil;
}
/*****************************************************/

/*
 * rb_threadptr_pending_interrupt_* - manage asynchronous error queue
 *
 * Async events such as an exception thrown by Thread#raise,
 * Thread#kill and thread termination (after main thread termination)
 * will be queued to th->pending_interrupt_queue.
 * - clear: clear the queue.
 * - enque: enqueue an err object into the queue.
 * - deque: dequeue an err object from the queue.
 * - active_p: return 1 if the queue should be checked.
 *
 * All rb_threadptr_pending_interrupt_* functions are called by
 * a GVL-acquired thread, of course.
 * Note that all "rb_"-prefixed APIs need the GVL to call.
 */
void
rb_threadptr_pending_interrupt_clear(rb_thread_t *th)
{
    rb_ary_clear(th->pending_interrupt_queue);
}

void
rb_threadptr_pending_interrupt_enque(rb_thread_t *th, VALUE v)
{
    rb_ary_push(th->pending_interrupt_queue, v);
    th->pending_interrupt_queue_checked = 0;
}

static void
threadptr_check_pending_interrupt_queue(rb_thread_t *th)
{
    if (!th->pending_interrupt_queue) {
        rb_raise(rb_eThreadError, "uninitialized thread");
    }
}

enum handle_interrupt_timing {
    INTERRUPT_NONE,
    INTERRUPT_IMMEDIATE,
    INTERRUPT_ON_BLOCKING,
    INTERRUPT_NEVER
};
static enum handle_interrupt_timing
rb_threadptr_pending_interrupt_from_symbol(rb_thread_t *th, VALUE sym)
{
    if (sym == sym_immediate) {
        return INTERRUPT_IMMEDIATE;
    }
    else if (sym == sym_on_blocking) {
        return INTERRUPT_ON_BLOCKING;
    }
    else if (sym == sym_never) {
        return INTERRUPT_NEVER;
    }
    else {
        rb_raise(rb_eThreadError, "unknown mask signature");
    }
}

static enum handle_interrupt_timing
rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
{
    VALUE mask;
    long mask_stack_len = RARRAY_LEN(th->pending_interrupt_mask_stack);
    const VALUE *mask_stack = RARRAY_CONST_PTR(th->pending_interrupt_mask_stack);
    VALUE mod;
    long i;

    for (i=0; i<mask_stack_len; i++) {
        mask = mask_stack[mask_stack_len-(i+1)];

        if (SYMBOL_P(mask)) {
            /* do not match RUBY_FATAL_THREAD_KILLED etc */
            if (err != rb_cInteger) {
                return rb_threadptr_pending_interrupt_from_symbol(th, mask);
            }
            else {
                continue;
            }
        }

        for (mod = err; mod; mod = RCLASS_SUPER(mod)) {
            VALUE klass = mod;
            VALUE sym;

            if (BUILTIN_TYPE(mod) == T_ICLASS) {
                klass = RBASIC(mod)->klass;
            }
            else if (mod != RCLASS_ORIGIN(mod)) {
                continue;
            }

            if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
                return rb_threadptr_pending_interrupt_from_symbol(th, sym);
            }
        }
        /* try to next mask */
    }
    return INTERRUPT_NONE;
}
static int
rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th)
{
    return RARRAY_LEN(th->pending_interrupt_queue) == 0;
}

static int
rb_threadptr_pending_interrupt_include_p(rb_thread_t *th, VALUE err)
{
    int i;
    for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
        VALUE e = RARRAY_AREF(th->pending_interrupt_queue, i);
        if (rb_obj_is_kind_of(e, err)) {
            return TRUE;
        }
    }
    return FALSE;
}

static VALUE
rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timing timing)
{
#if 1 /* 1 to enable Thread#handle_interrupt, 0 to ignore it */
    int i;

    for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
        VALUE err = RARRAY_AREF(th->pending_interrupt_queue, i);

        enum handle_interrupt_timing mask_timing = rb_threadptr_pending_interrupt_check_mask(th, CLASS_OF(err));

        switch (mask_timing) {
          case INTERRUPT_ON_BLOCKING:
            if (timing != INTERRUPT_ON_BLOCKING) {
                break;
            }
            /* fall through */
          case INTERRUPT_NONE: /* default: IMMEDIATE */
          case INTERRUPT_IMMEDIATE:
            rb_ary_delete_at(th->pending_interrupt_queue, i);
            return err;
          case INTERRUPT_NEVER:
            break;
        }
    }

    th->pending_interrupt_queue_checked = 1;
    return Qundef;
#else
    VALUE err = rb_ary_shift(th->pending_interrupt_queue);
    if (rb_threadptr_pending_interrupt_empty_p(th)) {
        th->pending_interrupt_queue_checked = 1;
    }
    return err;
#endif
}
2106 static int
2107 threadptr_pending_interrupt_active_p(rb_thread_t *th)
2110 * As an optimization, we don't check the async errinfo queue
2111 * if neither the queue nor the thread interrupt mask has changed
2112 * since the last check.
2114 if (th->pending_interrupt_queue_checked) {
2115 return 0;
2118 if (rb_threadptr_pending_interrupt_empty_p(th)) {
2119 return 0;
2122 return 1;
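
/* Validate one ExceptionClass => :TimingSymbol pair of the mask hash and
   accumulate an optimized mask in *args: a bare Symbol when the only key is
   Exception, an identity hash in the general case, or a falsy value telling
   the caller to use the original hash argument as-is. */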
2125 static int
2126 handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args)
2128 VALUE *maskp = (VALUE *)args;
2130 if (val != sym_immediate && val != sym_on_blocking && val != sym_never) {
2131 rb_raise(rb_eArgError, "unknown mask signature");
2134 if (key == rb_eException && (UNDEF_P(*maskp) || NIL_P(*maskp))) {
2135 *maskp = val;
2136 return ST_CONTINUE;
2139 if (RTEST(*maskp)) {
2140 if (!RB_TYPE_P(*maskp, T_HASH)) {
2141 VALUE prev = *maskp;
2142 *maskp = rb_ident_hash_new();
2143 if (SYMBOL_P(prev)) {
2144 rb_hash_aset(*maskp, rb_eException, prev);
2147 rb_hash_aset(*maskp, key, val);
2149 else {
2150 *maskp = Qfalse;
2153 return ST_CONTINUE;
2157 * call-seq:
2158 * Thread.handle_interrupt(hash) { ... } -> result of the block
2160 * Changes asynchronous interrupt timing.
2162 * _interrupt_ means an asynchronous event and its corresponding procedure,
2163 * triggered by Thread#raise, Thread#kill, signal trap (not supported yet)
2164 * and main thread termination (if the main thread terminates, all
2165 * other threads will be killed).
2167 * The given +hash+ has pairs like <code>ExceptionClass =>
2168 * :TimingSymbol</code>, where the ExceptionClass is the interrupt handled by
2169 * the given block. The TimingSymbol can be one of the following symbols:
2171 * [+:immediate+] Invoke interrupts immediately.
2172 * [+:on_blocking+] Invoke interrupts while in a _BlockingOperation_.
2173 * [+:never+] Never invoke interrupts.
2175 * A _BlockingOperation_ is an operation that blocks the calling thread,
2176 * such as read and write. In the CRuby implementation, a _BlockingOperation_ is any
2177 * operation executed without the GVL.
2179 * Masked asynchronous interrupts are delayed until they are enabled.
2180 * This method is similar to sigprocmask(3).
2182 * === NOTE
2184 * Asynchronous interrupts are difficult to use.
2186 * If you need to communicate between threads, consider using another way such as Queue,
2188 * or use them only with a deep understanding of this method.
2190 * === Usage
2192 * In this example, we can guard against Thread#raise exceptions.
2194 * Using the +:never+ TimingSymbol, the RuntimeError exception will always be
2195 * ignored in the first block of the main thread. In the second
2196 * ::handle_interrupt block we can purposefully handle RuntimeError exceptions.
2198 * th = Thread.new do
2199 * Thread.handle_interrupt(RuntimeError => :never) {
2200 * begin
2201 * # You can write resource allocation code safely.
2202 * Thread.handle_interrupt(RuntimeError => :immediate) {
2203 * # ...
2205 * ensure
2206 * # You can write resource deallocation code safely.
2207 * end
2209 * end
2210 * Thread.pass
2211 * # ...
2212 * th.raise "stop"
2214 * While we are ignoring the RuntimeError exception, it's safe to write our
2215 * resource allocation code. Then, the ensure block is where we can safely
2216 * deallocate our resources.
2218 * ==== Guarding from Timeout::Error
2220 * In the next example, we will guard against the Timeout::Error exception. This
2221 * will help prevent resource leaks when Timeout::Error exceptions occur
2222 * during a normal ensure clause. For this example we use the help of the
2223 * standard library Timeout, from lib/timeout.rb
2225 * require 'timeout'
2226 * Thread.handle_interrupt(Timeout::Error => :never) {
2227 * Timeout.timeout(10){
2228 * # Timeout::Error doesn't occur here
2229 * Thread.handle_interrupt(Timeout::Error => :on_blocking) {
2230 * # possible to be killed by Timeout::Error
2231 * # while blocking operation
2233 * # Timeout::Error doesn't occur here
2237 * In the first part of the +timeout+ block, we can rely on Timeout::Error being
2238 * ignored. Then in the <code>Timeout::Error => :on_blocking</code> block, any
2239 * operation that will block the calling thread is susceptible to a
2240 * Timeout::Error exception being raised.
2242 * ==== Stack control settings
2244 * It's possible to stack multiple levels of ::handle_interrupt blocks in order
2245 * to control more than one ExceptionClass and TimingSymbol at a time.
2247 * Thread.handle_interrupt(FooError => :never) {
2248 * Thread.handle_interrupt(BarError => :never) {
2249 * # FooError and BarError are prohibited.
2253 * ==== Inheritance with ExceptionClass
2255 * All exceptions inherited from the ExceptionClass parameter will be considered.
2257 * Thread.handle_interrupt(Exception => :never) {
2258 * # all exceptions inherited from Exception are prohibited.
2261 * For handling all interrupts, use +Object+ and not +Exception+
2262 * as the ExceptionClass, as kill/terminate interrupts are not handled by +Exception+.
2264 static VALUE
2265 rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
2267 VALUE mask = Qundef;
2268 rb_execution_context_t * volatile ec = GET_EC();
2269 rb_thread_t * volatile th = rb_ec_thread_ptr(ec);
2270 volatile VALUE r = Qnil;
2271 enum ruby_tag_type state;
2273 if (!rb_block_given_p()) {
2274 rb_raise(rb_eArgError, "block is needed.");
2277 mask_arg = rb_to_hash_type(mask_arg);
2279 if (OBJ_FROZEN(mask_arg) && rb_hash_compare_by_id_p(mask_arg)) {
2280 mask = Qnil;
2283 rb_hash_foreach(mask_arg, handle_interrupt_arg_check_i, (VALUE)&mask);
2285 if (UNDEF_P(mask)) {
2286 return rb_yield(Qnil);
2289 if (!RTEST(mask)) {
2290 mask = mask_arg;
2292 else if (RB_TYPE_P(mask, T_HASH)) {
2293 OBJ_FREEZE(mask);
2296 rb_ary_push(th->pending_interrupt_mask_stack, mask);
2297 if (!rb_threadptr_pending_interrupt_empty_p(th)) {
2298 th->pending_interrupt_queue_checked = 0;
2299 RUBY_VM_SET_INTERRUPT(th->ec);
2302 EC_PUSH_TAG(th->ec);
2303 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
2304 r = rb_yield(Qnil);
2306 EC_POP_TAG();
2308 rb_ary_pop(th->pending_interrupt_mask_stack);
2309 if (!rb_threadptr_pending_interrupt_empty_p(th)) {
2310 th->pending_interrupt_queue_checked = 0;
2311 RUBY_VM_SET_INTERRUPT(th->ec);
2314 RUBY_VM_CHECK_INTS(th->ec);
2316 if (state) {
2317 EC_JUMP_TAG(th->ec, state);
2320 return r;
2324 * call-seq:
2325 * target_thread.pending_interrupt?(error = nil) -> true/false
2327 * Returns whether or not the asynchronous queue is empty for the target thread.
2329 * If +error+ is given, then check only for +error+ type deferred events.
2331 * See ::pending_interrupt? for more information.
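 *
 * A minimal, timing-sensitive sketch (results are illustrative):
 *
 *     th = Thread.new do
 *       Thread.handle_interrupt(RuntimeError => :never) { sleep }
 *     end
 *     sleep 0.1 while th.status != 'sleep'
 *     th.raise("stop")
 *     th.pending_interrupt?               #=> true
 *     th.pending_interrupt?(RuntimeError) #=> true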
2333 static VALUE
2334 rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread)
2336 rb_thread_t *target_th = rb_thread_ptr(target_thread);
2338 if (!target_th->pending_interrupt_queue) {
2339 return Qfalse;
2341 if (rb_threadptr_pending_interrupt_empty_p(target_th)) {
2342 return Qfalse;
2344 if (rb_check_arity(argc, 0, 1)) {
2345 VALUE err = argv[0];
2346 if (!rb_obj_is_kind_of(err, rb_cModule)) {
2347 rb_raise(rb_eTypeError, "class or module required for rescue clause");
2349 return RBOOL(rb_threadptr_pending_interrupt_include_p(target_th, err));
2351 else {
2352 return Qtrue;
2357 * call-seq:
2358 * Thread.pending_interrupt?(error = nil) -> true/false
2360 * Returns whether or not the asynchronous queue is empty.
2362 * Since Thread::handle_interrupt can be used to defer asynchronous events,
2363 * this method can be used to determine if there are any deferred events.
2365 * If this method returns true, then you may finish the +:never+ block.
2367 * For example, the following method processes deferred asynchronous events
2368 * immediately.
2370 * def Thread.kick_interrupt_immediately
2371 * Thread.handle_interrupt(Object => :immediate) {
2372 * Thread.pass
2374 * end
2376 * If +error+ is given, then check only for +error+ type deferred events.
2378 * === Usage
2380 * th = Thread.new{
2381 * Thread.handle_interrupt(RuntimeError => :on_blocking){
2382 * while true
2383 * ...
2384 * # reach safe point to invoke interrupt
2385 * if Thread.pending_interrupt?
2386 * Thread.handle_interrupt(Object => :immediate){}
2387 * end
2388 * ...
2389 * end
2392 * ...
2393 * th.raise # stop thread
2395 * This example can also be written as follows, which you should use to
2396 * avoid asynchronous interrupts.
2398 * flag = true
2399 * th = Thread.new{
2400 * Thread.handle_interrupt(RuntimeError => :on_blocking){
2401 * while true
2402 * ...
2403 * # reach safe point to invoke interrupt
2404 * break if flag == false
2405 * ...
2406 * end
2409 * ...
2410 * flag = false # stop thread
2413 static VALUE
2414 rb_thread_s_pending_interrupt_p(int argc, VALUE *argv, VALUE self)
2416 return rb_thread_pending_interrupt_p(argc, argv, GET_THREAD()->self);
2419 NORETURN(static void rb_threadptr_to_kill(rb_thread_t *th));
2421 static void
2422 rb_threadptr_to_kill(rb_thread_t *th)
2424 rb_threadptr_pending_interrupt_clear(th);
2425 th->status = THREAD_RUNNABLE;
2426 th->to_kill = 1;
2427 th->ec->errinfo = INT2FIX(TAG_FATAL);
2428 EC_JUMP_TAG(th->ec, TAG_FATAL);
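
/* Atomically fetch and clear the deliverable interrupt bits. The CAS loop
   keeps the bits blocked by ec->interrupt_mask pending in ec->interrupt_flag
   and returns only the currently unmasked bits to the caller. */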
2431 static inline rb_atomic_t
2432 threadptr_get_interrupts(rb_thread_t *th)
2434 rb_execution_context_t *ec = th->ec;
2435 rb_atomic_t interrupt;
2436 rb_atomic_t old;
2438 do {
2439 interrupt = ec->interrupt_flag;
2440 old = ATOMIC_CAS(ec->interrupt_flag, interrupt, interrupt & ec->interrupt_mask);
2441 } while (old != interrupt);
2442 return interrupt & (rb_atomic_t)~ec->interrupt_mask;
2446 rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
2448 rb_atomic_t interrupt;
2449 int postponed_job_interrupt = 0;
2450 int ret = FALSE;
2452 if (th->ec->raised_flag) return ret;
2454 while ((interrupt = threadptr_get_interrupts(th)) != 0) {
2455 int sig;
2456 int timer_interrupt;
2457 int pending_interrupt;
2458 int trap_interrupt;
2459 int terminate_interrupt;
2461 timer_interrupt = interrupt & TIMER_INTERRUPT_MASK;
2462 pending_interrupt = interrupt & PENDING_INTERRUPT_MASK;
2463 postponed_job_interrupt = interrupt & POSTPONED_JOB_INTERRUPT_MASK;
2464 trap_interrupt = interrupt & TRAP_INTERRUPT_MASK;
2465 terminate_interrupt = interrupt & TERMINATE_INTERRUPT_MASK; // request from other ractors
2467 if (interrupt & VM_BARRIER_INTERRUPT_MASK) {
2468 RB_VM_LOCK_ENTER();
2469 RB_VM_LOCK_LEAVE();
2472 if (postponed_job_interrupt) {
2473 rb_postponed_job_flush(th->vm);
2476 /* signal handling */
2477 if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
2478 enum rb_thread_status prev_status = th->status;
2480 th->status = THREAD_RUNNABLE;
2482 while ((sig = rb_get_next_signal()) != 0) {
2483 ret |= rb_signal_exec(th, sig);
2486 th->status = prev_status;
2489 /* exception from another thread */
2490 if (pending_interrupt && threadptr_pending_interrupt_active_p(th)) {
2491 VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
2492 RUBY_DEBUG_LOG("err:%"PRIdVALUE, err);
2493 ret = TRUE;
2495 if (UNDEF_P(err)) {
2496 /* no error */
2498 else if (err == RUBY_FATAL_THREAD_KILLED /* Thread#kill received */ ||
2499 err == RUBY_FATAL_THREAD_TERMINATED /* Terminate thread */ ||
2500 err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */ ) {
2501 terminate_interrupt = 1;
2503 else {
2504 if (err == th->vm->special_exceptions[ruby_error_stream_closed]) {
2505 /* the only special exception to be queued across threads */
2506 err = ruby_vm_special_exception_copy(err);
2508 /* set runnable if th was sleeping. */
2509 if (th->status == THREAD_STOPPED ||
2510 th->status == THREAD_STOPPED_FOREVER)
2511 th->status = THREAD_RUNNABLE;
2512 rb_exc_raise(err);
2516 if (terminate_interrupt) {
2517 rb_threadptr_to_kill(th);
2520 if (timer_interrupt) {
2521 uint32_t limits_us = TIME_QUANTUM_USEC;
2523 if (th->priority > 0)
2524 limits_us <<= th->priority;
2525 else
2526 limits_us >>= -th->priority;
2528 if (th->status == THREAD_RUNNABLE)
2529 th->running_time_us += 10 * 1000; // 10ms = 10_000us // TODO: use macro
2531 VM_ASSERT(th->ec->cfp);
2532 EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self,
2533 0, 0, 0, Qundef);
2535 rb_thread_schedule_limits(limits_us);
2538 return ret;
2541 void
2542 rb_thread_execute_interrupts(VALUE thval)
2544 rb_threadptr_execute_interrupts(rb_thread_ptr(thval), 1);
2547 static void
2548 rb_threadptr_ready(rb_thread_t *th)
2550 rb_threadptr_interrupt(th);
2553 static VALUE
2554 rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv)
2556 VALUE exc;
2558 if (rb_threadptr_dead(target_th)) {
2559 return Qnil;
2562 if (argc == 0) {
2563 exc = rb_exc_new(rb_eRuntimeError, 0, 0);
2565 else {
2566 exc = rb_make_exception(argc, argv);
2569 /* making an exception object can switch threads,
2570 so we need to check thread deadness again */
2571 if (rb_threadptr_dead(target_th)) {
2572 return Qnil;
2575 rb_ec_setup_exception(GET_EC(), exc, Qundef);
2576 rb_threadptr_pending_interrupt_enque(target_th, exc);
2577 rb_threadptr_interrupt(target_th);
2578 return Qnil;
2581 void
2582 rb_threadptr_signal_raise(rb_thread_t *th, int sig)
2584 VALUE argv[2];
2586 argv[0] = rb_eSignal;
2587 argv[1] = INT2FIX(sig);
2588 rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv);
2591 void
2592 rb_threadptr_signal_exit(rb_thread_t *th)
2594 VALUE argv[2];
2596 argv[0] = rb_eSystemExit;
2597 argv[1] = rb_str_new2("exit");
2599 // TODO: check signal raise delivery
2600 rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv);
2604 rb_ec_set_raised(rb_execution_context_t *ec)
2606 if (ec->raised_flag & RAISED_EXCEPTION) {
2607 return 1;
2609 ec->raised_flag |= RAISED_EXCEPTION;
2610 return 0;
2614 rb_ec_reset_raised(rb_execution_context_t *ec)
2616 if (!(ec->raised_flag & RAISED_EXCEPTION)) {
2617 return 0;
2619 ec->raised_flag &= ~RAISED_EXCEPTION;
2620 return 1;
2624 rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
2626 rb_vm_t *vm = GET_THREAD()->vm;
2627 struct waiting_fd *wfd = 0, *next;
2628 ccan_list_head_init(&busy->pending_fd_users);
2629 int has_any;
2630 VALUE wakeup_mutex;
2632 RB_VM_LOCK_ENTER();
2634 ccan_list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
2635 if (wfd->fd == fd) {
2636 rb_thread_t *th = wfd->th;
2637 VALUE err;
2639 ccan_list_del(&wfd->wfd_node);
2640 ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node);
2642 wfd->busy = busy;
2643 err = th->vm->special_exceptions[ruby_error_stream_closed];
2644 rb_threadptr_pending_interrupt_enque(th, err);
2645 rb_threadptr_interrupt(th);
2650 has_any = !ccan_list_empty(&busy->pending_fd_users);
2651 busy->closing_thread = rb_thread_current();
2652 wakeup_mutex = Qnil;
2653 if (has_any) {
2654 wakeup_mutex = rb_mutex_new();
2655 RBASIC_CLEAR_CLASS(wakeup_mutex); /* hide from ObjectSpace */
2657 busy->wakeup_mutex = wakeup_mutex;
2659 RB_VM_LOCK_LEAVE();
2661 /* If the caller didn't pass *busy as a pointer to something on the stack,
2662 we need to guard this mutex object on _our_ C stack for the duration
2663 of this function. */
2664 RB_GC_GUARD(wakeup_mutex);
2665 return has_any;
2668 void
2669 rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy)
2671 if (!RB_TEST(busy->wakeup_mutex)) {
2672 /* There was nobody else using this file when we closed it, so we
2673 never bothered to allocate a mutex */
2674 return;
2677 rb_mutex_lock(busy->wakeup_mutex);
2678 while (!ccan_list_empty(&busy->pending_fd_users)) {
2679 rb_mutex_sleep(busy->wakeup_mutex, Qnil);
2681 rb_mutex_unlock(busy->wakeup_mutex);
2684 void
2685 rb_thread_fd_close(int fd)
2687 struct rb_io_close_wait_list busy;
2689 if (rb_notify_fd_close(fd, &busy)) {
2690 rb_notify_fd_close_wait(&busy);
2695 * call-seq:
2696 * thr.raise
2697 * thr.raise(string)
2698 * thr.raise(exception [, string [, array]])
2700 * Raises an exception from the given thread. The caller does not have to be
2701 * +thr+. See Kernel#raise for more information.
2703 * Thread.abort_on_exception = true
2704 * a = Thread.new { sleep(200) }
2705 * a.raise("Gotcha")
2707 * This will produce:
2709 * prog.rb:3: Gotcha (RuntimeError)
2710 * from prog.rb:2:in `initialize'
2711 * from prog.rb:2:in `new'
2712 * from prog.rb:2
2715 static VALUE
2716 thread_raise_m(int argc, VALUE *argv, VALUE self)
2718 rb_thread_t *target_th = rb_thread_ptr(self);
2719 const rb_thread_t *current_th = GET_THREAD();
2721 threadptr_check_pending_interrupt_queue(target_th);
2722 rb_threadptr_raise(target_th, argc, argv);
2724 /* To perform Thread.current.raise as Kernel.raise */
2725 if (current_th == target_th) {
2726 RUBY_VM_CHECK_INTS(target_th->ec);
2728 return Qnil;
2733 * call-seq:
2734 * thr.exit -> thr
2735 * thr.kill -> thr
2736 * thr.terminate -> thr
2738 * Terminates +thr+ and schedules another thread to be run, returning
2739 * the terminated Thread. If this is the main thread, or the last
2740 * thread, exits the process.
2743 VALUE
2744 rb_thread_kill(VALUE thread)
2746 rb_thread_t *target_th = rb_thread_ptr(thread);
2748 if (target_th->to_kill || target_th->status == THREAD_KILLED) {
2749 return thread;
2751 if (target_th == target_th->vm->ractor.main_thread) {
2752 rb_exit(EXIT_SUCCESS);
2755 RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(target_th));
2757 if (target_th == GET_THREAD()) {
2758 /* kill myself immediately */
2759 rb_threadptr_to_kill(target_th);
2761 else {
2762 threadptr_check_pending_interrupt_queue(target_th);
2763 rb_threadptr_pending_interrupt_enque(target_th, RUBY_FATAL_THREAD_KILLED);
2764 rb_threadptr_interrupt(target_th);
2767 return thread;
2771 rb_thread_to_be_killed(VALUE thread)
2773 rb_thread_t *target_th = rb_thread_ptr(thread);
2775 if (target_th->to_kill || target_th->status == THREAD_KILLED) {
2776 return TRUE;
2778 return FALSE;
2782 * call-seq:
2783 * Thread.kill(thread) -> thread
2785 * Causes the given +thread+ to exit; see also Thread::exit.
2787 * count = 0
2788 * a = Thread.new { loop { count += 1 } }
2789 * sleep(0.1) #=> 0
2790 * Thread.kill(a) #=> #<Thread:0x401b3d30 dead>
2791 * count #=> 93947
2792 * a.alive? #=> false
2795 static VALUE
2796 rb_thread_s_kill(VALUE obj, VALUE th)
2798 return rb_thread_kill(th);
2803 * call-seq:
2804 * Thread.exit -> thread
2806 * Terminates the currently running thread and schedules another thread to be
2807 * run.
2809 * If this thread is already marked to be killed, ::exit returns the Thread.
2811 * If this is the main thread, or the last thread, exits the process.
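 *
 * A short illustration (sketch; output shown in comments):
 *
 *     th = Thread.new do
 *       puts "before"
 *       Thread.exit
 *       puts "never printed"
 *     end
 *     th.join   # prints "before" only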
2814 static VALUE
2815 rb_thread_exit(VALUE _)
2817 rb_thread_t *th = GET_THREAD();
2818 return rb_thread_kill(th->self);
2823 * call-seq:
2824 * thr.wakeup -> thr
2826 * Marks a given thread as eligible for scheduling; however, it may still
2827 * remain blocked on I/O.
2829 * *Note:* This does not invoke the scheduler; see #run for more information.
2831 * c = Thread.new { Thread.stop; puts "hey!" }
2832 * sleep 0.1 while c.status!='sleep'
2833 * c.wakeup
2834 * c.join
2835 * #=> "hey!"
2838 VALUE
2839 rb_thread_wakeup(VALUE thread)
2841 if (!RTEST(rb_thread_wakeup_alive(thread))) {
2842 rb_raise(rb_eThreadError, "killed thread");
2844 return thread;
2847 VALUE
2848 rb_thread_wakeup_alive(VALUE thread)
2850 rb_thread_t *target_th = rb_thread_ptr(thread);
2851 if (target_th->status == THREAD_KILLED) return Qnil;
2853 rb_threadptr_ready(target_th);
2855 if (target_th->status == THREAD_STOPPED ||
2856 target_th->status == THREAD_STOPPED_FOREVER) {
2857 target_th->status = THREAD_RUNNABLE;
2860 return thread;
2865 * call-seq:
2866 * thr.run -> thr
2868 * Wakes up +thr+, making it eligible for scheduling.
2870 * a = Thread.new { puts "a"; Thread.stop; puts "c" }
2871 * sleep 0.1 while a.status!='sleep'
2872 * puts "Got here"
2873 * a.run
2874 * a.join
2876 * This will produce:
2879 * Got here
2882 * See also the instance method #wakeup.
2885 VALUE
2886 rb_thread_run(VALUE thread)
2888 rb_thread_wakeup(thread);
2889 rb_thread_schedule();
2890 return thread;
2894 VALUE
2895 rb_thread_stop(void)
2897 if (rb_thread_alone()) {
2898 rb_raise(rb_eThreadError,
2899 "stopping only thread\n\tnote: use sleep to stop forever");
2901 rb_thread_sleep_deadly();
2902 return Qnil;
2906 * call-seq:
2907 * Thread.stop -> nil
2909 * Stops execution of the current thread, putting it into a ``sleep'' state,
2910 * and schedules execution of another thread.
2912 * a = Thread.new { print "a"; Thread.stop; print "c" }
2913 * sleep 0.1 while a.status!='sleep'
2914 * print "b"
2915 * a.run
2916 * a.join
2917 * #=> "abc"
2920 static VALUE
2921 thread_stop(VALUE _)
2923 return rb_thread_stop();
2926 /********************************************************************/
2928 VALUE
2929 rb_thread_list(void)
2931 // TODO
2932 return rb_ractor_thread_list();
2936 * call-seq:
2937 * Thread.list -> array
2939 * Returns an array of Thread objects for all threads that are either runnable
2940 * or stopped.
2942 * Thread.new { sleep(200) }
2943 * Thread.new { 1000000.times {|i| i*i } }
2944 * Thread.new { Thread.stop }
2945 * Thread.list.each {|t| p t}
2947 * This will produce:
2949 * #<Thread:0x401b3e84 sleep>
2950 * #<Thread:0x401b3f38 run>
2951 * #<Thread:0x401b3fb0 sleep>
2952 * #<Thread:0x401bdf4c run>
2955 static VALUE
2956 thread_list(VALUE _)
2958 return rb_thread_list();
2961 VALUE
2962 rb_thread_current(void)
2964 return GET_THREAD()->self;
2968 * call-seq:
2969 * Thread.current -> thread
2971 * Returns the currently executing thread.
2973 * Thread.current #=> #<Thread:0x401bdf4c run>
2976 static VALUE
2977 thread_s_current(VALUE klass)
2979 return rb_thread_current();
2982 VALUE
2983 rb_thread_main(void)
2985 return GET_RACTOR()->threads.main->self;
2989 * call-seq:
2990 * Thread.main -> thread
2992 * Returns the main thread.
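 *
 * For example (the object address is illustrative):
 *
 *     Thread.main                     #=> #<Thread:0x401bdf4c run>
 *     Thread.current == Thread.main   #=> true, when called from the main thread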
2995 static VALUE
2996 rb_thread_s_main(VALUE klass)
2998 return rb_thread_main();
3003 * call-seq:
3004 * Thread.abort_on_exception -> true or false
3006 * Returns the status of the global ``abort on exception'' condition.
3008 * The default is +false+.
3010 * When set to +true+, if any thread is aborted by an exception, the
3011 * raised exception will be re-raised in the main thread.
3013 * This can also be specified by the global $DEBUG flag or the command line
3014 * option +-d+.
3016 * See also ::abort_on_exception=.
3018 * There is also an instance level method to set this for a specific thread,
3019 * see #abort_on_exception.
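 *
 * A short illustration (sketch):
 *
 *     Thread.abort_on_exception         #=> false
 *     Thread.abort_on_exception = true
 *     Thread.abort_on_exception         #=> true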
3022 static VALUE
3023 rb_thread_s_abort_exc(VALUE _)
3025 return RBOOL(GET_THREAD()->vm->thread_abort_on_exception);
3030 * call-seq:
3031 * Thread.abort_on_exception= boolean -> true or false
3033 * When set to +true+, if any thread is aborted by an exception, the
3034 * raised exception will be re-raised in the main thread.
3035 * Returns the new state.
3037 * Thread.abort_on_exception = true
3038 * t1 = Thread.new do
3039 * puts "In new thread"
3040 * raise "Exception from thread"
3041 * end
3042 * sleep(1)
3043 * puts "not reached"
3045 * This will produce:
3047 * In new thread
3048 * prog.rb:4: Exception from thread (RuntimeError)
3049 * from prog.rb:2:in `initialize'
3050 * from prog.rb:2:in `new'
3051 * from prog.rb:2
3053 * See also ::abort_on_exception.
3055 * There is also an instance level method to set this for a specific thread,
3056 * see #abort_on_exception=.
3059 static VALUE
3060 rb_thread_s_abort_exc_set(VALUE self, VALUE val)
3062 GET_THREAD()->vm->thread_abort_on_exception = RTEST(val);
3063 return val;
3068 * call-seq:
3069 * thr.abort_on_exception -> true or false
3071 * Returns the status of the thread-local ``abort on exception'' condition for
3072 * this +thr+.
3074 * The default is +false+.
3076 * See also #abort_on_exception=.
3078 * There is also a class level method to set this for all threads, see
3079 * ::abort_on_exception.
3082 static VALUE
3083 rb_thread_abort_exc(VALUE thread)
3085 return RBOOL(rb_thread_ptr(thread)->abort_on_exception);
3090 * call-seq:
3091 * thr.abort_on_exception= boolean -> true or false
3093 * When set to +true+, if this +thr+ is aborted by an exception, the
3094 * raised exception will be re-raised in the main thread.
3096 * See also #abort_on_exception.
3098 * There is also a class level method to set this for all threads, see
3099 * ::abort_on_exception=.
3102 static VALUE
3103 rb_thread_abort_exc_set(VALUE thread, VALUE val)
3105 rb_thread_ptr(thread)->abort_on_exception = RTEST(val);
3106 return val;
3111 * call-seq:
3112 * Thread.report_on_exception -> true or false
3114 * Returns the status of the global ``report on exception'' condition.
3116 * The default is +true+ since Ruby 2.5.
3118 * All threads created when this flag is true will report
3119 * a message on $stderr if an exception kills the thread.
3121 * Thread.new { 1.times { raise } }
3123 * will produce this output on $stderr:
3125 * #<Thread:...> terminated with exception (report_on_exception is true):
3126 * Traceback (most recent call last):
3127 * 2: from -e:1:in `block in <main>'
3128 * 1: from -e:1:in `times'
3130 * This is done to catch errors in threads early.
3131 * In some cases, you might not want this output.
3132 * There are multiple ways to avoid the extra output:
3134 * * If the exception is not intended, the best is to fix the cause of
3135 * the exception so it does not happen anymore.
3136 * * If the exception is intended, it might be better to rescue it closer to
3137 * where it is raised rather than letting it kill the Thread.
3138 * * If it is guaranteed the Thread will be joined with Thread#join or
3139 * Thread#value, then it is safe to disable this report with
3140 * <code>Thread.current.report_on_exception = false</code>
3141 * when starting the Thread.
3142 * However, this might handle the exception much later, or not at all
3143 * if the Thread is never joined due to the parent thread being blocked, etc.
3145 * See also ::report_on_exception=.
3147 * There is also an instance level method to set this for a specific thread,
3148 * see #report_on_exception=.
3152 static VALUE
3153 rb_thread_s_report_exc(VALUE _)
3155 return RBOOL(GET_THREAD()->vm->thread_report_on_exception);
3160 * call-seq:
3161 * Thread.report_on_exception= boolean -> true or false
3163 * Returns the new state.
3164 * When set to +true+, all threads created afterwards will inherit the
3165 * condition and report a message on $stderr if an exception kills a thread:
3167 * Thread.report_on_exception = true
3168 * t1 = Thread.new do
3169 * puts "In new thread"
3170 * raise "Exception from thread"
3171 * end
3172 * sleep(1)
3173 * puts "In the main thread"
3175 * This will produce:
3177 * In new thread
3178 * #<Thread:...prog.rb:2> terminated with exception (report_on_exception is true):
3179 * Traceback (most recent call last):
3180 * prog.rb:4:in `block in <main>': Exception from thread (RuntimeError)
3181 * In the main thread
3183 * See also ::report_on_exception.
3185 * There is also an instance level method to set this for a specific thread,
3186 * see #report_on_exception=.
3189 static VALUE
3190 rb_thread_s_report_exc_set(VALUE self, VALUE val)
3192 GET_THREAD()->vm->thread_report_on_exception = RTEST(val);
3193 return val;
3198 * call-seq:
3199 * Thread.ignore_deadlock -> true or false
3201 * Returns the status of the global ``ignore deadlock'' condition.
3202 * The default is +false+, so that deadlock conditions are not ignored.
3204 * See also ::ignore_deadlock=.
3208 static VALUE
3209 rb_thread_s_ignore_deadlock(VALUE _)
3211 return RBOOL(GET_THREAD()->vm->thread_ignore_deadlock);
3216 * call-seq:
3217 * Thread.ignore_deadlock = boolean -> true or false
3219 * Returns the new state.
3220 * When set to +true+, the VM will not check for deadlock conditions.
3221 * It is only useful to set this if your application can break a
3222 * deadlock condition via some other means, such as a signal.
3224 * Thread.ignore_deadlock = true
3225 * queue = Thread::Queue.new
3227 * trap(:SIGUSR1){queue.push "Received signal"}
3229 * # raises fatal error unless ignoring deadlock
3230 * puts queue.pop
3232 * See also ::ignore_deadlock.
3235 static VALUE
3236 rb_thread_s_ignore_deadlock_set(VALUE self, VALUE val)
3238 GET_THREAD()->vm->thread_ignore_deadlock = RTEST(val);
3239 return val;
3244 * call-seq:
3245 * thr.report_on_exception -> true or false
3247 * Returns the status of the thread-local ``report on exception'' condition for
3248 * this +thr+.
3250 * The default value when creating a Thread is the value of
3251 * the global flag Thread.report_on_exception.
3253 * See also #report_on_exception=.
3255 * There is also a class level method to set this for all new threads, see
3256 * ::report_on_exception=.
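 *
 * A short illustration (sketch; assumes the global flag is left at its
 * default of +true+):
 *
 *     th = Thread.new { sleep }
 *     th.report_on_exception           #=> true, inherited from the global flag
 *     th.report_on_exception = false
 *     th.report_on_exception           #=> false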
3259 static VALUE
3260 rb_thread_report_exc(VALUE thread)
3262 return RBOOL(rb_thread_ptr(thread)->report_on_exception);
3267 * call-seq:
3268 * thr.report_on_exception= boolean -> true or false
3270 * When set to +true+, a message is printed on $stderr if an exception
3271 * kills this +thr+. See ::report_on_exception for details.
3273 * See also #report_on_exception.
3275 * There is also a class level method to set this for all new threads, see
3276 * ::report_on_exception=.
3279 static VALUE
3280 rb_thread_report_exc_set(VALUE thread, VALUE val)
3282 rb_thread_ptr(thread)->report_on_exception = RTEST(val);
3283 return val;
3288 * call-seq:
3289 * thr.group -> thgrp or nil
3291 * Returns the ThreadGroup which contains the given thread.
3293 * Thread.main.group #=> #<ThreadGroup:0x4029d914>
3296 VALUE
3297 rb_thread_group(VALUE thread)
3299 return rb_thread_ptr(thread)->thgroup;
3302 static const char *
3303 thread_status_name(rb_thread_t *th, int detail)
3305 switch (th->status) {
3306 case THREAD_RUNNABLE:
3307 return th->to_kill ? "aborting" : "run";
3308 case THREAD_STOPPED_FOREVER:
3309 if (detail) return "sleep_forever";
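/* fall through */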
3310 case THREAD_STOPPED:
3311 return "sleep";
3312 case THREAD_KILLED:
3313 return "dead";
3314 default:
3315 return "unknown";
3319 static int
3320 rb_threadptr_dead(rb_thread_t *th)
3322 return th->status == THREAD_KILLED;
3327 * call-seq:
3328 * thr.status -> string, false or nil
3330 * Returns the status of +thr+.
3332 * [<tt>"sleep"</tt>]
3333 * Returned if this thread is sleeping or waiting on I/O
3334 * [<tt>"run"</tt>]
3335 * When this thread is executing
3336 * [<tt>"aborting"</tt>]
3337 * If this thread is aborting
3338 * [+false+]
3339 * When this thread is terminated normally
3340 * [+nil+]
3341 * If terminated with an exception.
3343 * a = Thread.new { raise("die now") }
3344 * b = Thread.new { Thread.stop }
3345 * c = Thread.new { Thread.exit }
3346 * d = Thread.new { sleep }
3347 * d.kill #=> #<Thread:0x401b3678 aborting>
3348 * a.status #=> nil
3349 * b.status #=> "sleep"
3350 * c.status #=> false
3351 * d.status #=> "aborting"
3352 * Thread.current.status #=> "run"
3354 * See also the instance methods #alive? and #stop?
3357 static VALUE
3358 rb_thread_status(VALUE thread)
3360 rb_thread_t *target_th = rb_thread_ptr(thread);
3362 if (rb_threadptr_dead(target_th)) {
3363 if (!NIL_P(target_th->ec->errinfo) &&
3364 !FIXNUM_P(target_th->ec->errinfo)) {
3365 return Qnil;
3367 else {
3368 return Qfalse;
3371 else {
3372 return rb_str_new2(thread_status_name(target_th, FALSE));
3378 * call-seq:
3379 * thr.alive? -> true or false
3381 * Returns +true+ if +thr+ is running or sleeping.
3383 * thr = Thread.new { }
3384 * thr.join #=> #<Thread:0x401b3fb0 dead>
3385 * Thread.current.alive? #=> true
3386 * thr.alive? #=> false
3388 * See also #stop? and #status.
3391 static VALUE
3392 rb_thread_alive_p(VALUE thread)
3394 return RBOOL(!thread_finished(rb_thread_ptr(thread)));
3398 * call-seq:
3399 * thr.stop? -> true or false
3401 * Returns +true+ if +thr+ is dead or sleeping.
3403 * a = Thread.new { Thread.stop }
3404 * b = Thread.current
3405 * a.stop? #=> true
3406 * b.stop? #=> false
3408 * See also #alive? and #status.
3411 static VALUE
3412 rb_thread_stop_p(VALUE thread)
3414 rb_thread_t *th = rb_thread_ptr(thread);
3416 if (rb_threadptr_dead(th)) {
3417 return Qtrue;
3419 return RBOOL(th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER);
3423 * call-seq:
3424 * thr.name -> string
3426 * Shows the name of the thread.
3429 static VALUE
3430 rb_thread_getname(VALUE thread)
3432 return rb_thread_ptr(thread)->name;
3436 * call-seq:
3437 * thr.name=(name) -> string
3439 * Sets the given name to the Ruby thread.
3440 * On some platforms, it may also set the name at the pthread and/or kernel level.
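 *
 * A short illustration (sketch; the inspect output is illustrative):
 *
 *     th = Thread.new { sleep }
 *     th.name = "worker-1"
 *     th.name      #=> "worker-1"
 *     th.inspect   #=> "#<Thread:0x...@worker-1 ... sleep>"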
3443 static VALUE
3444 rb_thread_setname(VALUE thread, VALUE name)
3446 rb_thread_t *target_th = rb_thread_ptr(thread);
3448 if (!NIL_P(name)) {
3449 rb_encoding *enc;
3450 StringValueCStr(name);
3451 enc = rb_enc_get(name);
3452 if (!rb_enc_asciicompat(enc)) {
3453 rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)",
3454 rb_enc_name(enc));
3456 name = rb_str_new_frozen(name);
3458 target_th->name = name;
3459 if (threadptr_initialized(target_th) && target_th->has_dedicated_nt) {
3460 native_set_another_thread_name(target_th->nt->thread_id, name);
3462 return name;
3465 #if USE_NATIVE_THREAD_NATIVE_THREAD_ID
3467 * call-seq:
3468 * thr.native_thread_id -> integer
3470 * Return the native thread ID which is used by the Ruby thread.
3472 * The ID depends on the OS; it is not the POSIX thread ID returned by pthread_self(3).
3473 * * On Linux it is TID returned by gettid(2).
3474 * * On macOS it is the system-wide unique integral ID of thread returned
3475 * by pthread_threadid_np(3).
3476 * * On FreeBSD it is the unique integral ID of the thread returned by
3477 * pthread_getthreadid_np(3).
3478 * * On Windows it is the thread identifier returned by GetThreadId().
3479 * * On other platforms, it raises NotImplementedError.
3481 * NOTE:
3482 * If the thread is not yet associated with, or has already been dissociated
3483 * from, a native thread, it returns _nil_.
3484 * If the Ruby implementation uses an M:N thread model, the ID may change
3485 * depending on the timing.
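 *
 * For example (the value is OS-dependent and illustrative):
 *
 *     Thread.current.native_thread_id   #=> 22446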
3488 static VALUE
3489 rb_thread_native_thread_id(VALUE thread)
3491 rb_thread_t *target_th = rb_thread_ptr(thread);
3492 if (rb_threadptr_dead(target_th)) return Qnil;
3493 return native_thread_native_thread_id(target_th);
3495 #else
3496 # define rb_thread_native_thread_id rb_f_notimplement
3497 #endif
3500 * call-seq:
3501 * thr.to_s -> string
3503 * Dump the name, id, and status of _thr_ to a string.
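 *
 * For example (address and status are illustrative):
 *
 *     Thread.current.to_s   #=> "#<Thread:0x00007f5e1b0a1b58 run>"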
3506 static VALUE
3507 rb_thread_to_s(VALUE thread)
3509 VALUE cname = rb_class_path(rb_obj_class(thread));
3510 rb_thread_t *target_th = rb_thread_ptr(thread);
3511 const char *status;
3512 VALUE str, loc;
3514 status = thread_status_name(target_th, TRUE);
3515 str = rb_sprintf("#<%"PRIsVALUE":%p", cname, (void *)thread);
3516 if (!NIL_P(target_th->name)) {
3517 rb_str_catf(str, "@%"PRIsVALUE, target_th->name);
3519 if ((loc = threadptr_invoke_proc_location(target_th)) != Qnil) {
3520 rb_str_catf(str, " %"PRIsVALUE":%"PRIsVALUE,
3521 RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1));
3523 rb_str_catf(str, " %s>", status);
3525 return str;
3528 /* variables for recursive traversals */
3529 #define recursive_key id__recursive_key__
3531 static VALUE
3532 threadptr_local_aref(rb_thread_t *th, ID id)
3534 if (id == recursive_key) {
3535 return th->ec->local_storage_recursive_hash;
3537 else {
3538 VALUE val;
3539 struct rb_id_table *local_storage = th->ec->local_storage;
3541 if (local_storage != NULL && rb_id_table_lookup(local_storage, id, &val)) {
3542 return val;
3544 else {
3545 return Qnil;
3550 VALUE
3551 rb_thread_local_aref(VALUE thread, ID id)
3553 return threadptr_local_aref(rb_thread_ptr(thread), id);
3557 * call-seq:
3558 * thr[sym] -> obj or nil
3560 * Attribute Reference---Returns the value of a fiber-local variable (current thread's root fiber
3561 * if not explicitly inside a Fiber), using either a symbol or a string name.
3562 * If the specified variable does not exist, returns +nil+.
3565 * Thread.new { Thread.current["name"] = "A" },
3566 * Thread.new { Thread.current[:name] = "B" },
3567 * Thread.new { Thread.current["name"] = "C" }
3568 * ].each do |th|
3569 * th.join
3570 * puts "#{th.inspect}: #{th[:name]}"
3571 * end
3573 * This will produce:
3575 * #<Thread:0x00000002a54220 dead>: A
3576 * #<Thread:0x00000002a541a8 dead>: B
3577 * #<Thread:0x00000002a54130 dead>: C
3579 * Thread#[] and Thread#[]= are not thread-local but fiber-local.
3580 * This confusion did not exist in Ruby 1.8 because
3581 * fibers have only been available since Ruby 1.9.
3582 * Ruby 1.9 chose to make these methods fiber-local in order to preserve
3583 * the following idiom for dynamic scope.
3585 * def meth(newvalue)
3586 * begin
3587 * oldvalue = Thread.current[:name]
3588 * Thread.current[:name] = newvalue
3589 * yield
3590 * ensure
3591 * Thread.current[:name] = oldvalue
3592 * end
3593 * end
3595 * The idiom would not work as a dynamic scope if the methods were thread-local
3596 * and a given block switched fibers.
3598 * f = Fiber.new {
3599 * meth(1) {
3600 * Fiber.yield
3603 * meth(2) {
3604 * f.resume
3606 * f.resume
3607 * p Thread.current[:name]
3608 * #=> nil if fiber-local
3609 * #=> 2 if thread-local (The value 2 is leaked to outside of meth method.)
3611 * For thread-local variables, please see #thread_variable_get and
3612 * #thread_variable_set.
3616 static VALUE
3617 rb_thread_aref(VALUE thread, VALUE key)
3619 ID id = rb_check_id(&key);
3620 if (!id) return Qnil;
3621 return rb_thread_local_aref(thread, id);
3625 * call-seq:
3626 * thr.fetch(sym) -> obj
3627 * thr.fetch(sym) { } -> obj
3628 * thr.fetch(sym, default) -> obj
3630 * Returns a fiber-local for the given key. If the key can't be
3631 * found, there are several options: with no other arguments, it will
3632 * raise a KeyError exception; if <i>default</i> is given, then that
3633 * will be returned; if the optional code block is specified, then
3634 * that will be run and its result returned. See Thread#[] and
3635 * Hash#fetch.
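 *
 * A short illustration (sketch):
 *
 *     Thread.current[:name] = "A"
 *     Thread.current.fetch(:name)                      #=> "A"
 *     Thread.current.fetch(:missing, "default")        #=> "default"
 *     Thread.current.fetch(:missing) { |k| "no #{k}" } #=> "no missing"
 *     Thread.current.fetch(:missing)                   # raises KeyError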
3637 static VALUE
3638 rb_thread_fetch(int argc, VALUE *argv, VALUE self)
3640 VALUE key, val;
3641 ID id;
3642 rb_thread_t *target_th = rb_thread_ptr(self);
3643 int block_given;
3645 rb_check_arity(argc, 1, 2);
3646 key = argv[0];
3648 block_given = rb_block_given_p();
3649 if (block_given && argc == 2) {
3650 rb_warn("block supersedes default value argument");
3653 id = rb_check_id(&key);
3655 if (id == recursive_key) {
3656 return target_th->ec->local_storage_recursive_hash;
3658 else if (id && target_th->ec->local_storage &&
3659 rb_id_table_lookup(target_th->ec->local_storage, id, &val)) {
3660 return val;
3662 else if (block_given) {
3663 return rb_yield(key);
3665 else if (argc == 1) {
3666 rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key);
3668 else {
3669 return argv[1];
3673 static VALUE
3674 threadptr_local_aset(rb_thread_t *th, ID id, VALUE val)
3676 if (id == recursive_key) {
3677 th->ec->local_storage_recursive_hash = val;
3678 return val;
3680 else {
3681 struct rb_id_table *local_storage = th->ec->local_storage;
3683 if (NIL_P(val)) {
3684 if (!local_storage) return Qnil;
3685 rb_id_table_delete(local_storage, id);
3686 return Qnil;
3688 else {
3689 if (local_storage == NULL) {
3690 th->ec->local_storage = local_storage = rb_id_table_create(0);
3692 rb_id_table_insert(local_storage, id, val);
3693 return val;
3698 VALUE
3699 rb_thread_local_aset(VALUE thread, ID id, VALUE val)
3701 if (OBJ_FROZEN(thread)) {
3702 rb_frozen_error_raise(thread, "can't modify frozen thread locals");
3705 return threadptr_local_aset(rb_thread_ptr(thread), id, val);
3709 * call-seq:
3710 * thr[sym] = obj -> obj
3712 * Attribute Assignment---Sets or creates the value of a fiber-local variable,
3713 * using either a symbol or a string.
3715 * See also Thread#[].
3717 * For thread-local variables, please see #thread_variable_set and
3718 * #thread_variable_get.
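 *
 * A short illustration (sketch):
 *
 *     Thread.current[:name] = "A"
 *     Thread.current[:name]    #=> "A"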
3721 static VALUE
3722 rb_thread_aset(VALUE self, VALUE id, VALUE val)
3724 return rb_thread_local_aset(self, rb_to_id(id), val);
3728 * call-seq:
3729 * thr.thread_variable_get(key) -> obj or nil
3731 * Returns the value of a thread local variable that has been set. Note that
3732 * these are different from fiber local values. For fiber local values,
3733 * please see Thread#[] and Thread#[]=.
3735 * Thread local values are carried along with threads, and do not respect
3736 * fibers. For example:
3738 * Thread.new {
3739 * Thread.current.thread_variable_set("foo", "bar") # set a thread local
3740 * Thread.current["foo"] = "bar" # set a fiber local
3742 * Fiber.new {
3743 * Fiber.yield [
3744 * Thread.current.thread_variable_get("foo"), # get the thread local
3745 * Thread.current["foo"], # get the fiber local
3747 * }.resume
3748 * }.join.value # => ['bar', nil]
3750 * The value "bar" is returned for the thread local, while nil is returned
3751 * for the fiber local. The fiber is executed in the same thread, so the
3752 * thread local values are available.
3755 static VALUE
3756 rb_thread_variable_get(VALUE thread, VALUE key)
3758 VALUE locals;
3760 if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
3761 return Qnil;
3763 locals = rb_thread_local_storage(thread);
3764 return rb_hash_aref(locals, rb_to_symbol(key));
3768 * call-seq:
3769 * thr.thread_variable_set(key, value)
3771 * Sets a thread local with +key+ to +value+. Note that these are local to
3772 * threads, and not to fibers. Please see Thread#thread_variable_get and
3773 * Thread#[] for more information.
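 *
 * A short illustration (sketch):
 *
 *     Thread.current.thread_variable_set(:debug, true)
 *     Thread.current.thread_variable_get(:debug)   #=> true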
3776 static VALUE
3777 rb_thread_variable_set(VALUE thread, VALUE key, VALUE val)
3779 VALUE locals;
3781 if (OBJ_FROZEN(thread)) {
3782 rb_frozen_error_raise(thread, "can't modify frozen thread locals");
3785 locals = rb_thread_local_storage(thread);
3786 return rb_hash_aset(locals, rb_to_symbol(key), val);
3790 * call-seq:
3791 * thr.key?(sym) -> true or false
3793 * Returns +true+ if the given string (or symbol) exists as a fiber-local
3794 * variable.
3796 * me = Thread.current
3797 * me[:oliver] = "a"
3798 * me.key?(:oliver) #=> true
3799 * me.key?(:stanley) #=> false
3802 static VALUE
3803 rb_thread_key_p(VALUE self, VALUE key)
3805 VALUE val;
3806 ID id = rb_check_id(&key);
3807 struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage;
3809 if (!id || local_storage == NULL) {
3810 return Qfalse;
3812 return RBOOL(rb_id_table_lookup(local_storage, id, &val));
3815 static enum rb_id_table_iterator_result
3816 thread_keys_i(ID key, VALUE value, void *ary)
3818 rb_ary_push((VALUE)ary, ID2SYM(key));
3819 return ID_TABLE_CONTINUE;
3823 rb_thread_alone(void)
3825 // TODO
3826 return rb_ractor_living_thread_num(GET_RACTOR()) == 1;
3830 * call-seq:
3831 * thr.keys -> array
3833 * Returns an array of the names of the fiber-local variables (as Symbols).
3835 * thr = Thread.new do
3836 * Thread.current[:cat] = 'meow'
3837 * Thread.current["dog"] = 'woof'
3838 * end
3839 * thr.join #=> #<Thread:0x401b3f10 dead>
3840 * thr.keys #=> [:dog, :cat]
3843 static VALUE
3844 rb_thread_keys(VALUE self)
3846 struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage;
3847 VALUE ary = rb_ary_new();
3849 if (local_storage) {
3850 rb_id_table_foreach(local_storage, thread_keys_i, (void *)ary);
3852 return ary;
3855 static int
3856 keys_i(VALUE key, VALUE value, VALUE ary)
3858 rb_ary_push(ary, key);
3859 return ST_CONTINUE;
3863 * call-seq:
3864 * thr.thread_variables -> array
3866 * Returns an array of the names of the thread-local variables (as Symbols).
3868 * thr = Thread.new do
3869 * Thread.current.thread_variable_set(:cat, 'meow')
3870 * Thread.current.thread_variable_set("dog", 'woof')
3871 * end
3872 * thr.join #=> #<Thread:0x401b3f10 dead>
3873 * thr.thread_variables #=> [:dog, :cat]
3875 * Note that these are not fiber local variables. Please see Thread#[] and
3876 * Thread#thread_variable_get for more details.
3879 static VALUE
3880 rb_thread_variables(VALUE thread)
3882 VALUE locals;
3883 VALUE ary;
3885 ary = rb_ary_new();
3886 if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
3887 return ary;
3889 locals = rb_thread_local_storage(thread);
3890 rb_hash_foreach(locals, keys_i, ary);
3892 return ary;
3896 * call-seq:
3897 * thr.thread_variable?(key) -> true or false
3899 * Returns +true+ if the given string (or symbol) exists as a thread-local
3900 * variable.
3902 * me = Thread.current
3903 * me.thread_variable_set(:oliver, "a")
3904 * me.thread_variable?(:oliver) #=> true
3905 * me.thread_variable?(:stanley) #=> false
3907 * Note that these are not fiber local variables. Please see Thread#[] and
3908 * Thread#thread_variable_get for more details.
3911 static VALUE
3912 rb_thread_variable_p(VALUE thread, VALUE key)
3914 VALUE locals;
3916 if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
3917 return Qfalse;
3919 locals = rb_thread_local_storage(thread);
3921 return RBOOL(rb_hash_lookup(locals, rb_to_symbol(key)) != Qnil);
3925 * call-seq:
3926 * thr.priority -> integer
3928 * Returns the priority of <i>thr</i>. The default is inherited from the
3929 * current thread which created the new thread, or zero for the
3930 * initial main thread; higher-priority threads will run more frequently
3931 * than lower-priority threads (but lower-priority threads can also run).
3933 * This is just a hint for the Ruby thread scheduler. It may be ignored on some
3934 * platforms.
3936 * Thread.current.priority #=> 0
3939 static VALUE
3940 rb_thread_priority(VALUE thread)
3942 return INT2NUM(rb_thread_ptr(thread)->priority);
3947 * call-seq:
3948 * thr.priority= integer -> thr
3950 * Sets the priority of <i>thr</i> to <i>integer</i>. Higher-priority threads
3951 * will run more frequently than lower-priority threads (but lower-priority
3952 * threads can also run).
3954 * This is just a hint for the Ruby thread scheduler. It may be ignored on some
3955 * platforms.
3957 * count1 = count2 = 0
3958 * a = Thread.new do
3959 * loop { count1 += 1 }
3960 * end
3961 * a.priority = -1
3963 * b = Thread.new do
3964 * loop { count2 += 1 }
3965 * end
3966 * b.priority = -2
3967 * sleep 1 #=> 1
3968 * count1 #=> 622504
3969 * count2 #=> 5832
3972 static VALUE
3973 rb_thread_priority_set(VALUE thread, VALUE prio)
3975 rb_thread_t *target_th = rb_thread_ptr(thread);
3976 int priority;
3978 #if USE_NATIVE_THREAD_PRIORITY
3979 target_th->priority = NUM2INT(prio);
3980 native_thread_apply_priority(th);
3981 #else
3982 priority = NUM2INT(prio);
3983 if (priority > RUBY_THREAD_PRIORITY_MAX) {
3984 priority = RUBY_THREAD_PRIORITY_MAX;
3986 else if (priority < RUBY_THREAD_PRIORITY_MIN) {
3987 priority = RUBY_THREAD_PRIORITY_MIN;
3989 target_th->priority = (int8_t)priority;
3990 #endif
3991 return INT2NUM(target_th->priority);
3994 /* for IO */
3996 #if defined(NFDBITS) && defined(HAVE_RB_FD_INIT)
3999 * several Unix platforms support file descriptors bigger than FD_SETSIZE
4000 * in the select(2) system call.
4002 * - Linux 2.2.12 (?)
4003 * - NetBSD 1.2 (src/sys/kern/sys_generic.c:1.25)
4004 * select(2) documents how to allocate fd_set dynamically.
4005 * http://netbsd.gw.com/cgi-bin/man-cgi?select++NetBSD-4.0
4006 * - FreeBSD 2.2 (src/sys/kern/sys_generic.c:1.19)
4007 * - OpenBSD 2.0 (src/sys/kern/sys_generic.c:1.4)
4008 * select(2) documents how to allocate fd_set dynamically.
4009 * http://www.openbsd.org/cgi-bin/man.cgi?query=select&manpath=OpenBSD+4.4
4010 * - Solaris 8 has select_large_fdset
4011 * - Mac OS X 10.7 (Lion)
4012 * select(2) returns EINVAL if nfds is greater than FD_SET_SIZE and
4013 * _DARWIN_UNLIMITED_SELECT (or _DARWIN_C_SOURCE) isn't defined.
4014 * https://developer.apple.com/library/archive/releasenotes/Darwin/SymbolVariantsRelNotes/index.html
4016 * When fd_set is not big enough to hold big file descriptors,
4017 * it should be allocated dynamically.
4018 * Note that this assumes fd_set is structured as a bitmap.
4020 * rb_fd_init allocates the memory.
4021 * rb_fd_term frees the memory.
4022 * rb_fd_set may re-allocate the bitmap.
4024 * So rb_fd_set doesn't reject file descriptors bigger than FD_SETSIZE.
4027 void
4028 rb_fd_init(rb_fdset_t *fds)
4030 fds->maxfd = 0;
4031 fds->fdset = ALLOC(fd_set);
4032 FD_ZERO(fds->fdset);
4035 void
4036 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
4038 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
4040 if (size < sizeof(fd_set))
4041 size = sizeof(fd_set);
4042 dst->maxfd = src->maxfd;
4043 dst->fdset = xmalloc(size);
4044 memcpy(dst->fdset, src->fdset, size);
4047 void
4048 rb_fd_term(rb_fdset_t *fds)
4050 xfree(fds->fdset);
4051 fds->maxfd = 0;
4052 fds->fdset = 0;
4055 void
4056 rb_fd_zero(rb_fdset_t *fds)
4058 if (fds->fdset)
4059 MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS));
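
/* Grow the dynamically allocated bitmap so it can hold fd n, zero-filling
   any newly added bytes and bumping maxfd when necessary. */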
4062 static void
4063 rb_fd_resize(int n, rb_fdset_t *fds)
4065 size_t m = howmany(n + 1, NFDBITS) * sizeof(fd_mask);
4066 size_t o = howmany(fds->maxfd, NFDBITS) * sizeof(fd_mask);
4068 if (m < sizeof(fd_set)) m = sizeof(fd_set);
4069 if (o < sizeof(fd_set)) o = sizeof(fd_set);
4071 if (m > o) {
4072 fds->fdset = xrealloc(fds->fdset, m);
4073 memset((char *)fds->fdset + o, 0, m - o);
4075 if (n >= fds->maxfd) fds->maxfd = n + 1;
4078 void
4079 rb_fd_set(int n, rb_fdset_t *fds)
4081 rb_fd_resize(n, fds);
4082 FD_SET(n, fds->fdset);
4085 void
4086 rb_fd_clr(int n, rb_fdset_t *fds)
4088 if (n >= fds->maxfd) return;
4089 FD_CLR(n, fds->fdset);
4093 rb_fd_isset(int n, const rb_fdset_t *fds)
4095 if (n >= fds->maxfd) return 0;
4096 return FD_ISSET(n, fds->fdset) != 0; /* "!= 0" avoids FreeBSD PR 91421 */
4099 void
4100 rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max)
4102 size_t size = howmany(max, NFDBITS) * sizeof(fd_mask);
4104 if (size < sizeof(fd_set)) size = sizeof(fd_set);
4105 dst->maxfd = max;
4106 dst->fdset = xrealloc(dst->fdset, size);
4107 memcpy(dst->fdset, src, size);
4110 void
4111 rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src)
4113 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
4115 if (size < sizeof(fd_set))
4116 size = sizeof(fd_set);
4117 dst->maxfd = src->maxfd;
4118 dst->fdset = xrealloc(dst->fdset, size);
4119 memcpy(dst->fdset, src->fdset, size);
4123 rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout)
4125 fd_set *r = NULL, *w = NULL, *e = NULL;
4126 if (readfds) {
4127 rb_fd_resize(n - 1, readfds);
4128 r = rb_fd_ptr(readfds);
4130 if (writefds) {
4131 rb_fd_resize(n - 1, writefds);
4132 w = rb_fd_ptr(writefds);
4134 if (exceptfds) {
4135 rb_fd_resize(n - 1, exceptfds);
4136 e = rb_fd_ptr(exceptfds);
4138 return select(n, r, w, e, timeout);
4141 #define rb_fd_no_init(fds) ((void)((fds)->fdset = 0), (void)((fds)->maxfd = 0))
4143 #undef FD_ZERO
4144 #undef FD_SET
4145 #undef FD_CLR
4146 #undef FD_ISSET
4148 #define FD_ZERO(f) rb_fd_zero(f)
4149 #define FD_SET(i, f) rb_fd_set((i), (f))
4150 #define FD_CLR(i, f) rb_fd_clr((i), (f))
4151 #define FD_ISSET(i, f) rb_fd_isset((i), (f))
4153 #elif defined(_WIN32)
4155 void
4156 rb_fd_init(rb_fdset_t *set)
4158 set->capa = FD_SETSIZE;
4159 set->fdset = ALLOC(fd_set);
4160 FD_ZERO(set->fdset);
4163 void
4164 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
4166 rb_fd_init(dst);
4167 rb_fd_dup(dst, src);
4170 void
4171 rb_fd_term(rb_fdset_t *set)
4173 xfree(set->fdset);
4174 set->fdset = NULL;
4175 set->capa = 0;
4178 void
4179 rb_fd_set(int fd, rb_fdset_t *set)
4181 unsigned int i;
4182 SOCKET s = rb_w32_get_osfhandle(fd);
4184 for (i = 0; i < set->fdset->fd_count; i++) {
4185 if (set->fdset->fd_array[i] == s) {
4186 return;
4189 if (set->fdset->fd_count >= (unsigned)set->capa) {
4190 set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE;
4191 set->fdset =
4192 rb_xrealloc_mul_add(
4193 set->fdset, set->capa, sizeof(SOCKET), sizeof(unsigned int));
4195 set->fdset->fd_array[set->fdset->fd_count++] = s;
4198 #undef FD_ZERO
4199 #undef FD_SET
4200 #undef FD_CLR
4201 #undef FD_ISSET
4203 #define FD_ZERO(f) rb_fd_zero(f)
4204 #define FD_SET(i, f) rb_fd_set((i), (f))
4205 #define FD_CLR(i, f) rb_fd_clr((i), (f))
4206 #define FD_ISSET(i, f) rb_fd_isset((i), (f))
4208 #define rb_fd_no_init(fds) (void)((fds)->fdset = 0)
4210 #endif
4212 #ifndef rb_fd_no_init
4213 #define rb_fd_no_init(fds) (void)(fds)
4214 #endif
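
/* Decide whether an interrupted or prematurely returned wait should be
   retried. On EINTR/ERESTART the result is reset and the wait is retried
   with the remaining time (clamped to zero once expired); a result of 0 is
   treated as a possibly spurious wakeup and retried only while time
   remains. Returns TRUE to retry, FALSE otherwise. */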
4216 static int
4217 wait_retryable(volatile int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
4219 int r = *result;
4220 if (r < 0) {
4221 switch (errnum) {
4222 case EINTR:
4223 #ifdef ERESTART
4224 case ERESTART:
4225 #endif
4226 *result = 0;
4227 if (rel && hrtime_update_expire(rel, end)) {
4228 *rel = 0;
4230 return TRUE;
4232 return FALSE;
4234 else if (r == 0) {
4235 /* check for spurious wakeup */
4236 if (rel) {
4237 return !hrtime_update_expire(rel, end);
4239 return TRUE;
4241 return FALSE;
4244 struct select_set {
4245 int max;
4246 rb_thread_t *th;
4247 rb_fdset_t *rset;
4248 rb_fdset_t *wset;
4249 rb_fdset_t *eset;
4250 rb_fdset_t orig_rset;
4251 rb_fdset_t orig_wset;
4252 rb_fdset_t orig_eset;
4253 struct timeval *timeout;
4256 static VALUE
4257 select_set_free(VALUE p)
4259 struct select_set *set = (struct select_set *)p;
4261 rb_fd_term(&set->orig_rset);
4262 rb_fd_term(&set->orig_wset);
4263 rb_fd_term(&set->orig_eset);
4265 return Qfalse;
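
/* Run native_fd_select() in a blocking region, restoring the caller's fd
   sets from the saved copies before each retry, since select(2) mutates
   its fd_set arguments in place. */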
4268 static VALUE
4269 do_select(VALUE p)
4271 struct select_set *set = (struct select_set *)p;
4272 volatile int result = 0;
4273 int lerrno;
4274 rb_hrtime_t *to, rel, end = 0;
4276 timeout_prepare(&to, &rel, &end, set->timeout);
4277 volatile rb_hrtime_t endtime = end;
4278 #define restore_fdset(dst, src) \
4279 ((dst) ? rb_fd_dup(dst, src) : (void)0)
4280 #define do_select_update() \
4281 (restore_fdset(set->rset, &set->orig_rset), \
4282 restore_fdset(set->wset, &set->orig_wset), \
4283 restore_fdset(set->eset, &set->orig_eset), \
4284 TRUE)
4286 do {
4287 lerrno = 0;
4289 BLOCKING_REGION(set->th, {
4290 struct timeval tv;
4292 if (!RUBY_VM_INTERRUPTED(set->th->ec)) {
4293 result = native_fd_select(set->max,
4294 set->rset, set->wset, set->eset,
4295 rb_hrtime2timeval(&tv, to), set->th);
4296 if (result < 0) lerrno = errno;
4298 }, ubf_select, set->th, TRUE);
4300 RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
4301 } while (wait_retryable(&result, lerrno, to, endtime) && do_select_update());
4303 if (result < 0) {
4304 errno = lerrno;
4307 return (VALUE)result;
4311 rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
4312 struct timeval *timeout)
4314 struct select_set set;
4316 set.th = GET_THREAD();
4317 RUBY_VM_CHECK_INTS_BLOCKING(set.th->ec);
4318 set.max = max;
4319 set.rset = read;
4320 set.wset = write;
4321 set.eset = except;
4322 set.timeout = timeout;
4324 if (!set.rset && !set.wset && !set.eset) {
4325 if (!timeout) {
4326 rb_thread_sleep_forever();
4327 return 0;
4329 rb_thread_wait_for(*timeout);
4330 return 0;
4333 #define fd_init_copy(f) do { \
4334 if (set.f) { \
4335 rb_fd_resize(set.max - 1, set.f); \
4336 if (&set.orig_##f != set.f) { /* sigwait_fd */ \
4337 rb_fd_init_copy(&set.orig_##f, set.f); \
4340 else { \
4341 rb_fd_no_init(&set.orig_##f); \
4343 } while (0)
4344 fd_init_copy(rset);
4345 fd_init_copy(wset);
4346 fd_init_copy(eset);
4347 #undef fd_init_copy
4349 return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
4352 #ifdef USE_POLL
4354 /* The same as the Linux kernel. TODO: make a platform-independent definition. */
4355 #define POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR)
4356 #define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR)
4357 #define POLLEX_SET (POLLPRI)
4359 #ifndef POLLERR_SET /* defined for FreeBSD for now */
4360 # define POLLERR_SET (0)
4361 #endif
4363 static int
4364 wait_for_single_fd_blocking_region(rb_thread_t *th, struct pollfd *fds, nfds_t nfds,
4365 rb_hrtime_t *const to, volatile int *lerrno)
4367 struct timespec ts;
4368 volatile int result = 0;
4370 *lerrno = 0;
4371 BLOCKING_REGION(th, {
4372 if (!RUBY_VM_INTERRUPTED(th->ec)) {
4373 result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, to), 0);
4374 if (result < 0) *lerrno = errno;
4376 }, ubf_select, th, TRUE);
4377 return result;
4381 * returns a mask of events
4384 rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
4386 struct pollfd fds[1] = {{
4387 .fd = fd,
4388 .events = (short)events,
4389 .revents = 0,
4391 volatile int result = 0;
4392 nfds_t nfds;
4393 struct waiting_fd wfd;
4394 enum ruby_tag_type state;
4395 volatile int lerrno;
4397 rb_execution_context_t *ec = GET_EC();
4398 rb_thread_t *th = rb_ec_thread_ptr(ec);
4400 thread_io_setup_wfd(th, fd, &wfd);
4402 if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) {
4403 // fd is readable
4404 state = 0;
4405 fds[0].revents = events;
4406 errno = 0;
4408 else {
4409 EC_PUSH_TAG(wfd.th->ec);
4410 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
4411 rb_hrtime_t *to, rel, end = 0;
4412 RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
4413 timeout_prepare(&to, &rel, &end, timeout);
4414 do {
4415 nfds = numberof(fds);
4416 result = wait_for_single_fd_blocking_region(wfd.th, fds, nfds, to, &lerrno);
4418 RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
4419 } while (wait_retryable(&result, lerrno, to, end));
4421 EC_POP_TAG();
4424 thread_io_wake_pending_closer(&wfd);
4426 if (state) {
4427 EC_JUMP_TAG(wfd.th->ec, state);
4430 if (result < 0) {
4431 errno = lerrno;
4432 return -1;
4435 if (fds[0].revents & POLLNVAL) {
4436 errno = EBADF;
4437 return -1;
4441 * POLLIN and POLLOUT have different meanings from select(2)'s read/write bits,
4442 * so we need to fix them up here.
4444 result = 0;
4445 if (fds[0].revents & POLLIN_SET)
4446 result |= RB_WAITFD_IN;
4447 if (fds[0].revents & POLLOUT_SET)
4448 result |= RB_WAITFD_OUT;
4449 if (fds[0].revents & POLLEX_SET)
4450 result |= RB_WAITFD_PRI;
4452 /* all requested events are ready if there is an error */
4453 if (fds[0].revents & POLLERR_SET)
4454 result |= events;
4456 return result;
4458 #else /* ! USE_POLL - implement rb_thread_wait_for_single_fd() using select() */
4459 struct select_args {
4460 union {
4461 int fd;
4462 int error;
4463 } as;
4464 rb_fdset_t *read;
4465 rb_fdset_t *write;
4466 rb_fdset_t *except;
4467 struct waiting_fd wfd;
4468 struct timeval *tv;
4471 static VALUE
4472 select_single(VALUE ptr)
4474 struct select_args *args = (struct select_args *)ptr;
4475 int r;
4477 r = rb_thread_fd_select(args->as.fd + 1,
4478 args->read, args->write, args->except, args->tv);
4479 if (r == -1)
4480 args->as.error = errno;
4481 if (r > 0) {
4482 r = 0;
4483 if (args->read && rb_fd_isset(args->as.fd, args->read))
4484 r |= RB_WAITFD_IN;
4485 if (args->write && rb_fd_isset(args->as.fd, args->write))
4486 r |= RB_WAITFD_OUT;
4487 if (args->except && rb_fd_isset(args->as.fd, args->except))
4488 r |= RB_WAITFD_PRI;
4490 return (VALUE)r;
4493 static VALUE
4494 select_single_cleanup(VALUE ptr)
4496 struct select_args *args = (struct select_args *)ptr;
4498 thread_io_wake_pending_closer(&args->wfd);
4499 if (args->read) rb_fd_term(args->read);
4500 if (args->write) rb_fd_term(args->write);
4501 if (args->except) rb_fd_term(args->except);
4503 return (VALUE)-1;
4506 static rb_fdset_t *
4507 init_set_fd(int fd, rb_fdset_t *fds)
4509 if (fd < 0) {
4510 return 0;
4512 rb_fd_init(fds);
4513 rb_fd_set(fd, fds);
4515 return fds;
4519 int rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
4521 rb_fdset_t rfds, wfds, efds;
4522 struct select_args args;
4523 int r;
4524 VALUE ptr = (VALUE)&args;
4525 rb_execution_context_t *ec = GET_EC();
4526 rb_thread_t *th = rb_ec_thread_ptr(ec);
4528 args.as.fd = fd;
4529 args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
4530 args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
4531 args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
4532 args.tv = timeout;
4533 thread_io_setup_wfd(th, fd, &args.wfd);
4535 r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
4536 if (r == -1)
4537 errno = args.as.error;
4539 return r;
4541 #endif /* ! USE_POLL */
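/*
 * Illustrative sketch (not part of the build): waiting on one fd with
 * either implementation above. The return value is a mask of RB_WAITFD_*
 * bits, 0 on timeout, or -1 with errno set. `fd` is hypothetical.
 *
 *   struct timeval tv = { 1, 500000 };  // 1.5 seconds
 *   int ready = rb_thread_wait_for_single_fd(fd, RB_WAITFD_IN, &tv);
 *   if (ready < 0) rb_sys_fail("wait_for_single_fd");
 *   else if (ready & RB_WAITFD_IN) { ... fd is readable ... }
 *   else { ... timed out ... }
 */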
4544 * for GC
4547 #ifdef USE_CONSERVATIVE_STACK_END
4548 void
4549 rb_gc_set_stack_end(VALUE **stack_end_p)
4551 VALUE stack_end;
4552 *stack_end_p = &stack_end;
4554 #endif
4560 void
4561 rb_threadptr_check_signal(rb_thread_t *mth)
4563 /* mth must be main_thread */
4564 if (rb_signal_buff_size() > 0) {
4565 /* wakeup main thread */
4566 threadptr_trap_interrupt(mth);
4570 static void
4571 async_bug_fd(const char *mesg, int errno_arg, int fd)
4573 char buff[64];
4574 size_t n = strlcpy(buff, mesg, sizeof(buff));
4575 if (n < sizeof(buff)-3) {
4576 ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
4578 rb_async_bug_errno(buff, errno_arg);
4581 /* VM-dependent APIs are not available in this function */
4582 static int
4583 consume_communication_pipe(int fd)
4585 #if USE_EVENTFD
4586 uint64_t buff[1];
4587 #else
4588 /* the buffer can be shared because no one refers to its contents. */
4589 static char buff[1024];
4590 #endif
4591 ssize_t result;
4592 int ret = FALSE; /* for rb_sigwait_sleep */
4594 while (1) {
4595 result = read(fd, buff, sizeof(buff));
4596 #if USE_EVENTFD
4597 RUBY_DEBUG_LOG("resultf:%d buff:%lu", (int)result, (unsigned long)buff[0]);
4598 #else
4599 RUBY_DEBUG_LOG("result:%d", (int)result);
4600 #endif
4601 if (result > 0) {
4602 ret = TRUE;
4603 if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) {
4604 return ret;
4607 else if (result == 0) {
4608 return ret;
4610 else if (result < 0) {
4611 int e = errno;
4612 switch (e) {
4613 case EINTR:
4614 continue; /* retry */
4615 case EAGAIN:
4616 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
4617 case EWOULDBLOCK:
4618 #endif
4619 return ret;
4620 default:
4621 async_bug_fd("consume_communication_pipe: read", e, fd);
4627 void
4628 rb_thread_stop_timer_thread(void)
4630 if (TIMER_THREAD_CREATED_P() && native_stop_timer_thread()) {
4631 native_reset_timer_thread();
4635 void
4636 rb_thread_reset_timer_thread(void)
4638 native_reset_timer_thread();
4641 void
4642 rb_thread_start_timer_thread(void)
4644 system_working = 1;
4645 rb_thread_create_timer_thread();
4648 static int
4649 clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy)
4651 int i;
4652 VALUE coverage = (VALUE)val;
4653 VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES);
4654 VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES);
4656 if (lines) {
4657 if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) {
4658 rb_ary_clear(lines);
4660 else {
4661 int i;
4662 for (i = 0; i < RARRAY_LEN(lines); i++) {
4663 if (RARRAY_AREF(lines, i) != Qnil)
4664 RARRAY_ASET(lines, i, INT2FIX(0));
4668 if (branches) {
4669 VALUE counters = RARRAY_AREF(branches, 1);
4670 for (i = 0; i < RARRAY_LEN(counters); i++) {
4671 RARRAY_ASET(counters, i, INT2FIX(0));
4675 return ST_CONTINUE;
4678 void
4679 rb_clear_coverages(void)
4681 VALUE coverages = rb_get_coverages();
4682 if (RTEST(coverages)) {
4683 rb_hash_foreach(coverages, clear_coverage_i, 0);
4687 #if defined(HAVE_WORKING_FORK)
4689 static void
4690 rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const rb_thread_t *))
4692 rb_thread_t *i = 0;
4693 rb_vm_t *vm = th->vm;
4694 rb_ractor_t *r = th->ractor;
4695 vm->ractor.main_ractor = r;
4696 vm->ractor.main_thread = th;
4697 r->threads.main = th;
4698 r->status_ = ractor_created;
4700 thread_sched_atfork(TH_SCHED(th));
4701 ubf_list_atfork();
4703 // OK. Only this thread accesses these lists now:
4704 ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
4705 ccan_list_for_each(&r->threads.set, i, lt_node) {
4706 atfork(i, th);
4709 rb_vm_living_threads_init(vm);
4711 rb_ractor_atfork(vm, th);
4712 rb_vm_postponed_job_atfork();
4714 /* may be held by RJIT threads in parent */
4715 rb_native_mutex_initialize(&vm->workqueue_lock);
4717 /* may be held by any thread in parent */
4718 rb_native_mutex_initialize(&th->interrupt_lock);
4720 vm->fork_gen++;
4721 rb_ractor_sleeper_threads_clear(th->ractor);
4722 rb_clear_coverages();
4724 // restart timer thread (timer threads access `vm->waitpid_lock` and so on).
4725 rb_thread_reset_timer_thread();
4726 rb_thread_start_timer_thread();
4728 VM_ASSERT(vm->ractor.blocking_cnt == 0);
4729 VM_ASSERT(vm->ractor.cnt == 1);
4732 static void
4733 terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th)
4735 if (th != current_th) {
4736 rb_mutex_abandon_keeping_mutexes(th);
4737 rb_mutex_abandon_locking_mutex(th);
4738 thread_cleanup_func(th, TRUE);
4742 void rb_fiber_atfork(rb_thread_t *);
4743 void
4744 rb_thread_atfork(void)
4746 rb_thread_t *th = GET_THREAD();
4747 rb_threadptr_pending_interrupt_clear(th);
4748 rb_thread_atfork_internal(th, terminate_atfork_i);
4749 th->join_list = NULL;
4750 rb_fiber_atfork(th);
4752 /* We don't want to reproduce CVE-2003-0900. */
4753 rb_reset_random_seed();
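/*
 * Illustrative sketch (not part of the build): a C extension that forks
 * and keeps running Ruby in the child is expected to call
 * rb_thread_atfork() there, so the child drops the parent's other
 * threads and reinitializes the locks reset above.
 *
 *   rb_pid_t pid = fork();
 *   if (pid == 0) {
 *       rb_thread_atfork();  // child: discard stale threads, reseed PRNG
 *       ... continue in the child ...
 *   }
 */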
4756 static void
4757 terminate_atfork_before_exec_i(rb_thread_t *th, const rb_thread_t *current_th)
4759 if (th != current_th) {
4760 thread_cleanup_func_before_exec(th);
4764 void
4765 rb_thread_atfork_before_exec(void)
4767 rb_thread_t *th = GET_THREAD();
4768 rb_thread_atfork_internal(th, terminate_atfork_before_exec_i);
4770 #else
4771 void
4772 rb_thread_atfork(void)
4776 void
4777 rb_thread_atfork_before_exec(void)
4780 #endif
4782 struct thgroup {
4783 int enclosed;
4786 static const rb_data_type_t thgroup_data_type = {
4787 "thgroup",
4790 RUBY_TYPED_DEFAULT_FREE,
4791 NULL, // No external memory to report
4793 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE
4797 * Document-class: ThreadGroup
4799 * ThreadGroup provides a means of keeping track of a number of threads as a
4800 * group.
4802 * A given Thread object can only belong to one ThreadGroup at a time; adding
4803 * a thread to a new group will remove it from any previous group.
4805 * Newly created threads belong to the same group as the thread from which they
4806 * were created.
4810 * Document-const: Default
4812 * The default ThreadGroup created when Ruby starts; all Threads belong to it
4813 * by default.
4815 static VALUE
4816 thgroup_s_alloc(VALUE klass)
4818 VALUE group;
4819 struct thgroup *data;
4821 group = TypedData_Make_Struct(klass, struct thgroup, &thgroup_data_type, data);
4822 data->enclosed = 0;
4824 return group;
4828 * call-seq:
4829 * thgrp.list -> array
4831 * Returns an array of all existing Thread objects that belong to this group.
4833 * ThreadGroup::Default.list #=> [#<Thread:0x401bdf4c run>]
4836 static VALUE
4837 thgroup_list(VALUE group)
4839 VALUE ary = rb_ary_new();
4840 rb_thread_t *th = 0;
4841 rb_ractor_t *r = GET_RACTOR();
4843 ccan_list_for_each(&r->threads.set, th, lt_node) {
4844 if (th->thgroup == group) {
4845 rb_ary_push(ary, th->self);
4848 return ary;
4853 * call-seq:
4854 * thgrp.enclose -> thgrp
4856 * Prevents threads from being added to or removed from the receiving
4857 * ThreadGroup.
4859 * New threads can still be started in an enclosed ThreadGroup.
4861 * ThreadGroup::Default.enclose #=> #<ThreadGroup:0x4029d914>
4862 * thr = Thread.new { Thread.stop } #=> #<Thread:0x402a7210 sleep>
4863 * tg = ThreadGroup.new #=> #<ThreadGroup:0x402752d4>
4864 * tg.add thr
4865 * #=> ThreadError: can't move from the enclosed thread group
4868 static VALUE
4869 thgroup_enclose(VALUE group)
4871 struct thgroup *data;
4873 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
4874 data->enclosed = 1;
4876 return group;
4881 * call-seq:
4882 * thgrp.enclosed? -> true or false
4884 * Returns +true+ if the +thgrp+ is enclosed. See also ThreadGroup#enclose.
4887 static VALUE
4888 thgroup_enclosed_p(VALUE group)
4890 struct thgroup *data;
4892 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
4893 return RBOOL(data->enclosed);
4898 * call-seq:
4899 * thgrp.add(thread) -> thgrp
4901 * Adds the given +thread+ to this group, removing it from any other
4902 * group to which it may have previously been a member.
4904 * puts "Initial group is #{ThreadGroup::Default.list}"
4905 * tg = ThreadGroup.new
4906 * t1 = Thread.new { sleep }
4907 * t2 = Thread.new { sleep }
4908 * puts "t1 is #{t1}"
4909 * puts "t2 is #{t2}"
4910 * tg.add(t1)
4911 * puts "Initial group now #{ThreadGroup::Default.list}"
4912 * puts "tg group now #{tg.list}"
4914 * This will produce:
4916 * Initial group is #<Thread:0x401bdf4c>
4917 * t1 is #<Thread:0x401b3c90>
4918 * t2 is #<Thread:0x401b3c18>
4919 * Initial group now #<Thread:0x401b3c18>#<Thread:0x401bdf4c>
4920 * tg group now #<Thread:0x401b3c90>
4923 static VALUE
4924 thgroup_add(VALUE group, VALUE thread)
4926 rb_thread_t *target_th = rb_thread_ptr(thread);
4927 struct thgroup *data;
4929 if (OBJ_FROZEN(group)) {
4930 rb_raise(rb_eThreadError, "can't move to the frozen thread group");
4932 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
4933 if (data->enclosed) {
4934 rb_raise(rb_eThreadError, "can't move to the enclosed thread group");
4937 if (OBJ_FROZEN(target_th->thgroup)) {
4938 rb_raise(rb_eThreadError, "can't move from the frozen thread group");
4940 TypedData_Get_Struct(target_th->thgroup, struct thgroup, &thgroup_data_type, data);
4941 if (data->enclosed) {
4942 rb_raise(rb_eThreadError,
4943 "can't move from the enclosed thread group");
4946 target_th->thgroup = group;
4947 return group;
4951 * Document-class: ThreadShield
4953 static void
4954 thread_shield_mark(void *ptr)
4956 rb_gc_mark((VALUE)ptr);
4959 static const rb_data_type_t thread_shield_data_type = {
4960 "thread_shield",
4961 {thread_shield_mark, 0, 0,},
4962 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
4965 static VALUE
4966 thread_shield_alloc(VALUE klass)
4968 return TypedData_Wrap_Struct(klass, &thread_shield_data_type, (void *)mutex_alloc(0));
4971 #define GetThreadShieldPtr(obj) ((VALUE)rb_check_typeddata((obj), &thread_shield_data_type))
4972 #define THREAD_SHIELD_WAITING_MASK (((FL_USER19-1)&~(FL_USER0-1))|FL_USER19)
4973 #define THREAD_SHIELD_WAITING_SHIFT (FL_USHIFT)
4974 #define THREAD_SHIELD_WAITING_MAX (THREAD_SHIELD_WAITING_MASK>>THREAD_SHIELD_WAITING_SHIFT)
4975 STATIC_ASSERT(THREAD_SHIELD_WAITING_MAX, THREAD_SHIELD_WAITING_MAX <= UINT_MAX);
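/*
 * Note: the number of threads waiting on a ThreadShield is packed into
 * the FL_USER0..FL_USER19 bits of the shield object's flags, so no
 * separate counter field is needed; see the inc/dec helpers below.
 */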
4976 static inline unsigned int
4977 rb_thread_shield_waiting(VALUE b)
4979 return ((RBASIC(b)->flags&THREAD_SHIELD_WAITING_MASK)>>THREAD_SHIELD_WAITING_SHIFT);
4982 static inline void
4983 rb_thread_shield_waiting_inc(VALUE b)
4985 unsigned int w = rb_thread_shield_waiting(b);
4986 w++;
4987 if (w > THREAD_SHIELD_WAITING_MAX)
4988 rb_raise(rb_eRuntimeError, "waiting count overflow");
4989 RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK;
4990 RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT);
4993 static inline void
4994 rb_thread_shield_waiting_dec(VALUE b)
4996 unsigned int w = rb_thread_shield_waiting(b);
4997 if (!w) rb_raise(rb_eRuntimeError, "waiting count underflow");
4998 w--;
4999 RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK;
5000 RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT);
5003 VALUE
5004 rb_thread_shield_new(void)
5006 VALUE thread_shield = thread_shield_alloc(rb_cThreadShield);
5007 rb_mutex_lock((VALUE)DATA_PTR(thread_shield));
5008 return thread_shield;
5011 bool
5012 rb_thread_shield_owned(VALUE self)
5014 VALUE mutex = GetThreadShieldPtr(self);
5015 if (!mutex) return false;
5017 rb_mutex_t *m = mutex_ptr(mutex);
5019 return m->fiber == GET_EC()->fiber_ptr;
5023 * Wait on a thread shield.
5025 * Returns
5026 * true: acquired the thread shield
5027 * false: the thread shield was destroyed and no other threads are waiting
5028 * nil: the thread shield was destroyed but is still in use
5030 VALUE
5031 rb_thread_shield_wait(VALUE self)
5033 VALUE mutex = GetThreadShieldPtr(self);
5034 rb_mutex_t *m;
5036 if (!mutex) return Qfalse;
5037 m = mutex_ptr(mutex);
5038 if (m->fiber == GET_EC()->fiber_ptr) return Qnil;
5039 rb_thread_shield_waiting_inc(self);
5040 rb_mutex_lock(mutex);
5041 rb_thread_shield_waiting_dec(self);
5042 if (DATA_PTR(self)) return Qtrue;
5043 rb_mutex_unlock(mutex);
5044 return rb_thread_shield_waiting(self) > 0 ? Qnil : Qfalse;
5047 static VALUE
5048 thread_shield_get_mutex(VALUE self)
5050 VALUE mutex = GetThreadShieldPtr(self);
5051 if (!mutex)
5052 rb_raise(rb_eThreadError, "destroyed thread shield - %p", (void *)self);
5053 return mutex;
5057 * Release a thread shield, and return true if it has waiting threads.
5059 VALUE
5060 rb_thread_shield_release(VALUE self)
5062 VALUE mutex = thread_shield_get_mutex(self);
5063 rb_mutex_unlock(mutex);
5064 return RBOOL(rb_thread_shield_waiting(self) > 0);
5068 * Release and destroy a thread shield, and return true if it has waiting threads.
5070 VALUE
5071 rb_thread_shield_destroy(VALUE self)
5073 VALUE mutex = thread_shield_get_mutex(self);
5074 DATA_PTR(self) = 0;
5075 rb_mutex_unlock(mutex);
5076 return RBOOL(rb_thread_shield_waiting(self) > 0);
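/*
 * Illustrative lifecycle sketch (not part of the build). The creating
 * thread implicitly owns the shield; other threads block on it:
 *
 *   VALUE shield = rb_thread_shield_new();  // creator now holds the lock
 *   ... other threads call rb_thread_shield_wait(shield) and block ...
 *   rb_thread_shield_release(shield);       // let a waiter acquire it
 *   ... or, once the guarded resource is gone:
 *   rb_thread_shield_destroy(shield);       // waiters see Qfalse or Qnil
 */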
5079 static VALUE
5080 threadptr_recursive_hash(rb_thread_t *th)
5082 return th->ec->local_storage_recursive_hash;
5085 static void
5086 threadptr_recursive_hash_set(rb_thread_t *th, VALUE hash)
5088 th->ec->local_storage_recursive_hash = hash;
5091 ID rb_frame_last_func(void);
5094 * Returns the current "recursive list" used to detect recursion.
5095 * This list is a hash table, unique for the current thread and for
5096 * the current __callee__.
5099 static VALUE
5100 recursive_list_access(VALUE sym)
5102 rb_thread_t *th = GET_THREAD();
5103 VALUE hash = threadptr_recursive_hash(th);
5104 VALUE list;
5105 if (NIL_P(hash) || !RB_TYPE_P(hash, T_HASH)) {
5106 hash = rb_ident_hash_new();
5107 threadptr_recursive_hash_set(th, hash);
5108 list = Qnil;
5110 else {
5111 list = rb_hash_aref(hash, sym);
5113 if (NIL_P(list) || !RB_TYPE_P(list, T_HASH)) {
5114 list = rb_ident_hash_new();
5115 rb_hash_aset(hash, sym, list);
5117 return list;
5121 * Returns true if and only if obj (or the pair <obj, paired_obj>) is already
5122 * in the recursion list.
5123 * Assumes the recursion list is valid.
5126 static bool
5127 recursive_check(VALUE list, VALUE obj, VALUE paired_obj_id)
5129 #if SIZEOF_LONG == SIZEOF_VOIDP
5130 #define OBJ_ID_EQL(obj_id, other) ((obj_id) == (other))
5131 #elif SIZEOF_LONG_LONG == SIZEOF_VOIDP
5132 #define OBJ_ID_EQL(obj_id, other) (RB_BIGNUM_TYPE_P((obj_id)) ? \
5133 rb_big_eql((obj_id), (other)) : ((obj_id) == (other)))
5134 #endif
5136 VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
5137 if (UNDEF_P(pair_list))
5138 return false;
5139 if (paired_obj_id) {
5140 if (!RB_TYPE_P(pair_list, T_HASH)) {
5141 if (!OBJ_ID_EQL(paired_obj_id, pair_list))
5142 return false;
5144 else {
5145 if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id)))
5146 return false;
5149 return true;
5153 * Pushes obj (or the pair <obj, paired_obj>) in the recursion list.
5154 * For a single obj, it sets list[obj] to Qtrue.
5155 * For a pair, it sets list[obj] to paired_obj_id if possible,
5156 * otherwise list[obj] becomes a hash like:
5157 * {paired_obj_id_1 => true, paired_obj_id_2 => true, ... }
5158 * Assumes the recursion list is valid.
5161 static void
5162 recursive_push(VALUE list, VALUE obj, VALUE paired_obj)
5164 VALUE pair_list;
5166 if (!paired_obj) {
5167 rb_hash_aset(list, obj, Qtrue);
5169 else if (UNDEF_P(pair_list = rb_hash_lookup2(list, obj, Qundef))) {
5170 rb_hash_aset(list, obj, paired_obj);
5172 else {
5173 if (!RB_TYPE_P(pair_list, T_HASH)){
5174 VALUE other_paired_obj = pair_list;
5175 pair_list = rb_hash_new();
5176 rb_hash_aset(pair_list, other_paired_obj, Qtrue);
5177 rb_hash_aset(list, obj, pair_list);
5179 rb_hash_aset(pair_list, paired_obj, Qtrue);
5184 * Pops obj (or the pair <obj, paired_obj>) from the recursion list.
5185 * For a pair, if list[obj] is a hash, then paired_obj_id is
5186 * removed from the hash and no attempt is made to simplify
5187 * list[obj] from {only_one_paired_id => true} to only_one_paired_id.
5188 * Assumes the recursion list is valid.
5191 static int
5192 recursive_pop(VALUE list, VALUE obj, VALUE paired_obj)
5194 if (paired_obj) {
5195 VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
5196 if (UNDEF_P(pair_list)) {
5197 return 0;
5199 if (RB_TYPE_P(pair_list, T_HASH)) {
5200 rb_hash_delete_entry(pair_list, paired_obj);
5201 if (!RHASH_EMPTY_P(pair_list)) {
5202 return 1; /* keep the hash until it is empty */
5206 rb_hash_delete_entry(list, obj);
5207 return 1;
5210 struct exec_recursive_params {
5211 VALUE (*func) (VALUE, VALUE, int);
5212 VALUE list;
5213 VALUE obj;
5214 VALUE pairid;
5215 VALUE arg;
5218 static VALUE
5219 exec_recursive_i(RB_BLOCK_CALL_FUNC_ARGLIST(tag, data))
5221 struct exec_recursive_params *p = (void *)data;
5222 return (*p->func)(p->obj, p->arg, FALSE);
5226 * Calls func(obj, arg, recursive), where recursive is non-zero if the
5227 * current method is called recursively on obj, or on the pair <obj, pairid>.
5228 * If outer is 0, then the innermost func will be called with recursive set
5229 * to true, otherwise the outermost func will be called. In the latter case,
5230 * all inner funcs are short-circuited by throw.
5231 * Implementation details: the value thrown is the recursive list, which is
5232 * specific to the current method and unlikely to be caught anywhere else.
5233 * list[recursive_key] is used as a flag for the outermost call.
5236 static VALUE
5237 exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer, ID mid)
5239 VALUE result = Qundef;
5240 const VALUE sym = mid ? ID2SYM(mid) : ID2SYM(idNULL);
5241 struct exec_recursive_params p;
5242 int outermost;
5243 p.list = recursive_list_access(sym);
5244 p.obj = obj;
5245 p.pairid = pairid;
5246 p.arg = arg;
5247 outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0);
5249 if (recursive_check(p.list, p.obj, pairid)) {
5250 if (outer && !outermost) {
5251 rb_throw_obj(p.list, p.list);
5253 return (*func)(obj, arg, TRUE);
5255 else {
5256 enum ruby_tag_type state;
5258 p.func = func;
5260 if (outermost) {
5261 recursive_push(p.list, ID2SYM(recursive_key), 0);
5262 recursive_push(p.list, p.obj, p.pairid);
5263 result = rb_catch_protect(p.list, exec_recursive_i, (VALUE)&p, &state);
5264 if (!recursive_pop(p.list, p.obj, p.pairid)) goto invalid;
5265 if (!recursive_pop(p.list, ID2SYM(recursive_key), 0)) goto invalid;
5266 if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state);
5267 if (result == p.list) {
5268 result = (*func)(obj, arg, TRUE);
5271 else {
5272 volatile VALUE ret = Qundef;
5273 recursive_push(p.list, p.obj, p.pairid);
5274 EC_PUSH_TAG(GET_EC());
5275 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
5276 ret = (*func)(obj, arg, FALSE);
5278 EC_POP_TAG();
5279 if (!recursive_pop(p.list, p.obj, p.pairid)) {
5280 goto invalid;
5282 if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state);
5283 result = ret;
5286 *(volatile struct exec_recursive_params *)&p; /* volatile access: keep p from being clobbered across setjmp/longjmp */
5287 return result;
5289 invalid:
5290 rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list "
5291 "for %+"PRIsVALUE" in %+"PRIsVALUE,
5292 sym, rb_thread_current());
5293 UNREACHABLE_RETURN(Qundef);
5297 * Calls func(obj, arg, recursive), where recursive is non-zero if the
5298 * current method is called recursively on obj.
5301 VALUE
5302 rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
5304 return exec_recursive(func, obj, 0, arg, 0, rb_frame_last_func());
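/*
 * Illustrative sketch (not part of the build): making an inspect-like
 * callback recursion-safe with rb_exec_recursive(). `my_inspect_i` and
 * `my_inspect` are hypothetical; on re-entry for the same obj the
 * callback receives recur=TRUE and can return a placeholder instead of
 * looping forever.
 *
 *   static VALUE
 *   my_inspect_i(VALUE obj, VALUE arg, int recur)
 *   {
 *       if (recur) return rb_str_new_cstr("[...]");
 *       ... build the representation, possibly re-entering my_inspect ...
 *   }
 *
 *   static VALUE
 *   my_inspect(VALUE obj)
 *   {
 *       return rb_exec_recursive(my_inspect_i, obj, 0);
 *   }
 */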
5308 * Calls func(obj, arg, recursive), where recursive is non-zero if the
5309 * current method is called recursively on the ordered pair <obj, paired_obj>.
5312 VALUE
5313 rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
5315 return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 0, rb_frame_last_func());
5319 * If recursion is detected on the current method and obj, the outermost
5320 * func will be called with (obj, arg, true). All inner funcs will be
5321 * short-circuited using throw.
5324 VALUE
5325 rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
5327 return exec_recursive(func, obj, 0, arg, 1, rb_frame_last_func());
5330 VALUE
5331 rb_exec_recursive_outer_mid(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg, ID mid)
5333 return exec_recursive(func, obj, 0, arg, 1, mid);
5337 * If recursion is detected on the current method, obj and paired_obj,
5338 * the outermost func will be called with (obj, arg, true). All inner
5339 * funcs will be short-circuited using throw.
5342 VALUE
5343 rb_exec_recursive_paired_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
5345 return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 1, rb_frame_last_func());
5349 * call-seq:
5350 * thread.backtrace -> array or nil
5352 * Returns the current backtrace of the target thread.
5356 static VALUE
5357 rb_thread_backtrace_m(int argc, VALUE *argv, VALUE thval)
5359 return rb_vm_thread_backtrace(argc, argv, thval);
5362 /* call-seq:
5363 * thread.backtrace_locations(*args) -> array or nil
5365 * Returns the execution stack for the target thread---an array containing
5366 * backtrace location objects.
5368 * See Thread::Backtrace::Location for more information.
5370 * This method behaves similarly to Kernel#caller_locations except it applies
5371 * to a specific thread.
5373 static VALUE
5374 rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval)
5376 return rb_vm_thread_backtrace_locations(argc, argv, thval);
5379 void
5380 Init_Thread_Mutex(void)
5382 rb_thread_t *th = GET_THREAD();
5384 rb_native_mutex_initialize(&th->vm->workqueue_lock);
5385 rb_native_mutex_initialize(&th->interrupt_lock);
5389 * Document-class: ThreadError
5391 * Raised when an invalid operation is attempted on a thread.
5393 * For example, when no other thread has been started:
5395 * Thread.stop
5397 * This will raise the following exception:
5399 * ThreadError: stopping only thread
5400 * note: use sleep to stop forever
5403 void
5404 Init_Thread(void)
5406 VALUE cThGroup;
5407 rb_thread_t *th = GET_THREAD();
5409 sym_never = ID2SYM(rb_intern_const("never"));
5410 sym_immediate = ID2SYM(rb_intern_const("immediate"));
5411 sym_on_blocking = ID2SYM(rb_intern_const("on_blocking"));
5413 rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
5414 rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
5415 rb_define_singleton_method(rb_cThread, "fork", thread_start, -2);
5416 rb_define_singleton_method(rb_cThread, "main", rb_thread_s_main, 0);
5417 rb_define_singleton_method(rb_cThread, "current", thread_s_current, 0);
5418 rb_define_singleton_method(rb_cThread, "stop", thread_stop, 0);
5419 rb_define_singleton_method(rb_cThread, "kill", rb_thread_s_kill, 1);
5420 rb_define_singleton_method(rb_cThread, "exit", rb_thread_exit, 0);
5421 rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0);
5422 rb_define_singleton_method(rb_cThread, "list", thread_list, 0);
5423 rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0);
5424 rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1);
5425 rb_define_singleton_method(rb_cThread, "report_on_exception", rb_thread_s_report_exc, 0);
5426 rb_define_singleton_method(rb_cThread, "report_on_exception=", rb_thread_s_report_exc_set, 1);
5427 rb_define_singleton_method(rb_cThread, "ignore_deadlock", rb_thread_s_ignore_deadlock, 0);
5428 rb_define_singleton_method(rb_cThread, "ignore_deadlock=", rb_thread_s_ignore_deadlock_set, 1);
5429 rb_define_singleton_method(rb_cThread, "handle_interrupt", rb_thread_s_handle_interrupt, 1);
5430 rb_define_singleton_method(rb_cThread, "pending_interrupt?", rb_thread_s_pending_interrupt_p, -1);
5431 rb_define_method(rb_cThread, "pending_interrupt?", rb_thread_pending_interrupt_p, -1);
5433 rb_define_method(rb_cThread, "initialize", thread_initialize, -2);
5434 rb_define_method(rb_cThread, "raise", thread_raise_m, -1);
5435 rb_define_method(rb_cThread, "join", thread_join_m, -1);
5436 rb_define_method(rb_cThread, "value", thread_value, 0);
5437 rb_define_method(rb_cThread, "kill", rb_thread_kill, 0);
5438 rb_define_method(rb_cThread, "terminate", rb_thread_kill, 0);
5439 rb_define_method(rb_cThread, "exit", rb_thread_kill, 0);
5440 rb_define_method(rb_cThread, "run", rb_thread_run, 0);
5441 rb_define_method(rb_cThread, "wakeup", rb_thread_wakeup, 0);
5442 rb_define_method(rb_cThread, "[]", rb_thread_aref, 1);
5443 rb_define_method(rb_cThread, "[]=", rb_thread_aset, 2);
5444 rb_define_method(rb_cThread, "fetch", rb_thread_fetch, -1);
5445 rb_define_method(rb_cThread, "key?", rb_thread_key_p, 1);
5446 rb_define_method(rb_cThread, "keys", rb_thread_keys, 0);
5447 rb_define_method(rb_cThread, "priority", rb_thread_priority, 0);
5448 rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1);
5449 rb_define_method(rb_cThread, "status", rb_thread_status, 0);
5450 rb_define_method(rb_cThread, "thread_variable_get", rb_thread_variable_get, 1);
5451 rb_define_method(rb_cThread, "thread_variable_set", rb_thread_variable_set, 2);
5452 rb_define_method(rb_cThread, "thread_variables", rb_thread_variables, 0);
5453 rb_define_method(rb_cThread, "thread_variable?", rb_thread_variable_p, 1);
5454 rb_define_method(rb_cThread, "alive?", rb_thread_alive_p, 0);
5455 rb_define_method(rb_cThread, "stop?", rb_thread_stop_p, 0);
5456 rb_define_method(rb_cThread, "abort_on_exception", rb_thread_abort_exc, 0);
5457 rb_define_method(rb_cThread, "abort_on_exception=", rb_thread_abort_exc_set, 1);
5458 rb_define_method(rb_cThread, "report_on_exception", rb_thread_report_exc, 0);
5459 rb_define_method(rb_cThread, "report_on_exception=", rb_thread_report_exc_set, 1);
5460 rb_define_method(rb_cThread, "group", rb_thread_group, 0);
5461 rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1);
5462 rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1);
5464 rb_define_method(rb_cThread, "name", rb_thread_getname, 0);
5465 rb_define_method(rb_cThread, "name=", rb_thread_setname, 1);
5466 rb_define_method(rb_cThread, "native_thread_id", rb_thread_native_thread_id, 0);
5467 rb_define_method(rb_cThread, "to_s", rb_thread_to_s, 0);
5468 rb_define_alias(rb_cThread, "inspect", "to_s");
5470 rb_vm_register_special_exception(ruby_error_stream_closed, rb_eIOError,
5471 "stream closed in another thread");
5473 cThGroup = rb_define_class("ThreadGroup", rb_cObject);
5474 rb_define_alloc_func(cThGroup, thgroup_s_alloc);
5475 rb_define_method(cThGroup, "list", thgroup_list, 0);
5476 rb_define_method(cThGroup, "enclose", thgroup_enclose, 0);
5477 rb_define_method(cThGroup, "enclosed?", thgroup_enclosed_p, 0);
5478 rb_define_method(cThGroup, "add", thgroup_add, 1);
5481 th->thgroup = th->ractor->thgroup_default = rb_obj_alloc(cThGroup);
5482 rb_define_const(cThGroup, "Default", th->thgroup);
5485 rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError);
5487 /* init thread core */
5489 /* main thread setting */
5491 /* acquire global vm lock */
5492 #ifdef HAVE_PTHREAD_NP_H
5493 VM_ASSERT(TH_SCHED(th)->running == th);
5494 #endif
5495 // thread_sched_to_running() should not be called because
5496 // it assumes blocked by thread_sched_to_waiting().
5497 // thread_sched_to_running(sched, th);
5499 th->pending_interrupt_queue = rb_ary_hidden_new(0);
5500 th->pending_interrupt_queue_checked = 0;
5501 th->pending_interrupt_mask_stack = rb_ary_hidden_new(0);
5505 rb_thread_create_timer_thread();
5507 Init_thread_sync();
5509 // TODO: Suppress unused function warning for now
5510 // if (0) rb_thread_sched_destroy(NULL);
5514 int ruby_native_thread_p(void)
5516 rb_thread_t *th = ruby_thread_from_native();
5518 return th != 0;
5521 #ifdef NON_SCALAR_THREAD_ID
5522 #define thread_id_str(th) (NULL)
5523 #else
5524 #define thread_id_str(th) ((void *)(uintptr_t)(th)->nt->thread_id)
5525 #endif
5527 static void
5528 debug_deadlock_check(rb_ractor_t *r, VALUE msg)
5530 rb_thread_t *th = 0;
5531 VALUE sep = rb_str_new_cstr("\n ");
5533 rb_str_catf(msg, "\n%d threads, %d sleeps current:%p main thread:%p\n",
5534 rb_ractor_living_thread_num(r), rb_ractor_sleeper_thread_num(r),
5535 (void *)GET_THREAD(), (void *)r->threads.main);
5537 ccan_list_for_each(&r->threads.set, th, lt_node) {
5538 rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p "
5539 "native:%p int:%u",
5540 th->self, (void *)th, th->nt ? thread_id_str(th) : "N/A", th->ec->interrupt_flag);
5542 if (th->locking_mutex) {
5543 rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
5544 rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
5545 (void *)mutex->fiber, rb_mutex_num_waiting(mutex));
5549 struct rb_waiting_list *list = th->join_list;
5550 while (list) {
5551 rb_str_catf(msg, "\n depended by: rb_thread_t:%p", (void *)list->thread);
5552 list = list->next;
5555 rb_str_catf(msg, "\n ");
5556 rb_str_concat(msg, rb_ary_join(rb_ec_backtrace_str_ary(th->ec, 0, 0), sep));
5557 rb_str_catf(msg, "\n");
5561 static void
5562 rb_check_deadlock(rb_ractor_t *r)
5564 if (GET_THREAD()->vm->thread_ignore_deadlock) return;
5566 #ifdef RUBY_THREAD_PTHREAD_H
5567 if (r->threads.sched.readyq_cnt > 0) return;
5568 #endif
5570 int sleeper_num = rb_ractor_sleeper_thread_num(r);
5571 int ltnum = rb_ractor_living_thread_num(r);
5573 if (ltnum > sleeper_num) return;
5574 if (ltnum < sleeper_num) rb_bug("sleeper threads must not outnumber living threads");
5576 int found = 0;
5577 rb_thread_t *th = NULL;
5579 ccan_list_for_each(&r->threads.set, th, lt_node) {
5580 if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
5581 found = 1;
5583 else if (th->locking_mutex) {
5584 rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
5585 if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !ccan_list_empty(&mutex->waitq))) {
5586 found = 1;
5589 if (found)
5590 break;
5593 if (!found) {
5594 VALUE argv[2];
5595 argv[0] = rb_eFatal;
5596 argv[1] = rb_str_new2("No live threads left. Deadlock?");
5597 debug_deadlock_check(r, argv[1]);
5598 rb_ractor_sleeper_threads_dec(GET_RACTOR());
5599 rb_threadptr_raise(r->threads.main, 2, argv);
5603 // Used for VM memsize reporting. Returns the size of a list of waiting_fd
5604 // structs. Defined here because the struct definition lives here as well.
5605 size_t
5606 rb_vm_memsize_waiting_fds(struct ccan_list_head *waiting_fds)
5608 struct waiting_fd *waitfd = 0;
5609 size_t size = 0;
5611 ccan_list_for_each(waiting_fds, waitfd, wfd_node) {
5612 size += sizeof(struct waiting_fd);
5615 return size;
5618 static void
5619 update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
5621 const rb_control_frame_t *cfp = GET_EC()->cfp;
5622 VALUE coverage = rb_iseq_coverage(cfp->iseq);
5623 if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) {
5624 VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES);
5625 if (lines) {
5626 long line = rb_sourceline() - 1;
5627 long count;
5628 VALUE num;
5629 void rb_iseq_clear_event_flags(const rb_iseq_t *iseq, size_t pos, rb_event_flag_t reset);
5630 if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) {
5631 rb_iseq_clear_event_flags(cfp->iseq, cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE);
5632 rb_ary_push(lines, LONG2FIX(line + 1));
5633 return;
5635 if (line >= RARRAY_LEN(lines)) { /* no longer tracked */
5636 return;
5638 num = RARRAY_AREF(lines, line);
5639 if (!FIXNUM_P(num)) return;
5640 count = FIX2LONG(num) + 1;
5641 if (POSFIXABLE(count)) {
5642 RARRAY_ASET(lines, line, LONG2FIX(count));
5648 static void
5649 update_branch_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
5651 const rb_control_frame_t *cfp = GET_EC()->cfp;
5652 VALUE coverage = rb_iseq_coverage(cfp->iseq);
5653 if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) {
5654 VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES);
5655 if (branches) {
5656 long pc = cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1;
5657 long idx = FIX2INT(RARRAY_AREF(ISEQ_PC2BRANCHINDEX(cfp->iseq), pc)), count;
5658 VALUE counters = RARRAY_AREF(branches, 1);
5659 VALUE num = RARRAY_AREF(counters, idx);
5660 count = FIX2LONG(num) + 1;
5661 if (POSFIXABLE(count)) {
5662 RARRAY_ASET(counters, idx, LONG2FIX(count));
5668 const rb_method_entry_t *
5669 rb_resolve_me_location(const rb_method_entry_t *me, VALUE resolved_location[5])
5671 VALUE path, beg_pos_lineno, beg_pos_column, end_pos_lineno, end_pos_column;
5673 if (!me->def) return NULL; // negative cme
5675 retry:
5676 switch (me->def->type) {
5677 case VM_METHOD_TYPE_ISEQ: {
5678 const rb_iseq_t *iseq = me->def->body.iseq.iseqptr;
5679 rb_iseq_location_t *loc = &ISEQ_BODY(iseq)->location;
5680 path = rb_iseq_path(iseq);
5681 beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno);
5682 beg_pos_column = INT2FIX(loc->code_location.beg_pos.column);
5683 end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno);
5684 end_pos_column = INT2FIX(loc->code_location.end_pos.column);
5685 break;
5687 case VM_METHOD_TYPE_BMETHOD: {
5688 const rb_iseq_t *iseq = rb_proc_get_iseq(me->def->body.bmethod.proc, 0);
5689 if (iseq) {
5690 rb_iseq_location_t *loc;
5691 rb_iseq_check(iseq);
5692 path = rb_iseq_path(iseq);
5693 loc = &ISEQ_BODY(iseq)->location;
5694 beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno);
5695 beg_pos_column = INT2FIX(loc->code_location.beg_pos.column);
5696 end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno);
5697 end_pos_column = INT2FIX(loc->code_location.end_pos.column);
5698 break;
5700 return NULL;
5702 case VM_METHOD_TYPE_ALIAS:
5703 me = me->def->body.alias.original_me;
5704 goto retry;
5705 case VM_METHOD_TYPE_REFINED:
5706 me = me->def->body.refined.orig_me;
5707 if (!me) return NULL;
5708 goto retry;
5709 default:
5710 return NULL;
5713 /* found */
5714 if (RB_TYPE_P(path, T_ARRAY)) {
5715 path = rb_ary_entry(path, 1);
5716 if (!RB_TYPE_P(path, T_STRING)) return NULL; /* just for the case... */
5718 if (resolved_location) {
5719 resolved_location[0] = path;
5720 resolved_location[1] = beg_pos_lineno;
5721 resolved_location[2] = beg_pos_column;
5722 resolved_location[3] = end_pos_lineno;
5723 resolved_location[4] = end_pos_column;
5725 return me;
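/*
 * Illustrative sketch (not part of the build): resolving where a method
 * was defined. `me` is a hypothetical method entry; on success the array
 * is filled as shown and the resolved entry is returned.
 *
 *   VALUE loc[5];
 *   if (rb_resolve_me_location(me, loc)) {
 *       // loc[0] = path, loc[1]/loc[2] = begin line/column,
 *       // loc[3]/loc[4] = end line/column
 *   }
 */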
5728 static void
5729 update_method_coverage(VALUE me2counter, rb_trace_arg_t *trace_arg)
5731 const rb_control_frame_t *cfp = GET_EC()->cfp;
5732 const rb_callable_method_entry_t *cme = rb_vm_frame_method_entry(cfp);
5733 const rb_method_entry_t *me = (const rb_method_entry_t *)cme;
5734 VALUE rcount;
5735 long count;
5737 me = rb_resolve_me_location(me, 0);
5738 if (!me) return;
5740 rcount = rb_hash_aref(me2counter, (VALUE) me);
5741 count = FIXNUM_P(rcount) ? FIX2LONG(rcount) + 1 : 1;
5742 if (POSFIXABLE(count)) {
5743 rb_hash_aset(me2counter, (VALUE) me, LONG2FIX(count));
5747 VALUE
5748 rb_get_coverages(void)
5750 return GET_VM()->coverages;
5754 int rb_get_coverage_mode(void)
5756 return GET_VM()->coverage_mode;
5759 void
5760 rb_set_coverages(VALUE coverages, int mode, VALUE me2counter)
5762 GET_VM()->coverages = coverages;
5763 GET_VM()->me2counter = me2counter;
5764 GET_VM()->coverage_mode = mode;
5767 void
5768 rb_resume_coverages(void)
5770 int mode = GET_VM()->coverage_mode;
5771 VALUE me2counter = GET_VM()->me2counter;
5772 rb_add_event_hook2((rb_event_hook_func_t) update_line_coverage, RUBY_EVENT_COVERAGE_LINE, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
5773 if (mode & COVERAGE_TARGET_BRANCHES) {
5774 rb_add_event_hook2((rb_event_hook_func_t) update_branch_coverage, RUBY_EVENT_COVERAGE_BRANCH, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
5776 if (mode & COVERAGE_TARGET_METHODS) {
5777 rb_add_event_hook2((rb_event_hook_func_t) update_method_coverage, RUBY_EVENT_CALL, me2counter, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
5781 void
5782 rb_suspend_coverages(void)
5784 rb_remove_event_hook((rb_event_hook_func_t) update_line_coverage);
5785 if (GET_VM()->coverage_mode & COVERAGE_TARGET_BRANCHES) {
5786 rb_remove_event_hook((rb_event_hook_func_t) update_branch_coverage);
5788 if (GET_VM()->coverage_mode & COVERAGE_TARGET_METHODS) {
5789 rb_remove_event_hook((rb_event_hook_func_t) update_method_coverage);
5793 /* Make coverage arrays empty so old covered files are no longer tracked. */
5794 void
5795 rb_reset_coverages(void)
5797 rb_clear_coverages();
5798 rb_iseq_remove_coverage_all();
5799 GET_VM()->coverages = Qfalse;
5802 VALUE
5803 rb_default_coverage(int n)
5805 VALUE coverage = rb_ary_hidden_new_fill(3);
5806 VALUE lines = Qfalse, branches = Qfalse;
5807 int mode = GET_VM()->coverage_mode;
5809 if (mode & COVERAGE_TARGET_LINES) {
5810 lines = n > 0 ? rb_ary_hidden_new_fill(n) : rb_ary_hidden_new(0);
5812 RARRAY_ASET(coverage, COVERAGE_INDEX_LINES, lines);
5814 if (mode & COVERAGE_TARGET_BRANCHES) {
5815 branches = rb_ary_hidden_new_fill(2);
5816 /* internal data structures for branch coverage:
5818 * { branch base node =>
5819 * [base_type, base_first_lineno, base_first_column, base_last_lineno, base_last_column, {
5820 * branch target id =>
5821 * [target_type, target_first_lineno, target_first_column, target_last_lineno, target_last_column, target_counter_index],
5822 * ...
5823 * }],
5824 * ...
5827 * Example:
5828 * { NODE_CASE =>
5829 * [1, 0, 4, 3, {
5830 * NODE_WHEN => [2, 8, 2, 9, 0],
5831 * NODE_WHEN => [3, 8, 3, 9, 1],
5832 * ...
5833 * }],
5834 * ...
5837 VALUE structure = rb_hash_new();
5838 rb_obj_hide(structure);
5839 RARRAY_ASET(branches, 0, structure);
5840 /* branch execution counters */
5841 RARRAY_ASET(branches, 1, rb_ary_hidden_new(0));
5843 RARRAY_ASET(coverage, COVERAGE_INDEX_BRANCHES, branches);
5845 return coverage;
5848 static VALUE
5849 uninterruptible_exit(VALUE v)
5851 rb_thread_t *cur_th = GET_THREAD();
5852 rb_ary_pop(cur_th->pending_interrupt_mask_stack);
5854 cur_th->pending_interrupt_queue_checked = 0;
5855 if (!rb_threadptr_pending_interrupt_empty_p(cur_th)) {
5856 RUBY_VM_SET_INTERRUPT(cur_th->ec);
5858 return Qnil;
5861 VALUE
5862 rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data)
5864 VALUE interrupt_mask = rb_ident_hash_new();
5865 rb_thread_t *cur_th = GET_THREAD();
5867 rb_hash_aset(interrupt_mask, rb_cObject, sym_never);
5868 OBJ_FREEZE(interrupt_mask);
5869 rb_ary_push(cur_th->pending_interrupt_mask_stack, interrupt_mask);
5871 VALUE ret = rb_ensure(b_proc, data, uninterruptible_exit, Qnil);
5873 RUBY_VM_CHECK_INTS(cur_th->ec);
5874 return ret;
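/*
 * Illustrative sketch (not part of the build): running a C callback with
 * asynchronous interrupts (Thread#raise, Thread#kill, signals) deferred
 * until it returns. `do_critical` is a hypothetical function.
 *
 *   static VALUE
 *   do_critical(VALUE arg)
 *   {
 *       ... work that must not be interrupted half-way ...
 *       return Qnil;
 *   }
 *
 *   VALUE result = rb_uninterruptible(do_critical, Qnil);
 *   // pending interrupts are delivered on the way out (see above)
 */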
5877 static void
5878 thread_specific_storage_alloc(rb_thread_t *th)
5880 VM_ASSERT(th->specific_storage == NULL);
5882 if (UNLIKELY(specific_key_count > 0)) {
5883 th->specific_storage = ZALLOC_N(void *, RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
5887 rb_internal_thread_specific_key_t
5888 rb_internal_thread_specific_key_create(void)
5890 rb_vm_t *vm = GET_VM();
5892 if (specific_key_count == 0 && vm->ractor.cnt > 1) {
5893 rb_raise(rb_eThreadError, "The first rb_internal_thread_specific_key_create() is called with multiple ractors");
5895 else if (specific_key_count > RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX) {
5896 rb_raise(rb_eThreadError, "rb_internal_thread_specific_key_create() is called more than %d times", RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
5898 else {
5899 rb_internal_thread_specific_key_t key = specific_key_count++;
5901 if (key == 0) {
5902 // allocate
5903 rb_ractor_t *cr = GET_RACTOR();
5904 rb_thread_t *th;
5906 ccan_list_for_each(&cr->threads.set, th, lt_node) {
5907 thread_specific_storage_alloc(th);
5910 return key;
5914 // async and native thread safe.
5915 void *
5916 rb_internal_thread_specific_get(VALUE thread_val, rb_internal_thread_specific_key_t key)
5918 rb_thread_t *th = DATA_PTR(thread_val);
5920 VM_ASSERT(rb_thread_ptr(thread_val) == th);
5921 VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
5922 VM_ASSERT(th->specific_storage);
5924 return th->specific_storage[key];
5927 // async and native thread safe.
5928 void
5929 rb_internal_thread_specific_set(VALUE thread_val, rb_internal_thread_specific_key_t key, void *data)
5931 rb_thread_t *th = DATA_PTR(thread_val);
5933 VM_ASSERT(rb_thread_ptr(thread_val) == th);
5934 VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
5935 VM_ASSERT(th->specific_storage);
5937 th->specific_storage[key] = data;
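/*
 * Illustrative sketch (not part of the build): attaching native
 * per-thread data to Ruby threads with the API above (declared in
 * ruby/thread_native.h). The key must be created before additional
 * ractors exist; `thread_val` and `my_data` are hypothetical.
 *
 *   static rb_internal_thread_specific_key_t my_key;
 *
 *   my_key = rb_internal_thread_specific_key_create();
 *   rb_internal_thread_specific_set(thread_val, my_key, my_data);
 *   void *p = rb_internal_thread_specific_get(thread_val, my_key);  // == my_data
 */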