1 /* included by thread.c */
2 #include "ccan/list/list.h"
3 #include "builtin.h"
5 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
6 static VALUE rb_eClosedQueueError;
8 /* Mutex */
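/*
 * One mutex: `fiber` is the owning fiber (NULL while unlocked), `next_mutex`
 * chains the mutexes currently held by a thread (thread->keeping_mutexes),
 * and `waitq` holds the on-stack sync_waiter entries of blocked waiters.
 */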
9 typedef struct rb_mutex_struct {
10 rb_fiber_t *fiber;
11 struct rb_mutex_struct *next_mutex;
12 struct ccan_list_head waitq; /* protected by GVL */
13 } rb_mutex_t;
15 /* sync_waiter is always on-stack */
16 struct sync_waiter {
17 VALUE self;
18 rb_thread_t *th;
19 rb_fiber_t *fiber;
20 struct ccan_list_node node;
23 static inline rb_fiber_t*
24 nonblocking_fiber(rb_fiber_t *fiber)
26 if (rb_fiberptr_blocking(fiber)) {
27 return NULL;
30 return fiber;
33 struct queue_sleep_arg {
34 VALUE self;
35 VALUE timeout;
36 rb_hrtime_t end;
39 #define MUTEX_ALLOW_TRAP FL_USER1
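/*
 * Wakes up to `max` waiters queued on `head`. Waiters running under a fiber
 * scheduler are resumed via rb_fiber_scheduler_unblock; native waiters are
 * woken via rb_threadptr_interrupt.
 */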
41 static void
42 sync_wakeup(struct ccan_list_head *head, long max)
44 RUBY_DEBUG_LOG("max:%ld", max);
46 struct sync_waiter *cur = 0, *next;
48 ccan_list_for_each_safe(head, cur, next, node) {
49 ccan_list_del_init(&cur->node);
51 if (cur->th->status != THREAD_KILLED) {
52 if (cur->th->scheduler != Qnil && cur->fiber) {
53 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
55 else {
56 RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(cur->th));
57 rb_threadptr_interrupt(cur->th);
58 cur->th->status = THREAD_RUNNABLE;
61 if (--max == 0) return;
66 static void
67 wakeup_one(struct ccan_list_head *head)
69 sync_wakeup(head, 1);
72 static void
73 wakeup_all(struct ccan_list_head *head)
75 sync_wakeup(head, LONG_MAX);
78 #if defined(HAVE_WORKING_FORK)
79 static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
80 static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
81 static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
82 #endif
83 static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber);
86 * Document-class: Thread::Mutex
88 * Thread::Mutex implements a simple semaphore that can be used to
89 * coordinate access to shared data from multiple concurrent threads.
91 * Example:
93 * semaphore = Thread::Mutex.new
95 * a = Thread.new {
96 * semaphore.synchronize {
97 * # access shared resource
98 * }
99 * }
101 * b = Thread.new {
102 * semaphore.synchronize {
103 * # access shared resource
109 #define mutex_mark ((void(*)(void*))0)
111 static size_t
112 rb_mutex_num_waiting(rb_mutex_t *mutex)
114 struct sync_waiter *w = 0;
115 size_t n = 0;
117 ccan_list_for_each(&mutex->waitq, w, node) {
118 n++;
121 return n;
124 rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);
126 static void
127 mutex_free(void *ptr)
129 rb_mutex_t *mutex = ptr;
130 if (mutex->fiber) {
131 /* rb_warn("free locked mutex"); */
132 const char *err = rb_mutex_unlock_th(mutex, rb_fiber_threadptr(mutex->fiber), mutex->fiber);
133 if (err) rb_bug("%s", err);
135 ruby_xfree(ptr);
138 static size_t
139 mutex_memsize(const void *ptr)
141 return sizeof(rb_mutex_t);
144 static const rb_data_type_t mutex_data_type = {
145 "mutex",
146 {mutex_mark, mutex_free, mutex_memsize,},
147 0, 0, RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_FREE_IMMEDIATELY
150 static rb_mutex_t *
151 mutex_ptr(VALUE obj)
153 rb_mutex_t *mutex;
155 TypedData_Get_Struct(obj, rb_mutex_t, &mutex_data_type, mutex);
157 return mutex;
160 VALUE
161 rb_obj_is_mutex(VALUE obj)
163 return RBOOL(rb_typeddata_is_kind_of(obj, &mutex_data_type));
166 static VALUE
167 mutex_alloc(VALUE klass)
169 VALUE obj;
170 rb_mutex_t *mutex;
172 obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
174 ccan_list_head_init(&mutex->waitq);
175 return obj;
179 * call-seq:
180 * Thread::Mutex.new -> mutex
182 * Creates a new Mutex
184 static VALUE
185 mutex_initialize(VALUE self)
187 return self;
190 VALUE
191 rb_mutex_new(void)
193 return mutex_alloc(rb_cMutex);
197 * call-seq:
198 * mutex.locked? -> true or false
200 * Returns +true+ if this lock is currently held by some thread.
202 VALUE
203 rb_mutex_locked_p(VALUE self)
205 rb_mutex_t *mutex = mutex_ptr(self);
207 return RBOOL(mutex->fiber);
210 static void
211 thread_mutex_insert(rb_thread_t *thread, rb_mutex_t *mutex)
213 if (thread->keeping_mutexes) {
214 mutex->next_mutex = thread->keeping_mutexes;
217 thread->keeping_mutexes = mutex;
220 static void
221 thread_mutex_remove(rb_thread_t *thread, rb_mutex_t *mutex)
223 rb_mutex_t **keeping_mutexes = &thread->keeping_mutexes;
225 while (*keeping_mutexes && *keeping_mutexes != mutex) {
226 // Move to the next mutex in the list:
227 keeping_mutexes = &(*keeping_mutexes)->next_mutex;
230 if (*keeping_mutexes) {
231 *keeping_mutexes = mutex->next_mutex;
232 mutex->next_mutex = NULL;
236 static void
237 mutex_locked(rb_thread_t *th, VALUE self)
239 rb_mutex_t *mutex = mutex_ptr(self);
241 thread_mutex_insert(th, mutex);
245 * call-seq:
246 * mutex.try_lock -> true or false
248 * Attempts to obtain the lock and returns immediately. Returns +true+ if the
249 * lock was granted.
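 *
 * A minimal example:
 *
 *    m = Thread::Mutex.new
 *    m.try_lock  #=> true
 *    m.try_lock  #=> false (the lock is already held)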
251 VALUE
252 rb_mutex_trylock(VALUE self)
254 rb_mutex_t *mutex = mutex_ptr(self);
256 if (mutex->fiber == 0) {
257 RUBY_DEBUG_LOG("%p ok", mutex);
259 rb_fiber_t *fiber = GET_EC()->fiber_ptr;
260 rb_thread_t *th = GET_THREAD();
261 mutex->fiber = fiber;
263 mutex_locked(th, self);
264 return Qtrue;
266 else {
267 RUBY_DEBUG_LOG("%p ng", mutex);
268 return Qfalse;
272 static VALUE
273 mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
275 return RBOOL(mutex->fiber == fiber);
278 static VALUE
279 call_rb_fiber_scheduler_block(VALUE mutex)
281 return rb_fiber_scheduler_block(rb_fiber_scheduler_current(), mutex, Qnil);
284 static VALUE
285 delete_from_waitq(VALUE value)
287 struct sync_waiter *sync_waiter = (void *)value;
288 ccan_list_del(&sync_waiter->node);
290 return Qnil;
293 static inline rb_atomic_t threadptr_get_interrupts(rb_thread_t *th);
295 static VALUE
296 do_mutex_lock(VALUE self, int interruptible_p)
298 rb_execution_context_t *ec = GET_EC();
299 rb_thread_t *th = ec->thread_ptr;
300 rb_fiber_t *fiber = ec->fiber_ptr;
301 rb_mutex_t *mutex = mutex_ptr(self);
302 rb_atomic_t saved_ints = 0;
304 /* When running trap handler */
305 if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
306 th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) {
307 rb_raise(rb_eThreadError, "can't be called from trap context");
310 if (rb_mutex_trylock(self) == Qfalse) {
311 if (mutex->fiber == fiber) {
312 rb_raise(rb_eThreadError, "deadlock; recursive locking");
315 while (mutex->fiber != fiber) {
316 VM_ASSERT(mutex->fiber != NULL);
318 VALUE scheduler = rb_fiber_scheduler_current();
319 if (scheduler != Qnil) {
320 struct sync_waiter sync_waiter = {
321 .self = self,
322 .th = th,
323 .fiber = nonblocking_fiber(fiber)
326 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
328 rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)&sync_waiter);
330 if (!mutex->fiber) {
331 mutex->fiber = fiber;
334 else {
335 if (!th->vm->thread_ignore_deadlock && rb_fiber_threadptr(mutex->fiber) == th) {
336 rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread");
339 struct sync_waiter sync_waiter = {
340 .self = self,
341 .th = th,
342 .fiber = nonblocking_fiber(fiber),
345 RUBY_DEBUG_LOG("%p wait", mutex);
347 // Similar to `sleep_forever`, except that
348 // sleep_forever(SLEEP_DEADLOCKABLE) raises an exception.
349 // An ensure clause would be needed, but `rb_ensure` is a bit slow, so the cleanup is done inline:
351 // begin
352 // sleep_forever(th, SLEEP_DEADLOCKABLE);
353 // ensure
354 // ccan_list_del(&sync_waiter.node);
355 // end
356 enum rb_thread_status prev_status = th->status;
357 th->status = THREAD_STOPPED_FOREVER;
358 rb_ractor_sleeper_threads_inc(th->ractor);
359 rb_check_deadlock(th->ractor);
361 th->locking_mutex = self;
363 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
365 native_sleep(th, NULL);
367 ccan_list_del(&sync_waiter.node);
369 // unlocked by another thread while sleeping
370 if (!mutex->fiber) {
371 mutex->fiber = fiber;
374 rb_ractor_sleeper_threads_dec(th->ractor);
375 th->status = prev_status;
376 th->locking_mutex = Qfalse;
379 RUBY_DEBUG_LOG("%p wakeup", mutex);
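        // interruptible_p: temporarily drop the mutex and service pending
        // interrupts (which may raise). Otherwise: stash pending interrupts
        // in saved_ints and restore them after the lock is acquired.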
382 if (interruptible_p) {
383 /* release mutex before checking for interrupts...as interrupt checking
384 * code might call rb_raise() */
385 if (mutex->fiber == fiber) mutex->fiber = 0;
386 RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
387 if (!mutex->fiber) {
388 mutex->fiber = fiber;
391 else {
392 // clear interrupt information
393 if (RUBY_VM_INTERRUPTED(th->ec)) {
394 // reset interrupts
395 if (saved_ints == 0) {
396 saved_ints = threadptr_get_interrupts(th);
398 else {
399 // ignore additional interrupts
400 threadptr_get_interrupts(th);
406 if (saved_ints) th->ec->interrupt_flag = saved_ints;
407 if (mutex->fiber == fiber) mutex_locked(th, self);
410 RUBY_DEBUG_LOG("%p locked", mutex);
412 // assertion
413 if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
415 return self;
418 static VALUE
419 mutex_lock_uninterruptible(VALUE self)
421 return do_mutex_lock(self, 0);
425 * call-seq:
426 * mutex.lock -> self
428 * Attempts to grab the lock and waits if it isn't available.
429 * Raises +ThreadError+ if +mutex+ was locked by the current thread.
431 VALUE
432 rb_mutex_lock(VALUE self)
434 return do_mutex_lock(self, 1);
438 * call-seq:
439 * mutex.owned? -> true or false
441 * Returns +true+ if this lock is currently held by the current thread.
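 *
 * For example:
 *
 *    m = Thread::Mutex.new
 *    m.owned?                        #=> false
 *    m.lock
 *    m.owned?                        #=> true
 *    Thread.new { m.owned? }.value   #=> false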
443 VALUE
444 rb_mutex_owned_p(VALUE self)
446 rb_fiber_t *fiber = GET_EC()->fiber_ptr;
447 rb_mutex_t *mutex = mutex_ptr(self);
449 return mutex_owned_p(fiber, mutex);
452 static const char *
453 rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
455 RUBY_DEBUG_LOG("%p", mutex);
457 if (mutex->fiber == 0) {
458 return "Attempt to unlock a mutex which is not locked";
460 else if (mutex->fiber != fiber) {
461 return "Attempt to unlock a mutex which is locked by another thread/fiber";
464 struct sync_waiter *cur = 0, *next;
466 mutex->fiber = 0;
467 thread_mutex_remove(th, mutex);
469 ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
470 ccan_list_del_init(&cur->node);
472 if (cur->th->scheduler != Qnil && cur->fiber) {
473 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
474 return NULL;
476 else {
477 switch (cur->th->status) {
478 case THREAD_RUNNABLE: /* from someone else calling Thread#run */
479 case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
480 RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th));
481 rb_threadptr_interrupt(cur->th);
482 return NULL;
483 case THREAD_STOPPED: /* probably impossible */
484 rb_bug("unexpected THREAD_STOPPED");
485 case THREAD_KILLED:
486 /* not sure about this, possible in exit GC? */
487 rb_bug("unexpected THREAD_KILLED");
488 continue;
493 // We did not find any threads to wake up, so we can just return with no error:
494 return NULL;
498 * call-seq:
499 * mutex.unlock -> self
501 * Releases the lock.
502 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
504 VALUE
505 rb_mutex_unlock(VALUE self)
507 const char *err;
508 rb_mutex_t *mutex = mutex_ptr(self);
509 rb_thread_t *th = GET_THREAD();
511 err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr);
512 if (err) rb_raise(rb_eThreadError, "%s", err);
514 return self;
517 #if defined(HAVE_WORKING_FORK)
518 static void
519 rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
521 rb_mutex_abandon_all(th->keeping_mutexes);
522 th->keeping_mutexes = NULL;
525 static void
526 rb_mutex_abandon_locking_mutex(rb_thread_t *th)
528 if (th->locking_mutex) {
529 rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
531 ccan_list_head_init(&mutex->waitq);
532 th->locking_mutex = Qfalse;
536 static void
537 rb_mutex_abandon_all(rb_mutex_t *mutexes)
539 rb_mutex_t *mutex;
541 while (mutexes) {
542 mutex = mutexes;
543 mutexes = mutex->next_mutex;
544 mutex->fiber = 0;
545 mutex->next_mutex = 0;
546 ccan_list_head_init(&mutex->waitq);
549 #endif
551 static VALUE
552 rb_mutex_sleep_forever(VALUE self)
554 rb_thread_sleep_deadly_allow_spurious_wakeup(self, Qnil, 0);
555 return Qnil;
558 static VALUE
559 rb_mutex_wait_for(VALUE time)
561 rb_hrtime_t *rel = (rb_hrtime_t *)time;
562 /* permit spurious check */
563 return RBOOL(sleep_hrtime(GET_THREAD(), *rel, 0));
566 VALUE
567 rb_mutex_sleep(VALUE self, VALUE timeout)
569 struct timeval t;
570 VALUE woken = Qtrue;
572 if (!NIL_P(timeout)) {
573 t = rb_time_interval(timeout);
576 rb_mutex_unlock(self);
577 time_t beg = time(0);
579 VALUE scheduler = rb_fiber_scheduler_current();
580 if (scheduler != Qnil) {
581 rb_fiber_scheduler_kernel_sleep(scheduler, timeout);
582 mutex_lock_uninterruptible(self);
584 else {
585 if (NIL_P(timeout)) {
586 rb_ensure(rb_mutex_sleep_forever, self, mutex_lock_uninterruptible, self);
588 else {
589 rb_hrtime_t rel = rb_timeval2hrtime(&t);
590 woken = rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self);
594 RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
595 if (!woken) return Qnil;
596 time_t end = time(0) - beg;
597 return TIMET2NUM(end);
601 * call-seq:
602 * mutex.sleep(timeout = nil) -> number or nil
604 * Releases the lock and sleeps for +timeout+ seconds if it is given and
605 * non-nil, or forever otherwise. Raises +ThreadError+ if +mutex+ wasn't locked by
606 * the current thread.
608 * When the thread is next woken up, it will attempt to reacquire
609 * the lock.
611 * Note that this method can wake up without an explicit Thread#wakeup call;
612 * for example, when the thread receives a signal.
614 * Returns the slept time in seconds if woken up, or +nil+ if timed out.
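 *
 * A minimal usage sketch (the one-second timeout is illustrative):
 *
 *    m = Thread::Mutex.new
 *    m.lock
 *    m.sleep(1)   # releases the lock, sleeps up to 1 second, then re-locks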
616 static VALUE
617 mutex_sleep(int argc, VALUE *argv, VALUE self)
619 VALUE timeout;
621 timeout = rb_check_arity(argc, 0, 1) ? argv[0] : Qnil;
622 return rb_mutex_sleep(self, timeout);
626 * call-seq:
627 * mutex.synchronize { ... } -> result of the block
629 * Obtains a lock, runs the block, and releases the lock when the block
630 * completes. See the example under Thread::Mutex.
633 VALUE
634 rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
636 rb_mutex_lock(mutex);
637 return rb_ensure(func, arg, rb_mutex_unlock, mutex);
641 * call-seq:
642 * mutex.synchronize { ... } -> result of the block
644 * Obtains a lock, runs the block, and releases the lock when the block
645 * completes. See the example under Thread::Mutex.
647 static VALUE
648 rb_mutex_synchronize_m(VALUE self)
650 if (!rb_block_given_p()) {
651 rb_raise(rb_eThreadError, "must be called with a block");
654 return rb_mutex_synchronize(self, rb_yield, Qundef);
657 void
658 rb_mutex_allow_trap(VALUE self, int val)
660 Check_TypedStruct(self, &mutex_data_type);
662 if (val)
663 FL_SET_RAW(self, MUTEX_ALLOW_TRAP);
664 else
665 FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP);
668 /* Queue */
670 #define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
671 #define queue_list(q) UNALIGNED_MEMBER_PTR(q, que)
672 RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
673 struct rb_queue {
674 struct ccan_list_head waitq;
675 rb_serial_t fork_gen;
676 const VALUE que;
677 int num_waiting;
678 } RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
680 #define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
681 #define szqueue_list(sq) UNALIGNED_MEMBER_PTR(sq, q.que)
682 #define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
683 RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
684 struct rb_szqueue {
685 struct rb_queue q;
686 int num_waiting_push;
687 struct ccan_list_head pushq;
688 long max;
689 } RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
691 static void
692 queue_mark(void *ptr)
694 struct rb_queue *q = ptr;
696 /* no need to mark threads in waitq, they are on stack */
697 rb_gc_mark(q->que);
700 static size_t
701 queue_memsize(const void *ptr)
703 return sizeof(struct rb_queue);
706 static const rb_data_type_t queue_data_type = {
707 "queue",
708 {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
709 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
712 static VALUE
713 queue_alloc(VALUE klass)
715 VALUE obj;
716 struct rb_queue *q;
718 obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
719 ccan_list_head_init(queue_waitq(q));
720 return obj;
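/*
 * After fork, waiters recorded by the parent are unreachable (they lived on
 * the parent threads' stacks), so the child resets the wait queue. Returns 1
 * when such a reset happened, 0 otherwise.
 */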
723 static int
724 queue_fork_check(struct rb_queue *q)
726 rb_serial_t fork_gen = GET_VM()->fork_gen;
728 if (q->fork_gen == fork_gen) {
729 return 0;
731 /* forked children can't reach into parent thread stacks */
732 q->fork_gen = fork_gen;
733 ccan_list_head_init(queue_waitq(q));
734 q->num_waiting = 0;
735 return 1;
738 static struct rb_queue *
739 queue_ptr(VALUE obj)
741 struct rb_queue *q;
743 TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
744 queue_fork_check(q);
746 return q;
749 #define QUEUE_CLOSED FL_USER5
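/*
 * Converts a timeout in seconds (nil means "no timeout") into an absolute
 * rb_hrtime_t deadline; returns 0 when there is no deadline.
 */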
751 static rb_hrtime_t
752 queue_timeout2hrtime(VALUE timeout)
754 if (NIL_P(timeout)) {
755 return (rb_hrtime_t)0;
757 rb_hrtime_t rel = 0;
758 if (FIXNUM_P(timeout)) {
759 rel = rb_sec2hrtime(NUM2TIMET(timeout));
761 else {
762 double2hrtime(&rel, rb_num2dbl(timeout));
764 return rb_hrtime_add(rel, rb_hrtime_now());
767 static void
768 szqueue_mark(void *ptr)
770 struct rb_szqueue *sq = ptr;
772 queue_mark(&sq->q);
775 static size_t
776 szqueue_memsize(const void *ptr)
778 return sizeof(struct rb_szqueue);
781 static const rb_data_type_t szqueue_data_type = {
782 "sized_queue",
783 {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
784 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
787 static VALUE
788 szqueue_alloc(VALUE klass)
790 struct rb_szqueue *sq;
791 VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
792 &szqueue_data_type, sq);
793 ccan_list_head_init(szqueue_waitq(sq));
794 ccan_list_head_init(szqueue_pushq(sq));
795 return obj;
798 static struct rb_szqueue *
799 szqueue_ptr(VALUE obj)
801 struct rb_szqueue *sq;
803 TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
804 if (queue_fork_check(&sq->q)) {
805 ccan_list_head_init(szqueue_pushq(sq));
806 sq->num_waiting_push = 0;
809 return sq;
812 static VALUE
813 ary_buf_new(void)
815 return rb_ary_hidden_new(1);
818 static VALUE
819 check_array(VALUE obj, VALUE ary)
821 if (!RB_TYPE_P(ary, T_ARRAY)) {
822 rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
824 return ary;
827 static long
828 queue_length(VALUE self, struct rb_queue *q)
830 return RARRAY_LEN(check_array(self, q->que));
833 static int
834 queue_closed_p(VALUE self)
836 return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
840 * Document-class: ClosedQueueError
842 * The exception class which will be raised when pushing into a closed
843 * Queue. See Thread::Queue#close and Thread::SizedQueue#close.
846 NORETURN(static void raise_closed_queue_error(VALUE self));
848 static void
849 raise_closed_queue_error(VALUE self)
851 rb_raise(rb_eClosedQueueError, "queue closed");
854 static VALUE
855 queue_closed_result(VALUE self, struct rb_queue *q)
857 RUBY_ASSERT(queue_length(self, q) == 0);
858 return Qnil;
862 * Document-class: Thread::Queue
864 * The Thread::Queue class implements multi-producer, multi-consumer
865 * queues. It is especially useful in threaded programming when
866 * information must be exchanged safely between multiple threads. The
867 * Thread::Queue class implements all the required locking semantics.
869 * The class implements FIFO (first in, first out) type of queue.
870 * In a FIFO queue, the first tasks added are the first retrieved.
872 * Example:
874 * queue = Thread::Queue.new
876 * producer = Thread.new do
877 * 5.times do |i|
878 * sleep rand(i) # simulate expense
879 * queue << i
880 * puts "#{i} produced"
881 * end
882 * end
884 * consumer = Thread.new do
885 * 5.times do |i|
886 * value = queue.pop
887 * sleep rand(i/2) # simulate expense
888 * puts "consumed #{value}"
889 * end
890 * end
892 * consumer.join
897 * Document-method: Queue::new
899 * call-seq:
900 * Thread::Queue.new -> empty_queue
901 * Thread::Queue.new(enumerable) -> queue
903 * Creates a new queue instance, optionally using the contents of an +enumerable+
904 * for its initial state.
906 * Example:
908 * q = Thread::Queue.new
909 * #=> #<Thread::Queue:0x00007ff7501110d0>
910 * q.empty?
911 * #=> true
913 * q = Thread::Queue.new([1, 2, 3])
914 * #=> #<Thread::Queue:0x00007ff7500ec500>
915 * q.empty?
916 * #=> false
917 * q.pop
918 * #=> 1
921 static VALUE
922 rb_queue_initialize(int argc, VALUE *argv, VALUE self)
924 VALUE initial;
925 struct rb_queue *q = queue_ptr(self);
926 if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
927 initial = rb_to_array(initial);
929 RB_OBJ_WRITE(self, queue_list(q), ary_buf_new());
930 ccan_list_head_init(queue_waitq(q));
931 if (argc == 1) {
932 rb_ary_concat(q->que, initial);
934 return self;
937 static VALUE
938 queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
940 if (queue_closed_p(self)) {
941 raise_closed_queue_error(self);
943 rb_ary_push(check_array(self, q->que), obj);
944 wakeup_one(queue_waitq(q));
945 return self;
949 * Document-method: Thread::Queue#close
950 * call-seq:
951 * close
953 * Closes the queue. A closed queue cannot be re-opened.
955 * After the call to close completes, the following are true:
957 * - +closed?+ will return true
959 * - +close+ will be ignored.
961 * - calling enq/push/<< will raise a +ClosedQueueError+.
963 * - when +empty?+ is false, calling deq/pop/shift will return an object
964 * from the queue as usual.
965 * - when +empty?+ is true, deq(false) will not suspend the thread and will return nil.
966 * deq(true) will raise a +ThreadError+.
968 * ClosedQueueError inherits from StopIteration, so you can use it to break out of a loop block.
970 * Example:
972 * q = Thread::Queue.new
973 * Thread.new{
974 * while e = q.deq # wait for nil to break loop
975 * # ...
976 * end
978 * q.close
981 static VALUE
982 rb_queue_close(VALUE self)
984 struct rb_queue *q = queue_ptr(self);
986 if (!queue_closed_p(self)) {
987 FL_SET(self, QUEUE_CLOSED);
989 wakeup_all(queue_waitq(q));
992 return self;
996 * Document-method: Thread::Queue#closed?
997 * call-seq: closed?
999 * Returns +true+ if the queue is closed.
1002 static VALUE
1003 rb_queue_closed_p(VALUE self)
1005 return RBOOL(queue_closed_p(self));
1009 * Document-method: Thread::Queue#push
1010 * call-seq:
1011 * push(object)
1012 * enq(object)
1013 * <<(object)
1015 * Pushes the given +object+ to the queue.
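 *
 * For example:
 *
 *    q = Thread::Queue.new
 *    q.push(1)
 *    q << 2      # alias of push
 *    q.enq(3)    # alias of push
 *    q.length    #=> 3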
1018 static VALUE
1019 rb_queue_push(VALUE self, VALUE obj)
1021 return queue_do_push(self, queue_ptr(self), obj);
1024 static VALUE
1025 queue_sleep(VALUE _args)
1027 struct queue_sleep_arg *args = (struct queue_sleep_arg *)_args;
1028 rb_thread_sleep_deadly_allow_spurious_wakeup(args->self, args->timeout, args->end);
1029 return Qnil;
1032 struct queue_waiter {
1033 struct sync_waiter w;
1034 union {
1035 struct rb_queue *q;
1036 struct rb_szqueue *sq;
1037 } as;
1040 static VALUE
1041 queue_sleep_done(VALUE p)
1043 struct queue_waiter *qw = (struct queue_waiter *)p;
1045 ccan_list_del(&qw->w.node);
1046 qw->as.q->num_waiting--;
1048 return Qfalse;
1051 static VALUE
1052 szqueue_sleep_done(VALUE p)
1054 struct queue_waiter *qw = (struct queue_waiter *)p;
1056 ccan_list_del(&qw->w.node);
1057 qw->as.sq->num_waiting_push--;
1059 return Qfalse;
1062 static VALUE
1063 queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout)
1065 check_array(self, q->que);
1066 if (RARRAY_LEN(q->que) == 0) {
1067 if (!should_block) {
1068 rb_raise(rb_eThreadError, "queue empty");
1071 if (RTEST(rb_equal(INT2FIX(0), timeout))) {
1072 return Qnil;
1076 rb_hrtime_t end = queue_timeout2hrtime(timeout);
1077 while (RARRAY_LEN(q->que) == 0) {
1078 if (queue_closed_p(self)) {
1079 return queue_closed_result(self, q);
1081 else {
1082 rb_execution_context_t *ec = GET_EC();
1084 RUBY_ASSERT(RARRAY_LEN(q->que) == 0);
1085 RUBY_ASSERT(queue_closed_p(self) == 0);
1087 struct queue_waiter queue_waiter = {
1088 .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
1089 .as = {.q = q}
1092 struct ccan_list_head *waitq = queue_waitq(q);
1094 ccan_list_add_tail(waitq, &queue_waiter.w.node);
1095 queue_waiter.as.q->num_waiting++;
1097 struct queue_sleep_arg queue_sleep_arg = {
1098 .self = self,
1099 .timeout = timeout,
1100 .end = end
1103 rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, queue_sleep_done, (VALUE)&queue_waiter);
1104 if (!NIL_P(timeout) && (rb_hrtime_now() >= end))
1105 break;
1109 return rb_ary_shift(q->que);
1112 static VALUE
1113 rb_queue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
1115 return queue_do_pop(self, queue_ptr(self), !RTEST(non_block), timeout);
1119 * Document-method: Thread::Queue#empty?
1120 * call-seq: empty?
1122 * Returns +true+ if the queue is empty.
1125 static VALUE
1126 rb_queue_empty_p(VALUE self)
1128 return RBOOL(queue_length(self, queue_ptr(self)) == 0);
1132 * Document-method: Thread::Queue#clear
1134 * Removes all objects from the queue.
1137 static VALUE
1138 rb_queue_clear(VALUE self)
1140 struct rb_queue *q = queue_ptr(self);
1142 rb_ary_clear(check_array(self, q->que));
1143 return self;
1147 * Document-method: Thread::Queue#length
1148 * call-seq:
1149 * length
1150 * size
1152 * Returns the length of the queue.
1155 static VALUE
1156 rb_queue_length(VALUE self)
1158 return LONG2NUM(queue_length(self, queue_ptr(self)));
1161 NORETURN(static VALUE rb_queue_freeze(VALUE self));
1163 * call-seq:
1164 * freeze
1166 * The queue can't be frozen, so this method raises an exception:
1167 * Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)
1170 static VALUE
1171 rb_queue_freeze(VALUE self)
1173 rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self);
1174 UNREACHABLE_RETURN(self);
1178 * Document-method: Thread::Queue#num_waiting
1180 * Returns the number of threads waiting on the queue.
1183 static VALUE
1184 rb_queue_num_waiting(VALUE self)
1186 struct rb_queue *q = queue_ptr(self);
1188 return INT2NUM(q->num_waiting);
1192 * Document-class: Thread::SizedQueue
1194 * This class represents queues with a specified maximum capacity. A push
1195 * operation blocks while the queue is full.
1197 * See Thread::Queue for an example of how a Thread::SizedQueue works.
1201 * Document-method: SizedQueue::new
1202 * call-seq: new(max)
1204 * Creates a fixed-length queue with a maximum size of +max+.
1207 static VALUE
1208 rb_szqueue_initialize(VALUE self, VALUE vmax)
1210 long max;
1211 struct rb_szqueue *sq = szqueue_ptr(self);
1213 max = NUM2LONG(vmax);
1214 if (max <= 0) {
1215 rb_raise(rb_eArgError, "queue size must be positive");
1218 RB_OBJ_WRITE(self, szqueue_list(sq), ary_buf_new());
1219 ccan_list_head_init(szqueue_waitq(sq));
1220 ccan_list_head_init(szqueue_pushq(sq));
1221 sq->max = max;
1223 return self;
1227 * Document-method: Thread::SizedQueue#close
1228 * call-seq:
1229 * close
1231 * Similar to Thread::Queue#close.
1233 * The difference is behavior with waiting enqueuing threads.
1235 * If there are waiting enqueuing threads, they are interrupted by
1236 * raising ClosedQueueError('queue closed').
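 *
 * A small sketch of that behavior (names are illustrative):
 *
 *    q = Thread::SizedQueue.new(1)
 *    q.push(:a)
 *    t = Thread.new { q.push(:b) rescue $! }  # blocks: the queue is full
 *    q.close
 *    t.value.class  #=> ClosedQueueError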
1238 static VALUE
1239 rb_szqueue_close(VALUE self)
1241 if (!queue_closed_p(self)) {
1242 struct rb_szqueue *sq = szqueue_ptr(self);
1244 FL_SET(self, QUEUE_CLOSED);
1245 wakeup_all(szqueue_waitq(sq));
1246 wakeup_all(szqueue_pushq(sq));
1248 return self;
1252 * Document-method: Thread::SizedQueue#max
1254 * Returns the maximum size of the queue.
1257 static VALUE
1258 rb_szqueue_max_get(VALUE self)
1260 return LONG2NUM(szqueue_ptr(self)->max);
1264 * Document-method: Thread::SizedQueue#max=
1265 * call-seq: max=(number)
1267 * Sets the maximum size of the queue to the given +number+.
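 *
 * Raising the maximum wakes up threads blocked on push. A small sketch
 * (names are illustrative):
 *
 *    q = Thread::SizedQueue.new(1)
 *    q.push(:a)
 *    t = Thread.new { q.push(:b) }  # blocks: the queue is full
 *    q.max = 2                      # makes room and wakes the blocked pusher
 *    t.join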
1270 static VALUE
1271 rb_szqueue_max_set(VALUE self, VALUE vmax)
1273 long max = NUM2LONG(vmax);
1274 long diff = 0;
1275 struct rb_szqueue *sq = szqueue_ptr(self);
1277 if (max <= 0) {
1278 rb_raise(rb_eArgError, "queue size must be positive");
1280 if (max > sq->max) {
1281 diff = max - sq->max;
1283 sq->max = max;
1284 sync_wakeup(szqueue_pushq(sq), diff);
1285 return vmax;
1288 static VALUE
1289 rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_block, VALUE timeout)
1291 struct rb_szqueue *sq = szqueue_ptr(self);
1293 if (queue_length(self, &sq->q) >= sq->max) {
1294 if (RTEST(non_block)) {
1295 rb_raise(rb_eThreadError, "queue full");
1298 if (RTEST(rb_equal(INT2FIX(0), timeout))) {
1299 return Qnil;
1303 rb_hrtime_t end = queue_timeout2hrtime(timeout);
1304 while (queue_length(self, &sq->q) >= sq->max) {
1305 if (queue_closed_p(self)) {
1306 raise_closed_queue_error(self);
1308 else {
1309 rb_execution_context_t *ec = GET_EC();
1310 struct queue_waiter queue_waiter = {
1311 .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
1312 .as = {.sq = sq}
1315 struct ccan_list_head *pushq = szqueue_pushq(sq);
1317 ccan_list_add_tail(pushq, &queue_waiter.w.node);
1318 sq->num_waiting_push++;
1320 struct queue_sleep_arg queue_sleep_arg = {
1321 .self = self,
1322 .timeout = timeout,
1323 .end = end
1325 rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, szqueue_sleep_done, (VALUE)&queue_waiter);
1326 if (!NIL_P(timeout) && rb_hrtime_now() >= end) {
1327 return Qnil;
1332 return queue_do_push(self, &sq->q, object);
1335 static VALUE
1336 szqueue_do_pop(VALUE self, int should_block, VALUE timeout)
1338 struct rb_szqueue *sq = szqueue_ptr(self);
1339 VALUE retval = queue_do_pop(self, &sq->q, should_block, timeout);
1341 if (queue_length(self, &sq->q) < sq->max) {
1342 wakeup_one(szqueue_pushq(sq));
1345 return retval;
1347 static VALUE
1348 rb_szqueue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
1350 return szqueue_do_pop(self, !RTEST(non_block), timeout);
1354 * Document-method: Thread::SizedQueue#clear
1356 * Removes all objects from the queue.
1359 static VALUE
1360 rb_szqueue_clear(VALUE self)
1362 struct rb_szqueue *sq = szqueue_ptr(self);
1364 rb_ary_clear(check_array(self, sq->q.que));
1365 wakeup_all(szqueue_pushq(sq));
1366 return self;
1370 * Document-method: Thread::SizedQueue#length
1371 * call-seq:
1372 * length
1373 * size
1375 * Returns the length of the queue.
1378 static VALUE
1379 rb_szqueue_length(VALUE self)
1381 struct rb_szqueue *sq = szqueue_ptr(self);
1383 return LONG2NUM(queue_length(self, &sq->q));
1387 * Document-method: Thread::SizedQueue#num_waiting
1389 * Returns the number of threads waiting on the queue.
1392 static VALUE
1393 rb_szqueue_num_waiting(VALUE self)
1395 struct rb_szqueue *sq = szqueue_ptr(self);
1397 return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
1401 * Document-method: Thread::SizedQueue#empty?
1402 * call-seq: empty?
1404 * Returns +true+ if the queue is empty.
1407 static VALUE
1408 rb_szqueue_empty_p(VALUE self)
1410 struct rb_szqueue *sq = szqueue_ptr(self);
1412 return RBOOL(queue_length(self, &sq->q) == 0);
1416 /* ConditionVariable */
1417 struct rb_condvar {
1418 struct ccan_list_head waitq;
1419 rb_serial_t fork_gen;
1423 * Document-class: Thread::ConditionVariable
1425 * ConditionVariable objects augment class Mutex. Using condition variables,
1426 * it is possible to suspend while in the middle of a critical section until a
1427 * resource becomes available.
1429 * Example:
1431 * mutex = Thread::Mutex.new
1432 * resource = Thread::ConditionVariable.new
1434 * a = Thread.new {
1435 * mutex.synchronize {
1436 * # Thread 'a' now needs the resource
1437 * resource.wait(mutex)
1438 * # 'a' can now have the resource
1442 * b = Thread.new {
1443 * mutex.synchronize {
1444 * # Thread 'b' has finished using the resource
1445 * resource.signal
1450 static size_t
1451 condvar_memsize(const void *ptr)
1453 return sizeof(struct rb_condvar);
1456 static const rb_data_type_t cv_data_type = {
1457 "condvar",
1458 {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
1459 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
1462 static struct rb_condvar *
1463 condvar_ptr(VALUE self)
1465 struct rb_condvar *cv;
1466 rb_serial_t fork_gen = GET_VM()->fork_gen;
1468 TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);
1470 /* forked children can't reach into parent thread stacks */
1471 if (cv->fork_gen != fork_gen) {
1472 cv->fork_gen = fork_gen;
1473 ccan_list_head_init(&cv->waitq);
1476 return cv;
1479 static VALUE
1480 condvar_alloc(VALUE klass)
1482 struct rb_condvar *cv;
1483 VALUE obj;
1485 obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
1486 ccan_list_head_init(&cv->waitq);
1488 return obj;
1492 * Document-method: ConditionVariable::new
1494 * Creates a new condition variable instance.
1497 static VALUE
1498 rb_condvar_initialize(VALUE self)
1500 struct rb_condvar *cv = condvar_ptr(self);
1501 ccan_list_head_init(&cv->waitq);
1502 return self;
1505 struct sleep_call {
1506 VALUE mutex;
1507 VALUE timeout;
1510 static ID id_sleep;
1512 static VALUE
1513 do_sleep(VALUE args)
1515 struct sleep_call *p = (struct sleep_call *)args;
1516 return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
1520 * Document-method: Thread::ConditionVariable#wait
1521 * call-seq: wait(mutex, timeout=nil)
1523 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
1525 * If +timeout+ is given, this method returns after +timeout+ seconds have passed,
1526 * even if no other thread has signaled.
1528 * Returns the result of the underlying Mutex#sleep call.
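 *
 * Because the wait can also wake up spuriously, the condition is usually
 * re-checked in a loop (a sketch; +mutex+, +cv+ and +items+ are illustrative):
 *
 *    mutex.synchronize do
 *      cv.wait(mutex) while items.empty?
 *      item = items.shift
 *    end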
1531 static VALUE
1532 rb_condvar_wait(int argc, VALUE *argv, VALUE self)
1534 rb_execution_context_t *ec = GET_EC();
1536 struct rb_condvar *cv = condvar_ptr(self);
1537 struct sleep_call args;
1539 rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);
1541 struct sync_waiter sync_waiter = {
1542 .self = args.mutex,
1543 .th = ec->thread_ptr,
1544 .fiber = nonblocking_fiber(ec->fiber_ptr)
1547 ccan_list_add_tail(&cv->waitq, &sync_waiter.node);
1548 return rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&sync_waiter);
1552 * Document-method: Thread::ConditionVariable#signal
1554 * Wakes up the first thread in line waiting on this condition variable.
1557 static VALUE
1558 rb_condvar_signal(VALUE self)
1560 struct rb_condvar *cv = condvar_ptr(self);
1561 wakeup_one(&cv->waitq);
1562 return self;
1566 * Document-method: Thread::ConditionVariable#broadcast
1568 * Wakes up all threads waiting on this condition variable.
1571 static VALUE
1572 rb_condvar_broadcast(VALUE self)
1574 struct rb_condvar *cv = condvar_ptr(self);
1575 wakeup_all(&cv->waitq);
1576 return self;
1579 NORETURN(static VALUE undumpable(VALUE obj));
1580 /* :nodoc: */
1581 static VALUE
1582 undumpable(VALUE obj)
1584 rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
1585 UNREACHABLE_RETURN(Qnil);
1588 static VALUE
1589 define_thread_class(VALUE outer, const ID name, VALUE super)
1591 VALUE klass = rb_define_class_id_under(outer, name, super);
1592 rb_const_set(rb_cObject, name, klass);
1593 return klass;
1596 static void
1597 Init_thread_sync(void)
1599 #undef rb_intern
1600 #if defined(TEACH_RDOC) && TEACH_RDOC == 42
1601 rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject);
1602 rb_cConditionVariable = rb_define_class_under(rb_cThread, "ConditionVariable", rb_cObject);
1603 rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
1604 rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cObject);
1605 #endif
1607 #define DEFINE_CLASS(name, super) \
1608 rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super)
1610 /* Mutex */
1611 DEFINE_CLASS(Mutex, Object);
1612 rb_define_alloc_func(rb_cMutex, mutex_alloc);
1613 rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
1614 rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
1615 rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
1616 rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
1617 rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
1618 rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
1619 rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
1620 rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);
1622 /* Queue */
1623 DEFINE_CLASS(Queue, Object);
1624 rb_define_alloc_func(rb_cQueue, queue_alloc);
1626 rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
1628 rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, -1);
1629 rb_undef_method(rb_cQueue, "initialize_copy");
1630 rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
1631 rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
1632 rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
1633 rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
1634 rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
1635 rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
1636 rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
1637 rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
1638 rb_define_method(rb_cQueue, "freeze", rb_queue_freeze, 0);
1640 rb_define_alias(rb_cQueue, "enq", "push");
1641 rb_define_alias(rb_cQueue, "<<", "push");
1642 rb_define_alias(rb_cQueue, "size", "length");
1644 DEFINE_CLASS(SizedQueue, Queue);
1645 rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);
1647 rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
1648 rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
1649 rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
1650 rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
1651 rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
1652 rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
1653 rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
1654 rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
1655 rb_define_alias(rb_cSizedQueue, "size", "length");
1657 /* CVar */
1658 DEFINE_CLASS(ConditionVariable, Object);
1659 rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);
1661 id_sleep = rb_intern("sleep");
1663 rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
1664 rb_undef_method(rb_cConditionVariable, "initialize_copy");
1665 rb_define_method(rb_cConditionVariable, "marshal_dump", undumpable, 0);
1666 rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
1667 rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
1668 rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
1670 rb_provide("thread.rb");
1673 #include "thread_sync.rbinc"