1 /* included by thread.c */
2 #include "ccan/list/list.h"
5 static VALUE rb_cMutex
, rb_cQueue
, rb_cSizedQueue
, rb_cConditionVariable
;
6 static VALUE rb_eClosedQueueError
;
9 typedef struct rb_mutex_struct
{
11 struct rb_mutex_struct
*next_mutex
;
12 struct ccan_list_head waitq
; /* protected by GVL */
15 /* sync_waiter is always on-stack */
20 struct ccan_list_node node
;
23 static inline rb_fiber_t
*
24 nonblocking_fiber(rb_fiber_t
*fiber
)
26 if (rb_fiberptr_blocking(fiber
)) {
33 struct queue_sleep_arg
{
39 #define MUTEX_ALLOW_TRAP FL_USER1
42 sync_wakeup(struct ccan_list_head
*head
, long max
)
44 RUBY_DEBUG_LOG("max:%ld", max
);
46 struct sync_waiter
*cur
= 0, *next
;
48 ccan_list_for_each_safe(head
, cur
, next
, node
) {
49 ccan_list_del_init(&cur
->node
);
51 if (cur
->th
->status
!= THREAD_KILLED
) {
52 if (cur
->th
->scheduler
!= Qnil
&& cur
->fiber
) {
53 rb_fiber_scheduler_unblock(cur
->th
->scheduler
, cur
->self
, rb_fiberptr_self(cur
->fiber
));
56 RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(cur
->th
));
57 rb_threadptr_interrupt(cur
->th
);
58 cur
->th
->status
= THREAD_RUNNABLE
;
61 if (--max
== 0) return;
/* Wake a single waiter from `head`. */
static void
wakeup_one(struct ccan_list_head *head)
{
    sync_wakeup(head, 1);
}
73 wakeup_all(struct ccan_list_head
*head
)
75 sync_wakeup(head
, LONG_MAX
);
78 #if defined(HAVE_WORKING_FORK)
79 static void rb_mutex_abandon_all(rb_mutex_t
*mutexes
);
80 static void rb_mutex_abandon_keeping_mutexes(rb_thread_t
*th
);
81 static void rb_mutex_abandon_locking_mutex(rb_thread_t
*th
);
83 static const char* rb_mutex_unlock_th(rb_mutex_t
*mutex
, rb_thread_t
*th
, rb_fiber_t
*fiber
);
86 * Document-class: Thread::Mutex
88 * Thread::Mutex implements a simple semaphore that can be used to
89 * coordinate access to shared data from multiple concurrent threads.
93 * semaphore = Thread::Mutex.new
96 * semaphore.synchronize {
97 * # access shared resource
102 * semaphore.synchronize {
103 * # access shared resource
109 #define mutex_mark ((void(*)(void*))0)
112 rb_mutex_num_waiting(rb_mutex_t
*mutex
)
114 struct sync_waiter
*w
= 0;
117 ccan_list_for_each(&mutex
->waitq
, w
, node
) {
124 rb_thread_t
* rb_fiber_threadptr(const rb_fiber_t
*fiber
);
127 mutex_free(void *ptr
)
129 rb_mutex_t
*mutex
= ptr
;
131 /* rb_warn("free locked mutex"); */
132 const char *err
= rb_mutex_unlock_th(mutex
, rb_fiber_threadptr(mutex
->fiber
), mutex
->fiber
);
133 if (err
) rb_bug("%s", err
);
139 mutex_memsize(const void *ptr
)
141 return sizeof(rb_mutex_t
);
144 static const rb_data_type_t mutex_data_type
= {
146 {mutex_mark
, mutex_free
, mutex_memsize
,},
147 0, 0, RUBY_TYPED_WB_PROTECTED
| RUBY_TYPED_FREE_IMMEDIATELY
155 TypedData_Get_Struct(obj
, rb_mutex_t
, &mutex_data_type
, mutex
);
161 rb_obj_is_mutex(VALUE obj
)
163 return RBOOL(rb_typeddata_is_kind_of(obj
, &mutex_data_type
));
167 mutex_alloc(VALUE klass
)
172 obj
= TypedData_Make_Struct(klass
, rb_mutex_t
, &mutex_data_type
, mutex
);
174 ccan_list_head_init(&mutex
->waitq
);
180 * Thread::Mutex.new -> mutex
182 * Creates a new Mutex
185 mutex_initialize(VALUE self
)
193 return mutex_alloc(rb_cMutex
);
198 * mutex.locked? -> true or false
200 * Returns +true+ if this lock is currently held by some thread.
203 rb_mutex_locked_p(VALUE self
)
205 rb_mutex_t
*mutex
= mutex_ptr(self
);
207 return RBOOL(mutex
->fiber
);
211 thread_mutex_insert(rb_thread_t
*thread
, rb_mutex_t
*mutex
)
213 if (thread
->keeping_mutexes
) {
214 mutex
->next_mutex
= thread
->keeping_mutexes
;
217 thread
->keeping_mutexes
= mutex
;
221 thread_mutex_remove(rb_thread_t
*thread
, rb_mutex_t
*mutex
)
223 rb_mutex_t
**keeping_mutexes
= &thread
->keeping_mutexes
;
225 while (*keeping_mutexes
&& *keeping_mutexes
!= mutex
) {
226 // Move to the next mutex in the list:
227 keeping_mutexes
= &(*keeping_mutexes
)->next_mutex
;
230 if (*keeping_mutexes
) {
231 *keeping_mutexes
= mutex
->next_mutex
;
232 mutex
->next_mutex
= NULL
;
237 mutex_locked(rb_thread_t
*th
, VALUE self
)
239 rb_mutex_t
*mutex
= mutex_ptr(self
);
241 thread_mutex_insert(th
, mutex
);
246 * mutex.try_lock -> true or false
248 * Attempts to obtain the lock and returns immediately. Returns +true+ if the
252 rb_mutex_trylock(VALUE self
)
254 rb_mutex_t
*mutex
= mutex_ptr(self
);
256 if (mutex
->fiber
== 0) {
257 RUBY_DEBUG_LOG("%p ok", mutex
);
259 rb_fiber_t
*fiber
= GET_EC()->fiber_ptr
;
260 rb_thread_t
*th
= GET_THREAD();
261 mutex
->fiber
= fiber
;
263 mutex_locked(th
, self
);
267 RUBY_DEBUG_LOG("%p ng", mutex
);
273 mutex_owned_p(rb_fiber_t
*fiber
, rb_mutex_t
*mutex
)
275 return RBOOL(mutex
->fiber
== fiber
);
279 call_rb_fiber_scheduler_block(VALUE mutex
)
281 return rb_fiber_scheduler_block(rb_fiber_scheduler_current(), mutex
, Qnil
);
285 delete_from_waitq(VALUE value
)
287 struct sync_waiter
*sync_waiter
= (void *)value
;
288 ccan_list_del(&sync_waiter
->node
);
293 static inline rb_atomic_t
threadptr_get_interrupts(rb_thread_t
*th
);
296 do_mutex_lock(VALUE self
, int interruptible_p
)
298 rb_execution_context_t
*ec
= GET_EC();
299 rb_thread_t
*th
= ec
->thread_ptr
;
300 rb_fiber_t
*fiber
= ec
->fiber_ptr
;
301 rb_mutex_t
*mutex
= mutex_ptr(self
);
302 rb_atomic_t saved_ints
= 0;
304 /* When running trap handler */
305 if (!FL_TEST_RAW(self
, MUTEX_ALLOW_TRAP
) &&
306 th
->ec
->interrupt_mask
& TRAP_INTERRUPT_MASK
) {
307 rb_raise(rb_eThreadError
, "can't be called from trap context");
310 if (rb_mutex_trylock(self
) == Qfalse
) {
311 if (mutex
->fiber
== fiber
) {
312 rb_raise(rb_eThreadError
, "deadlock; recursive locking");
315 while (mutex
->fiber
!= fiber
) {
316 VM_ASSERT(mutex
->fiber
!= NULL
);
318 VALUE scheduler
= rb_fiber_scheduler_current();
319 if (scheduler
!= Qnil
) {
320 struct sync_waiter sync_waiter
= {
323 .fiber
= nonblocking_fiber(fiber
)
326 ccan_list_add_tail(&mutex
->waitq
, &sync_waiter
.node
);
328 rb_ensure(call_rb_fiber_scheduler_block
, self
, delete_from_waitq
, (VALUE
)&sync_waiter
);
331 mutex
->fiber
= fiber
;
335 if (!th
->vm
->thread_ignore_deadlock
&& rb_fiber_threadptr(mutex
->fiber
) == th
) {
336 rb_raise(rb_eThreadError
, "deadlock; lock already owned by another fiber belonging to the same thread");
339 struct sync_waiter sync_waiter
= {
342 .fiber
= nonblocking_fiber(fiber
),
345 RUBY_DEBUG_LOG("%p wait", mutex
);
347 // similar code with `sleep_forever`, but
348 // sleep_forever(SLEEP_DEADLOCKABLE) raises an exception.
349 // Ensure clause is needed like but `rb_ensure` a bit slow.
352 // sleep_forever(th, SLEEP_DEADLOCKABLE);
354 // ccan_list_del(&sync_waiter.node);
356 enum rb_thread_status prev_status
= th
->status
;
357 th
->status
= THREAD_STOPPED_FOREVER
;
358 rb_ractor_sleeper_threads_inc(th
->ractor
);
359 rb_check_deadlock(th
->ractor
);
361 th
->locking_mutex
= self
;
363 ccan_list_add_tail(&mutex
->waitq
, &sync_waiter
.node
);
365 native_sleep(th
, NULL
);
367 ccan_list_del(&sync_waiter
.node
);
369 // unlocked by another thread while sleeping
371 mutex
->fiber
= fiber
;
374 rb_ractor_sleeper_threads_dec(th
->ractor
);
375 th
->status
= prev_status
;
376 th
->locking_mutex
= Qfalse
;
377 th
->locking_mutex
= Qfalse
;
379 RUBY_DEBUG_LOG("%p wakeup", mutex
);
382 if (interruptible_p
) {
383 /* release mutex before checking for interrupts...as interrupt checking
384 * code might call rb_raise() */
385 if (mutex
->fiber
== fiber
) mutex
->fiber
= 0;
386 RUBY_VM_CHECK_INTS_BLOCKING(th
->ec
); /* may release mutex */
388 mutex
->fiber
= fiber
;
392 // clear interrupt information
393 if (RUBY_VM_INTERRUPTED(th
->ec
)) {
395 if (saved_ints
== 0) {
396 saved_ints
= threadptr_get_interrupts(th
);
399 // ignore additional interrupts
400 threadptr_get_interrupts(th
);
406 if (saved_ints
) th
->ec
->interrupt_flag
= saved_ints
;
407 if (mutex
->fiber
== fiber
) mutex_locked(th
, self
);
410 RUBY_DEBUG_LOG("%p locked", mutex
);
413 if (mutex_owned_p(fiber
, mutex
) == Qfalse
) rb_bug("do_mutex_lock: mutex is not owned.");
419 mutex_lock_uninterruptible(VALUE self
)
421 return do_mutex_lock(self
, 0);
428 * Attempts to grab the lock and waits if it isn't available.
429 * Raises +ThreadError+ if +mutex+ was locked by the current thread.
432 rb_mutex_lock(VALUE self
)
434 return do_mutex_lock(self
, 1);
439 * mutex.owned? -> true or false
441 * Returns +true+ if this lock is currently held by current thread.
444 rb_mutex_owned_p(VALUE self
)
446 rb_fiber_t
*fiber
= GET_EC()->fiber_ptr
;
447 rb_mutex_t
*mutex
= mutex_ptr(self
);
449 return mutex_owned_p(fiber
, mutex
);
453 rb_mutex_unlock_th(rb_mutex_t
*mutex
, rb_thread_t
*th
, rb_fiber_t
*fiber
)
455 RUBY_DEBUG_LOG("%p", mutex
);
457 if (mutex
->fiber
== 0) {
458 return "Attempt to unlock a mutex which is not locked";
460 else if (mutex
->fiber
!= fiber
) {
461 return "Attempt to unlock a mutex which is locked by another thread/fiber";
464 struct sync_waiter
*cur
= 0, *next
;
467 thread_mutex_remove(th
, mutex
);
469 ccan_list_for_each_safe(&mutex
->waitq
, cur
, next
, node
) {
470 ccan_list_del_init(&cur
->node
);
472 if (cur
->th
->scheduler
!= Qnil
&& cur
->fiber
) {
473 rb_fiber_scheduler_unblock(cur
->th
->scheduler
, cur
->self
, rb_fiberptr_self(cur
->fiber
));
477 switch (cur
->th
->status
) {
478 case THREAD_RUNNABLE
: /* from someone else calling Thread#run */
479 case THREAD_STOPPED_FOREVER
: /* likely (rb_mutex_lock) */
480 RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur
->th
));
481 rb_threadptr_interrupt(cur
->th
);
483 case THREAD_STOPPED
: /* probably impossible */
484 rb_bug("unexpected THREAD_STOPPED");
486 /* not sure about this, possible in exit GC? */
487 rb_bug("unexpected THREAD_KILLED");
493 // We did not find any threads to wake up, so we can just return with no error:
499 * mutex.unlock -> self
502 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
505 rb_mutex_unlock(VALUE self
)
508 rb_mutex_t
*mutex
= mutex_ptr(self
);
509 rb_thread_t
*th
= GET_THREAD();
511 err
= rb_mutex_unlock_th(mutex
, th
, GET_EC()->fiber_ptr
);
512 if (err
) rb_raise(rb_eThreadError
, "%s", err
);
517 #if defined(HAVE_WORKING_FORK)
519 rb_mutex_abandon_keeping_mutexes(rb_thread_t
*th
)
521 rb_mutex_abandon_all(th
->keeping_mutexes
);
522 th
->keeping_mutexes
= NULL
;
526 rb_mutex_abandon_locking_mutex(rb_thread_t
*th
)
528 if (th
->locking_mutex
) {
529 rb_mutex_t
*mutex
= mutex_ptr(th
->locking_mutex
);
531 ccan_list_head_init(&mutex
->waitq
);
532 th
->locking_mutex
= Qfalse
;
537 rb_mutex_abandon_all(rb_mutex_t
*mutexes
)
543 mutexes
= mutex
->next_mutex
;
545 mutex
->next_mutex
= 0;
546 ccan_list_head_init(&mutex
->waitq
);
552 rb_mutex_sleep_forever(VALUE self
)
554 rb_thread_sleep_deadly_allow_spurious_wakeup(self
, Qnil
, 0);
559 rb_mutex_wait_for(VALUE time
)
561 rb_hrtime_t
*rel
= (rb_hrtime_t
*)time
;
562 /* permit spurious check */
563 return RBOOL(sleep_hrtime(GET_THREAD(), *rel
, 0));
567 rb_mutex_sleep(VALUE self
, VALUE timeout
)
572 if (!NIL_P(timeout
)) {
573 t
= rb_time_interval(timeout
);
576 rb_mutex_unlock(self
);
577 time_t beg
= time(0);
579 VALUE scheduler
= rb_fiber_scheduler_current();
580 if (scheduler
!= Qnil
) {
581 rb_fiber_scheduler_kernel_sleep(scheduler
, timeout
);
582 mutex_lock_uninterruptible(self
);
585 if (NIL_P(timeout
)) {
586 rb_ensure(rb_mutex_sleep_forever
, self
, mutex_lock_uninterruptible
, self
);
589 rb_hrtime_t rel
= rb_timeval2hrtime(&t
);
590 woken
= rb_ensure(rb_mutex_wait_for
, (VALUE
)&rel
, mutex_lock_uninterruptible
, self
);
594 RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
595 if (!woken
) return Qnil
;
596 time_t end
= time(0) - beg
;
597 return TIMET2NUM(end
);
602 * mutex.sleep(timeout = nil) -> number or nil
604 * Releases the lock and sleeps +timeout+ seconds if it is given and
605 * non-nil or forever. Raises +ThreadError+ if +mutex+ wasn't locked by
606 * the current thread.
608 * When the thread is next woken up, it will attempt to reacquire
611 * Note that this method can wakeup without explicit Thread#wakeup call.
612 * For example, receiving signal and so on.
614 * Returns the slept time in seconds if woken up, or +nil+ if timed out.
617 mutex_sleep(int argc
, VALUE
*argv
, VALUE self
)
621 timeout
= rb_check_arity(argc
, 0, 1) ? argv
[0] : Qnil
;
622 return rb_mutex_sleep(self
, timeout
);
627 * mutex.synchronize { ... } -> result of the block
629 * Obtains a lock, runs the block, and releases the lock when the block
630 * completes. See the example under Thread::Mutex.
634 rb_mutex_synchronize(VALUE mutex
, VALUE (*func
)(VALUE arg
), VALUE arg
)
636 rb_mutex_lock(mutex
);
637 return rb_ensure(func
, arg
, rb_mutex_unlock
, mutex
);
642 * mutex.synchronize { ... } -> result of the block
644 * Obtains a lock, runs the block, and releases the lock when the block
645 * completes. See the example under Thread::Mutex.
648 rb_mutex_synchronize_m(VALUE self
)
650 if (!rb_block_given_p()) {
651 rb_raise(rb_eThreadError
, "must be called with a block");
654 return rb_mutex_synchronize(self
, rb_yield
, Qundef
);
658 rb_mutex_allow_trap(VALUE self
, int val
)
660 Check_TypedStruct(self
, &mutex_data_type
);
663 FL_SET_RAW(self
, MUTEX_ALLOW_TRAP
);
665 FL_UNSET_RAW(self
, MUTEX_ALLOW_TRAP
);
670 #define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
671 #define queue_list(q) UNALIGNED_MEMBER_PTR(q, que)
672 RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
674 struct ccan_list_head waitq
;
675 rb_serial_t fork_gen
;
678 } RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
680 #define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
681 #define szqueue_list(sq) UNALIGNED_MEMBER_PTR(sq, q.que)
682 #define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
683 RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
686 int num_waiting_push
;
687 struct ccan_list_head pushq
;
689 } RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
692 queue_mark(void *ptr
)
694 struct rb_queue
*q
= ptr
;
696 /* no need to mark threads in waitq, they are on stack */
701 queue_memsize(const void *ptr
)
703 return sizeof(struct rb_queue
);
706 static const rb_data_type_t queue_data_type
= {
708 {queue_mark
, RUBY_TYPED_DEFAULT_FREE
, queue_memsize
,},
709 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
|RUBY_TYPED_WB_PROTECTED
713 queue_alloc(VALUE klass
)
718 obj
= TypedData_Make_Struct(klass
, struct rb_queue
, &queue_data_type
, q
);
719 ccan_list_head_init(queue_waitq(q
));
724 queue_fork_check(struct rb_queue
*q
)
726 rb_serial_t fork_gen
= GET_VM()->fork_gen
;
728 if (q
->fork_gen
== fork_gen
) {
731 /* forked children can't reach into parent thread stacks */
732 q
->fork_gen
= fork_gen
;
733 ccan_list_head_init(queue_waitq(q
));
738 static struct rb_queue
*
743 TypedData_Get_Struct(obj
, struct rb_queue
, &queue_data_type
, q
);
749 #define QUEUE_CLOSED FL_USER5
752 queue_timeout2hrtime(VALUE timeout
)
754 if (NIL_P(timeout
)) {
755 return (rb_hrtime_t
)0;
758 if (FIXNUM_P(timeout
)) {
759 rel
= rb_sec2hrtime(NUM2TIMET(timeout
));
762 double2hrtime(&rel
, rb_num2dbl(timeout
));
764 return rb_hrtime_add(rel
, rb_hrtime_now());
768 szqueue_mark(void *ptr
)
770 struct rb_szqueue
*sq
= ptr
;
776 szqueue_memsize(const void *ptr
)
778 return sizeof(struct rb_szqueue
);
781 static const rb_data_type_t szqueue_data_type
= {
783 {szqueue_mark
, RUBY_TYPED_DEFAULT_FREE
, szqueue_memsize
,},
784 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
|RUBY_TYPED_WB_PROTECTED
788 szqueue_alloc(VALUE klass
)
790 struct rb_szqueue
*sq
;
791 VALUE obj
= TypedData_Make_Struct(klass
, struct rb_szqueue
,
792 &szqueue_data_type
, sq
);
793 ccan_list_head_init(szqueue_waitq(sq
));
794 ccan_list_head_init(szqueue_pushq(sq
));
798 static struct rb_szqueue
*
799 szqueue_ptr(VALUE obj
)
801 struct rb_szqueue
*sq
;
803 TypedData_Get_Struct(obj
, struct rb_szqueue
, &szqueue_data_type
, sq
);
804 if (queue_fork_check(&sq
->q
)) {
805 ccan_list_head_init(szqueue_pushq(sq
));
806 sq
->num_waiting_push
= 0;
815 return rb_ary_hidden_new(1);
819 check_array(VALUE obj
, VALUE ary
)
821 if (!RB_TYPE_P(ary
, T_ARRAY
)) {
822 rb_raise(rb_eTypeError
, "%+"PRIsVALUE
" not initialized", obj
);
828 queue_length(VALUE self
, struct rb_queue
*q
)
830 return RARRAY_LEN(check_array(self
, q
->que
));
834 queue_closed_p(VALUE self
)
836 return FL_TEST_RAW(self
, QUEUE_CLOSED
) != 0;
840 * Document-class: ClosedQueueError
842 * The exception class which will be raised when pushing into a closed
843 * Queue. See Thread::Queue#close and Thread::SizedQueue#close.
846 NORETURN(static void raise_closed_queue_error(VALUE self
));
849 raise_closed_queue_error(VALUE self
)
851 rb_raise(rb_eClosedQueueError
, "queue closed");
855 queue_closed_result(VALUE self
, struct rb_queue
*q
)
857 RUBY_ASSERT(queue_length(self
, q
) == 0);
862 * Document-class: Thread::Queue
864 * The Thread::Queue class implements multi-producer, multi-consumer
865 * queues. It is especially useful in threaded programming when
866 * information must be exchanged safely between multiple threads. The
867 * Thread::Queue class implements all the required locking semantics.
869 * The class implements FIFO (first in, first out) type of queue.
870 * In a FIFO queue, the first tasks added are the first retrieved.
874 * queue = Thread::Queue.new
876 * producer = Thread.new do
878 * sleep rand(i) # simulate expense
880 * puts "#{i} produced"
884 * consumer = Thread.new do
887 * sleep rand(i/2) # simulate expense
888 * puts "consumed #{value}"
897 * Document-method: Queue::new
900 * Thread::Queue.new -> empty_queue
901 * Thread::Queue.new(enumerable) -> queue
903 * Creates a new queue instance, optionally using the contents of an +enumerable+
904 * for its initial state.
908 * q = Thread::Queue.new
909 * #=> #<Thread::Queue:0x00007ff7501110d0>
913 * q = Thread::Queue.new([1, 2, 3])
914 * #=> #<Thread::Queue:0x00007ff7500ec500>
922 rb_queue_initialize(int argc
, VALUE
*argv
, VALUE self
)
925 struct rb_queue
*q
= queue_ptr(self
);
926 if ((argc
= rb_scan_args(argc
, argv
, "01", &initial
)) == 1) {
927 initial
= rb_to_array(initial
);
929 RB_OBJ_WRITE(self
, queue_list(q
), ary_buf_new());
930 ccan_list_head_init(queue_waitq(q
));
932 rb_ary_concat(q
->que
, initial
);
938 queue_do_push(VALUE self
, struct rb_queue
*q
, VALUE obj
)
940 if (queue_closed_p(self
)) {
941 raise_closed_queue_error(self
);
943 rb_ary_push(check_array(self
, q
->que
), obj
);
944 wakeup_one(queue_waitq(q
));
949 * Document-method: Thread::Queue#close
953 * Closes the queue. A closed queue cannot be re-opened.
955 * After the call to close completes, the following are true:
957 * - +closed?+ will return true
959 * - +close+ will be ignored.
961 * - calling enq/push/<< will raise a +ClosedQueueError+.
963 * - when +empty?+ is false, calling deq/pop/shift will return an object
964 * from the queue as usual.
965 * - when +empty?+ is true, deq(false) will not suspend the thread and will return nil.
966 * deq(true) will raise a +ThreadError+.
968 * ClosedQueueError is inherited from StopIteration, so that you can break loop block.
972 * q = Thread::Queue.new
974 * while e = q.deq # wait for nil to break loop
982 rb_queue_close(VALUE self
)
984 struct rb_queue
*q
= queue_ptr(self
);
986 if (!queue_closed_p(self
)) {
987 FL_SET(self
, QUEUE_CLOSED
);
989 wakeup_all(queue_waitq(q
));
996 * Document-method: Thread::Queue#closed?
999 * Returns +true+ if the queue is closed.
1003 rb_queue_closed_p(VALUE self
)
1005 return RBOOL(queue_closed_p(self
));
1009 * Document-method: Thread::Queue#push
1015 * Pushes the given +object+ to the queue.
1019 rb_queue_push(VALUE self
, VALUE obj
)
1021 return queue_do_push(self
, queue_ptr(self
), obj
);
1025 queue_sleep(VALUE _args
)
1027 struct queue_sleep_arg
*args
= (struct queue_sleep_arg
*)_args
;
1028 rb_thread_sleep_deadly_allow_spurious_wakeup(args
->self
, args
->timeout
, args
->end
);
1032 struct queue_waiter
{
1033 struct sync_waiter w
;
1036 struct rb_szqueue
*sq
;
1041 queue_sleep_done(VALUE p
)
1043 struct queue_waiter
*qw
= (struct queue_waiter
*)p
;
1045 ccan_list_del(&qw
->w
.node
);
1046 qw
->as
.q
->num_waiting
--;
1052 szqueue_sleep_done(VALUE p
)
1054 struct queue_waiter
*qw
= (struct queue_waiter
*)p
;
1056 ccan_list_del(&qw
->w
.node
);
1057 qw
->as
.sq
->num_waiting_push
--;
1063 queue_do_pop(VALUE self
, struct rb_queue
*q
, int should_block
, VALUE timeout
)
1065 check_array(self
, q
->que
);
1066 if (RARRAY_LEN(q
->que
) == 0) {
1067 if (!should_block
) {
1068 rb_raise(rb_eThreadError
, "queue empty");
1071 if (RTEST(rb_equal(INT2FIX(0), timeout
))) {
1076 rb_hrtime_t end
= queue_timeout2hrtime(timeout
);
1077 while (RARRAY_LEN(q
->que
) == 0) {
1078 if (queue_closed_p(self
)) {
1079 return queue_closed_result(self
, q
);
1082 rb_execution_context_t
*ec
= GET_EC();
1084 RUBY_ASSERT(RARRAY_LEN(q
->que
) == 0);
1085 RUBY_ASSERT(queue_closed_p(self
) == 0);
1087 struct queue_waiter queue_waiter
= {
1088 .w
= {.self
= self
, .th
= ec
->thread_ptr
, .fiber
= nonblocking_fiber(ec
->fiber_ptr
)},
1092 struct ccan_list_head
*waitq
= queue_waitq(q
);
1094 ccan_list_add_tail(waitq
, &queue_waiter
.w
.node
);
1095 queue_waiter
.as
.q
->num_waiting
++;
1097 struct queue_sleep_arg queue_sleep_arg
= {
1103 rb_ensure(queue_sleep
, (VALUE
)&queue_sleep_arg
, queue_sleep_done
, (VALUE
)&queue_waiter
);
1104 if (!NIL_P(timeout
) && (rb_hrtime_now() >= end
))
1109 return rb_ary_shift(q
->que
);
1113 rb_queue_pop(rb_execution_context_t
*ec
, VALUE self
, VALUE non_block
, VALUE timeout
)
1115 return queue_do_pop(self
, queue_ptr(self
), !RTEST(non_block
), timeout
);
1119 * Document-method: Thread::Queue#empty?
1122 * Returns +true+ if the queue is empty.
1126 rb_queue_empty_p(VALUE self
)
1128 return RBOOL(queue_length(self
, queue_ptr(self
)) == 0);
1132 * Document-method: Thread::Queue#clear
1134 * Removes all objects from the queue.
1138 rb_queue_clear(VALUE self
)
1140 struct rb_queue
*q
= queue_ptr(self
);
1142 rb_ary_clear(check_array(self
, q
->que
));
1147 * Document-method: Thread::Queue#length
1152 * Returns the length of the queue.
1156 rb_queue_length(VALUE self
)
1158 return LONG2NUM(queue_length(self
, queue_ptr(self
)));
1161 NORETURN(static VALUE
rb_queue_freeze(VALUE self
));
1166 * The queue can't be frozen, so this method raises an exception:
1167 * Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)
1171 rb_queue_freeze(VALUE self
)
1173 rb_raise(rb_eTypeError
, "cannot freeze " "%+"PRIsVALUE
, self
);
1174 UNREACHABLE_RETURN(self
);
1178 * Document-method: Thread::Queue#num_waiting
1180 * Returns the number of threads waiting on the queue.
1184 rb_queue_num_waiting(VALUE self
)
1186 struct rb_queue
*q
= queue_ptr(self
);
1188 return INT2NUM(q
->num_waiting
);
1192 * Document-class: Thread::SizedQueue
1194 * This class represents queues of specified size capacity. The push operation
1195 * may be blocked if the capacity is full.
1197 * See Thread::Queue for an example of how a Thread::SizedQueue works.
1201 * Document-method: SizedQueue::new
1202 * call-seq: new(max)
1204 * Creates a fixed-length queue with a maximum size of +max+.
1208 rb_szqueue_initialize(VALUE self
, VALUE vmax
)
1211 struct rb_szqueue
*sq
= szqueue_ptr(self
);
1213 max
= NUM2LONG(vmax
);
1215 rb_raise(rb_eArgError
, "queue size must be positive");
1218 RB_OBJ_WRITE(self
, szqueue_list(sq
), ary_buf_new());
1219 ccan_list_head_init(szqueue_waitq(sq
));
1220 ccan_list_head_init(szqueue_pushq(sq
));
1227 * Document-method: Thread::SizedQueue#close
1231 * Similar to Thread::Queue#close.
1233 * The difference is behavior with waiting enqueuing threads.
1235 * If there are waiting enqueuing threads, they are interrupted by
1236 * raising ClosedQueueError('queue closed').
1239 rb_szqueue_close(VALUE self
)
1241 if (!queue_closed_p(self
)) {
1242 struct rb_szqueue
*sq
= szqueue_ptr(self
);
1244 FL_SET(self
, QUEUE_CLOSED
);
1245 wakeup_all(szqueue_waitq(sq
));
1246 wakeup_all(szqueue_pushq(sq
));
1252 * Document-method: Thread::SizedQueue#max
1254 * Returns the maximum size of the queue.
1258 rb_szqueue_max_get(VALUE self
)
1260 return LONG2NUM(szqueue_ptr(self
)->max
);
1264 * Document-method: Thread::SizedQueue#max=
1265 * call-seq: max=(number)
1267 * Sets the maximum size of the queue to the given +number+.
1271 rb_szqueue_max_set(VALUE self
, VALUE vmax
)
1273 long max
= NUM2LONG(vmax
);
1275 struct rb_szqueue
*sq
= szqueue_ptr(self
);
1278 rb_raise(rb_eArgError
, "queue size must be positive");
1280 if (max
> sq
->max
) {
1281 diff
= max
- sq
->max
;
1284 sync_wakeup(szqueue_pushq(sq
), diff
);
1289 rb_szqueue_push(rb_execution_context_t
*ec
, VALUE self
, VALUE object
, VALUE non_block
, VALUE timeout
)
1291 struct rb_szqueue
*sq
= szqueue_ptr(self
);
1293 if (queue_length(self
, &sq
->q
) >= sq
->max
) {
1294 if (RTEST(non_block
)) {
1295 rb_raise(rb_eThreadError
, "queue full");
1298 if (RTEST(rb_equal(INT2FIX(0), timeout
))) {
1303 rb_hrtime_t end
= queue_timeout2hrtime(timeout
);
1304 while (queue_length(self
, &sq
->q
) >= sq
->max
) {
1305 if (queue_closed_p(self
)) {
1306 raise_closed_queue_error(self
);
1309 rb_execution_context_t
*ec
= GET_EC();
1310 struct queue_waiter queue_waiter
= {
1311 .w
= {.self
= self
, .th
= ec
->thread_ptr
, .fiber
= nonblocking_fiber(ec
->fiber_ptr
)},
1315 struct ccan_list_head
*pushq
= szqueue_pushq(sq
);
1317 ccan_list_add_tail(pushq
, &queue_waiter
.w
.node
);
1318 sq
->num_waiting_push
++;
1320 struct queue_sleep_arg queue_sleep_arg
= {
1325 rb_ensure(queue_sleep
, (VALUE
)&queue_sleep_arg
, szqueue_sleep_done
, (VALUE
)&queue_waiter
);
1326 if (!NIL_P(timeout
) && rb_hrtime_now() >= end
) {
1332 return queue_do_push(self
, &sq
->q
, object
);
1336 szqueue_do_pop(VALUE self
, int should_block
, VALUE timeout
)
1338 struct rb_szqueue
*sq
= szqueue_ptr(self
);
1339 VALUE retval
= queue_do_pop(self
, &sq
->q
, should_block
, timeout
);
1341 if (queue_length(self
, &sq
->q
) < sq
->max
) {
1342 wakeup_one(szqueue_pushq(sq
));
1348 rb_szqueue_pop(rb_execution_context_t
*ec
, VALUE self
, VALUE non_block
, VALUE timeout
)
1350 return szqueue_do_pop(self
, !RTEST(non_block
), timeout
);
1354 * Document-method: Thread::SizedQueue#clear
1356 * Removes all objects from the queue.
1360 rb_szqueue_clear(VALUE self
)
1362 struct rb_szqueue
*sq
= szqueue_ptr(self
);
1364 rb_ary_clear(check_array(self
, sq
->q
.que
));
1365 wakeup_all(szqueue_pushq(sq
));
1370 * Document-method: Thread::SizedQueue#length
1375 * Returns the length of the queue.
1379 rb_szqueue_length(VALUE self
)
1381 struct rb_szqueue
*sq
= szqueue_ptr(self
);
1383 return LONG2NUM(queue_length(self
, &sq
->q
));
1387 * Document-method: Thread::SizedQueue#num_waiting
1389 * Returns the number of threads waiting on the queue.
1393 rb_szqueue_num_waiting(VALUE self
)
1395 struct rb_szqueue
*sq
= szqueue_ptr(self
);
1397 return INT2NUM(sq
->q
.num_waiting
+ sq
->num_waiting_push
);
1401 * Document-method: Thread::SizedQueue#empty?
1404 * Returns +true+ if the queue is empty.
1408 rb_szqueue_empty_p(VALUE self
)
1410 struct rb_szqueue
*sq
= szqueue_ptr(self
);
1412 return RBOOL(queue_length(self
, &sq
->q
) == 0);
1416 /* ConditionalVariable */
1418 struct ccan_list_head waitq
;
1419 rb_serial_t fork_gen
;
1423 * Document-class: Thread::ConditionVariable
1425 * ConditionVariable objects augment class Mutex. Using condition variables,
1426 * it is possible to suspend while in the middle of a critical section until a
1427 * resource becomes available.
1431 * mutex = Thread::Mutex.new
1432 * resource = Thread::ConditionVariable.new
1435 * mutex.synchronize {
1436 * # Thread 'a' now needs the resource
1437 * resource.wait(mutex)
1438 * # 'a' can now have the resource
1443 * mutex.synchronize {
1444 * # Thread 'b' has finished using the resource
1451 condvar_memsize(const void *ptr
)
1453 return sizeof(struct rb_condvar
);
1456 static const rb_data_type_t cv_data_type
= {
1458 {0, RUBY_TYPED_DEFAULT_FREE
, condvar_memsize
,},
1459 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
|RUBY_TYPED_WB_PROTECTED
1462 static struct rb_condvar
*
1463 condvar_ptr(VALUE self
)
1465 struct rb_condvar
*cv
;
1466 rb_serial_t fork_gen
= GET_VM()->fork_gen
;
1468 TypedData_Get_Struct(self
, struct rb_condvar
, &cv_data_type
, cv
);
1470 /* forked children can't reach into parent thread stacks */
1471 if (cv
->fork_gen
!= fork_gen
) {
1472 cv
->fork_gen
= fork_gen
;
1473 ccan_list_head_init(&cv
->waitq
);
1480 condvar_alloc(VALUE klass
)
1482 struct rb_condvar
*cv
;
1485 obj
= TypedData_Make_Struct(klass
, struct rb_condvar
, &cv_data_type
, cv
);
1486 ccan_list_head_init(&cv
->waitq
);
1492 * Document-method: ConditionVariable::new
1494 * Creates a new condition variable instance.
1498 rb_condvar_initialize(VALUE self
)
1500 struct rb_condvar
*cv
= condvar_ptr(self
);
1501 ccan_list_head_init(&cv
->waitq
);
1513 do_sleep(VALUE args
)
1515 struct sleep_call
*p
= (struct sleep_call
*)args
;
1516 return rb_funcallv(p
->mutex
, id_sleep
, 1, &p
->timeout
);
1520 * Document-method: Thread::ConditionVariable#wait
1521 * call-seq: wait(mutex, timeout=nil)
1523 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
1525 * If +timeout+ is given, this method returns after +timeout+ seconds passed,
1526 * even if no other thread doesn't signal.
1528 * Returns the slept result on +mutex+.
1532 rb_condvar_wait(int argc
, VALUE
*argv
, VALUE self
)
1534 rb_execution_context_t
*ec
= GET_EC();
1536 struct rb_condvar
*cv
= condvar_ptr(self
);
1537 struct sleep_call args
;
1539 rb_scan_args(argc
, argv
, "11", &args
.mutex
, &args
.timeout
);
1541 struct sync_waiter sync_waiter
= {
1543 .th
= ec
->thread_ptr
,
1544 .fiber
= nonblocking_fiber(ec
->fiber_ptr
)
1547 ccan_list_add_tail(&cv
->waitq
, &sync_waiter
.node
);
1548 return rb_ensure(do_sleep
, (VALUE
)&args
, delete_from_waitq
, (VALUE
)&sync_waiter
);
1552 * Document-method: Thread::ConditionVariable#signal
1554 * Wakes up the first thread in line waiting for this lock.
1558 rb_condvar_signal(VALUE self
)
1560 struct rb_condvar
*cv
= condvar_ptr(self
);
1561 wakeup_one(&cv
->waitq
);
1566 * Document-method: Thread::ConditionVariable#broadcast
1568 * Wakes up all threads waiting for this lock.
1572 rb_condvar_broadcast(VALUE self
)
1574 struct rb_condvar
*cv
= condvar_ptr(self
);
1575 wakeup_all(&cv
->waitq
);
1579 NORETURN(static VALUE
undumpable(VALUE obj
));
1582 undumpable(VALUE obj
)
1584 rb_raise(rb_eTypeError
, "can't dump %"PRIsVALUE
, rb_obj_class(obj
));
1585 UNREACHABLE_RETURN(Qnil
);
1589 define_thread_class(VALUE outer
, const ID name
, VALUE super
)
1591 VALUE klass
= rb_define_class_id_under(outer
, name
, super
);
1592 rb_const_set(rb_cObject
, name
, klass
);
1597 Init_thread_sync(void)
1600 #if defined(TEACH_RDOC) && TEACH_RDOC == 42
1601 rb_cMutex
= rb_define_class_under(rb_cThread
, "Mutex", rb_cObject
);
1602 rb_cConditionVariable
= rb_define_class_under(rb_cThread
, "ConditionVariable", rb_cObject
);
1603 rb_cQueue
= rb_define_class_under(rb_cThread
, "Queue", rb_cObject
);
1604 rb_cSizedQueue
= rb_define_class_under(rb_cThread
, "SizedQueue", rb_cObject
);
1607 #define DEFINE_CLASS(name, super) \
1608 rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super)
1611 DEFINE_CLASS(Mutex
, Object
);
1612 rb_define_alloc_func(rb_cMutex
, mutex_alloc
);
1613 rb_define_method(rb_cMutex
, "initialize", mutex_initialize
, 0);
1614 rb_define_method(rb_cMutex
, "locked?", rb_mutex_locked_p
, 0);
1615 rb_define_method(rb_cMutex
, "try_lock", rb_mutex_trylock
, 0);
1616 rb_define_method(rb_cMutex
, "lock", rb_mutex_lock
, 0);
1617 rb_define_method(rb_cMutex
, "unlock", rb_mutex_unlock
, 0);
1618 rb_define_method(rb_cMutex
, "sleep", mutex_sleep
, -1);
1619 rb_define_method(rb_cMutex
, "synchronize", rb_mutex_synchronize_m
, 0);
1620 rb_define_method(rb_cMutex
, "owned?", rb_mutex_owned_p
, 0);
1623 DEFINE_CLASS(Queue
, Object
);
1624 rb_define_alloc_func(rb_cQueue
, queue_alloc
);
1626 rb_eClosedQueueError
= rb_define_class("ClosedQueueError", rb_eStopIteration
);
1628 rb_define_method(rb_cQueue
, "initialize", rb_queue_initialize
, -1);
1629 rb_undef_method(rb_cQueue
, "initialize_copy");
1630 rb_define_method(rb_cQueue
, "marshal_dump", undumpable
, 0);
1631 rb_define_method(rb_cQueue
, "close", rb_queue_close
, 0);
1632 rb_define_method(rb_cQueue
, "closed?", rb_queue_closed_p
, 0);
1633 rb_define_method(rb_cQueue
, "push", rb_queue_push
, 1);
1634 rb_define_method(rb_cQueue
, "empty?", rb_queue_empty_p
, 0);
1635 rb_define_method(rb_cQueue
, "clear", rb_queue_clear
, 0);
1636 rb_define_method(rb_cQueue
, "length", rb_queue_length
, 0);
1637 rb_define_method(rb_cQueue
, "num_waiting", rb_queue_num_waiting
, 0);
1638 rb_define_method(rb_cQueue
, "freeze", rb_queue_freeze
, 0);
1640 rb_define_alias(rb_cQueue
, "enq", "push");
1641 rb_define_alias(rb_cQueue
, "<<", "push");
1642 rb_define_alias(rb_cQueue
, "size", "length");
1644 DEFINE_CLASS(SizedQueue
, Queue
);
1645 rb_define_alloc_func(rb_cSizedQueue
, szqueue_alloc
);
1647 rb_define_method(rb_cSizedQueue
, "initialize", rb_szqueue_initialize
, 1);
1648 rb_define_method(rb_cSizedQueue
, "close", rb_szqueue_close
, 0);
1649 rb_define_method(rb_cSizedQueue
, "max", rb_szqueue_max_get
, 0);
1650 rb_define_method(rb_cSizedQueue
, "max=", rb_szqueue_max_set
, 1);
1651 rb_define_method(rb_cSizedQueue
, "empty?", rb_szqueue_empty_p
, 0);
1652 rb_define_method(rb_cSizedQueue
, "clear", rb_szqueue_clear
, 0);
1653 rb_define_method(rb_cSizedQueue
, "length", rb_szqueue_length
, 0);
1654 rb_define_method(rb_cSizedQueue
, "num_waiting", rb_szqueue_num_waiting
, 0);
1655 rb_define_alias(rb_cSizedQueue
, "size", "length");
1658 DEFINE_CLASS(ConditionVariable
, Object
);
1659 rb_define_alloc_func(rb_cConditionVariable
, condvar_alloc
);
1661 id_sleep
= rb_intern("sleep");
1663 rb_define_method(rb_cConditionVariable
, "initialize", rb_condvar_initialize
, 0);
1664 rb_undef_method(rb_cConditionVariable
, "initialize_copy");
1665 rb_define_method(rb_cConditionVariable
, "marshal_dump", undumpable
, 0);
1666 rb_define_method(rb_cConditionVariable
, "wait", rb_condvar_wait
, -1);
1667 rb_define_method(rb_cConditionVariable
, "signal", rb_condvar_signal
, 0);
1668 rb_define_method(rb_cConditionVariable
, "broadcast", rb_condvar_broadcast
, 0);
1670 rb_provide("thread.rb");
1673 #include "thread_sync.rbinc"