/* included by thread.c */
#include "ccan/list/list.h"

static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
static VALUE rb_eClosedQueueError;

/* Mutex */
typedef struct rb_mutex_struct {
    rb_fiber_t *fiber;
    struct rb_mutex_struct *next_mutex;
    struct list_head waitq; /* protected by GVL */
} rb_mutex_t;

/* sync_waiter is always on-stack */
struct sync_waiter {
    VALUE self;
    rb_thread_t *th;
    rb_fiber_t *fiber;
    struct list_node node;
};

#define MUTEX_ALLOW_TRAP FL_USER1

static void
sync_wakeup(struct list_head *head, long max)
{
    struct sync_waiter *cur = 0, *next;

    list_for_each_safe(head, cur, next, node) {
        list_del_init(&cur->node);

        if (cur->th->status != THREAD_KILLED) {

            if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
                rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
            }
            else {
                rb_threadptr_interrupt(cur->th);
                cur->th->status = THREAD_RUNNABLE;
            }

            if (--max == 0) return;
        }
    }
}

static void
wakeup_one(struct list_head *head)
{
    sync_wakeup(head, 1);
}

static void
wakeup_all(struct list_head *head)
{
    sync_wakeup(head, LONG_MAX);
}

#if defined(HAVE_WORKING_FORK)
static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
#endif
static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber);
/*
 *  Document-class: Thread::Mutex
 *
 *  Thread::Mutex implements a simple semaphore that can be used to
 *  coordinate access to shared data from multiple concurrent threads.
 *
 *  Example:
 *
 *    semaphore = Thread::Mutex.new
 *
 *    a = Thread.new {
 *      semaphore.synchronize {
 *        # access shared resource
 *      }
 *    }
 *
 *    b = Thread.new {
 *      semaphore.synchronize {
 *        # access shared resource
 *      }
 *    }
 *
 */
#define mutex_mark ((void(*)(void*))0)

static size_t
rb_mutex_num_waiting(rb_mutex_t *mutex)
{
    struct sync_waiter *w = 0;
    size_t n = 0;

    list_for_each(&mutex->waitq, w, node) {
        n++;
    }

    return n;
}

rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);

static void
mutex_free(void *ptr)
{
    rb_mutex_t *mutex = ptr;
    if (mutex->fiber) {
        /* rb_warn("free locked mutex"); */
        const char *err = rb_mutex_unlock_th(mutex, rb_fiber_threadptr(mutex->fiber), mutex->fiber);
        if (err) rb_bug("%s", err);
    }
    ruby_xfree(ptr);
}

static size_t
mutex_memsize(const void *ptr)
{
    return sizeof(rb_mutex_t);
}

static const rb_data_type_t mutex_data_type = {
    "mutex",
    {mutex_mark, mutex_free, mutex_memsize,},
    0, 0, RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_FREE_IMMEDIATELY
};

static rb_mutex_t *
mutex_ptr(VALUE obj)
{
    rb_mutex_t *mutex;

    TypedData_Get_Struct(obj, rb_mutex_t, &mutex_data_type, mutex);

    return mutex;
}

VALUE
rb_obj_is_mutex(VALUE obj)
{
    return RBOOL(rb_typeddata_is_kind_of(obj, &mutex_data_type));
}

static VALUE
mutex_alloc(VALUE klass)
{
    VALUE obj;
    rb_mutex_t *mutex;

    obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);

    list_head_init(&mutex->waitq);
    return obj;
}
/*
 *  call-seq:
 *    Thread::Mutex.new   -> mutex
 *
 *  Creates a new Mutex.
 */
static VALUE
mutex_initialize(VALUE self)
{
    return self;
}

VALUE
rb_mutex_new(void)
{
    return mutex_alloc(rb_cMutex);
}
/*
 * call-seq:
 *    mutex.locked?  -> true or false
 *
 * Returns +true+ if this lock is currently held by some thread.
 */
VALUE
rb_mutex_locked_p(VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    return RBOOL(mutex->fiber);
}

static void
thread_mutex_insert(rb_thread_t *thread, rb_mutex_t *mutex)
{
    if (thread->keeping_mutexes) {
        mutex->next_mutex = thread->keeping_mutexes;
    }

    thread->keeping_mutexes = mutex;
}

static void
thread_mutex_remove(rb_thread_t *thread, rb_mutex_t *mutex)
{
    rb_mutex_t **keeping_mutexes = &thread->keeping_mutexes;

    while (*keeping_mutexes && *keeping_mutexes != mutex) {
        // Move to the next mutex in the list:
        keeping_mutexes = &(*keeping_mutexes)->next_mutex;
    }

    if (*keeping_mutexes) {
        *keeping_mutexes = mutex->next_mutex;
        mutex->next_mutex = NULL;
    }
}

static void
mutex_locked(rb_thread_t *th, VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    thread_mutex_insert(th, mutex);
}
/*
 * call-seq:
 *    mutex.try_lock  -> true or false
 *
 * Attempts to obtain the lock and returns immediately. Returns +true+ if the
 * lock was granted.
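 *
 * For example:
 *
 *    m = Thread::Mutex.new
 *    m.try_lock  #=> true (lock acquired)
 *    m.try_lock  #=> false (already held; does not block)
 */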
VALUE
rb_mutex_trylock(VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    if (mutex->fiber == 0) {
        rb_fiber_t *fiber = GET_EC()->fiber_ptr;
        rb_thread_t *th = GET_THREAD();
        mutex->fiber = fiber;

        mutex_locked(th, self);
        return Qtrue;
    }

    return Qfalse;
}
/*
 * At maximum, only one thread can use cond_timedwait and watch deadlock
 * periodically. Multiple polling threads (i.e. concurrent deadlock checks)
 * would introduce new race conditions. [Bug #6278] [ruby-core:44275]
 */
static const rb_thread_t *patrol_thread = NULL;

static VALUE
mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
{
    return RBOOL(mutex->fiber == fiber);
}

static VALUE
call_rb_fiber_scheduler_block(VALUE mutex)
{
    return rb_fiber_scheduler_block(rb_fiber_scheduler_current(), mutex, Qnil);
}

static VALUE
delete_from_waitq(VALUE value)
{
    struct sync_waiter *sync_waiter = (void *)value;
    list_del(&sync_waiter->node);

    return Qnil;
}
static VALUE
do_mutex_lock(VALUE self, int interruptible_p)
{
    rb_execution_context_t *ec = GET_EC();
    rb_thread_t *th = ec->thread_ptr;
    rb_fiber_t *fiber = ec->fiber_ptr;
    rb_mutex_t *mutex = mutex_ptr(self);

    /* When running trap handler */
    if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
        th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) {
        rb_raise(rb_eThreadError, "can't be called from trap context");
    }

    if (rb_mutex_trylock(self) == Qfalse) {
        if (mutex->fiber == fiber) {
            rb_raise(rb_eThreadError, "deadlock; recursive locking");
        }

        while (mutex->fiber != fiber) {
            VALUE scheduler = rb_fiber_scheduler_current();
            if (scheduler != Qnil) {
                struct sync_waiter sync_waiter = {
                    .self = self,
                    .th = th,
                    .fiber = fiber
                };

                list_add_tail(&mutex->waitq, &sync_waiter.node);

                rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)&sync_waiter);

                if (!mutex->fiber) {
                    mutex->fiber = fiber;
                }
            }
            else {
                enum rb_thread_status prev_status = th->status;
                rb_hrtime_t *timeout = 0;
                rb_hrtime_t rel = rb_msec2hrtime(100);

                th->status = THREAD_STOPPED_FOREVER;
                th->locking_mutex = self;
                rb_ractor_sleeper_threads_inc(th->ractor);
                /*
                 * Carefully! while some contended threads are in native_sleep(),
                 * ractor->sleeper is an unstable value. We have to avoid both
                 * deadlock and busy loop.
                 */
                if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) &&
                    !patrol_thread) {
                    timeout = &rel;
                    patrol_thread = th;
                }

                struct sync_waiter sync_waiter = {
                    .self = self,
                    .th = th,
                    .fiber = fiber
                };

                list_add_tail(&mutex->waitq, &sync_waiter.node);

                native_sleep(th, timeout); /* release GVL */

                list_del(&sync_waiter.node);

                if (!mutex->fiber) {
                    mutex->fiber = fiber;
                }

                if (patrol_thread == th)
                    patrol_thread = NULL;

                th->locking_mutex = Qfalse;
                if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
                    rb_check_deadlock(th->ractor);
                }
                if (th->status == THREAD_STOPPED_FOREVER) {
                    th->status = prev_status;
                }
                rb_ractor_sleeper_threads_dec(th->ractor);
            }

            if (interruptible_p) {
                /* release mutex before checking for interrupts...as interrupt checking
                 * code might call rb_raise() */
                if (mutex->fiber == fiber) mutex->fiber = 0;
                RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
                if (!mutex->fiber) {
                    mutex->fiber = fiber;
                }
            }
        }

        if (mutex->fiber == fiber) mutex_locked(th, self);
    }

    // assertion
    if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");

    return self;
}
static VALUE
mutex_lock_uninterruptible(VALUE self)
{
    return do_mutex_lock(self, 0);
}
/*
 * call-seq:
 *    mutex.lock  -> self
 *
 * Attempts to grab the lock and waits if it isn't available.
 * Raises +ThreadError+ if +mutex+ was locked by the current thread.
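 *
 * For example:
 *
 *    m = Thread::Mutex.new
 *    m.lock  #=> m
 *    m.lock  # raises ThreadError: deadlock; recursive locking
 */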
VALUE
rb_mutex_lock(VALUE self)
{
    return do_mutex_lock(self, 1);
}
/*
 * call-seq:
 *    mutex.owned?  -> true or false
 *
 * Returns +true+ if this lock is currently held by the current thread.
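 *
 * For example:
 *
 *    m = Thread::Mutex.new
 *    m.owned?                    #=> false
 *    m.synchronize { m.owned? }  #=> true inside the lock
 */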
VALUE
rb_mutex_owned_p(VALUE self)
{
    rb_fiber_t *fiber = GET_EC()->fiber_ptr;
    rb_mutex_t *mutex = mutex_ptr(self);

    return mutex_owned_p(fiber, mutex);
}
static const char *
rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
{
    const char *err = NULL;

    if (mutex->fiber == 0) {
        err = "Attempt to unlock a mutex which is not locked";
    }
    else if (mutex->fiber != fiber) {
        err = "Attempt to unlock a mutex which is locked by another thread/fiber";
    }
    else {
        struct sync_waiter *cur = 0, *next;

        mutex->fiber = 0;
        list_for_each_safe(&mutex->waitq, cur, next, node) {
            list_del_init(&cur->node);

            if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
                rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
                goto found;
            }
            else {
                switch (cur->th->status) {
                  case THREAD_RUNNABLE: /* from someone else calling Thread#run */
                  case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
                    rb_threadptr_interrupt(cur->th);
                    goto found;
                  case THREAD_STOPPED: /* probably impossible */
                    rb_bug("unexpected THREAD_STOPPED");
                  case THREAD_KILLED:
                    /* not sure about this, possible in exit GC? */
                    rb_bug("unexpected THREAD_KILLED");
                    continue;
                }
            }
        }

      found:
        thread_mutex_remove(th, mutex);
    }

    return err;
}
/*
 * call-seq:
 *    mutex.unlock  -> self
 *
 * Releases the lock.
 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
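 *
 * For example:
 *
 *    m = Thread::Mutex.new
 *    m.lock
 *    m.unlock  #=> m (lock released)
 *    m.unlock  # raises ThreadError (mutex is not locked)
 */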
VALUE
rb_mutex_unlock(VALUE self)
{
    const char *err;
    rb_mutex_t *mutex = mutex_ptr(self);
    rb_thread_t *th = GET_THREAD();

    err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr);
    if (err) rb_raise(rb_eThreadError, "%s", err);

    return self;
}
#if defined(HAVE_WORKING_FORK)
static void
rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
{
    rb_mutex_abandon_all(th->keeping_mutexes);
    th->keeping_mutexes = NULL;
}

static void
rb_mutex_abandon_locking_mutex(rb_thread_t *th)
{
    if (th->locking_mutex) {
        rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);

        list_head_init(&mutex->waitq);
        th->locking_mutex = Qfalse;
    }
}

static void
rb_mutex_abandon_all(rb_mutex_t *mutexes)
{
    rb_mutex_t *mutex;

    while (mutexes) {
        mutex = mutexes;
        mutexes = mutex->next_mutex;
        mutex->fiber = 0;
        mutex->next_mutex = 0;
        list_head_init(&mutex->waitq);
    }
}
#endif

static VALUE
rb_mutex_sleep_forever(VALUE self)
{
    rb_thread_sleep_deadly_allow_spurious_wakeup(self);
    return Qnil;
}

static VALUE
rb_mutex_wait_for(VALUE time)
{
    rb_hrtime_t *rel = (rb_hrtime_t *)time;
    /* permit spurious check */
    return RBOOL(sleep_hrtime(GET_THREAD(), *rel, 0));
}
VALUE
rb_mutex_sleep(VALUE self, VALUE timeout)
{
    struct timeval t;
    VALUE woken = Qtrue;

    if (!NIL_P(timeout)) {
        t = rb_time_interval(timeout);
    }

    rb_mutex_unlock(self);
    time_t beg = time(0);

    VALUE scheduler = rb_fiber_scheduler_current();
    if (scheduler != Qnil) {
        rb_fiber_scheduler_kernel_sleep(scheduler, timeout);
        mutex_lock_uninterruptible(self);
    }
    else {
        if (NIL_P(timeout)) {
            rb_ensure(rb_mutex_sleep_forever, self, mutex_lock_uninterruptible, self);
        }
        else {
            rb_hrtime_t rel = rb_timeval2hrtime(&t);
            woken = rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self);
        }
    }

    RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
    if (!woken) return Qnil;
    time_t end = time(0) - beg;
    return TIMET2NUM(end);
}
/*
 * call-seq:
 *    mutex.sleep(timeout = nil)  -> number or nil
 *
 * Releases the lock and sleeps +timeout+ seconds if it is given and
 * non-nil, or forever otherwise. Raises +ThreadError+ if +mutex+ wasn't
 * locked by the current thread.
 *
 * When the thread is next woken up, it will attempt to reacquire
 * the lock.
 *
 * Note that this method can wake up without an explicit Thread#wakeup
 * call; for example, on receiving a signal.
 *
 * Returns the slept time in seconds if woken up, or +nil+ if timed out.
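 *
 * For example:
 *
 *    m = Thread::Mutex.new
 *    m.lock
 *    m.sleep(0.1)  #=> nil (timed out; the lock is reacquired before returning)
 */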
static VALUE
mutex_sleep(int argc, VALUE *argv, VALUE self)
{
    VALUE timeout;

    timeout = rb_check_arity(argc, 0, 1) ? argv[0] : Qnil;
    return rb_mutex_sleep(self, timeout);
}
/*
 * call-seq:
 *    mutex.synchronize { ... }  -> result of the block
 *
 * Obtains a lock, runs the block, and releases the lock when the block
 * completes. See the example under Thread::Mutex.
 */
VALUE
rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
{
    rb_mutex_lock(mutex);
    return rb_ensure(func, arg, rb_mutex_unlock, mutex);
}

/*
 * call-seq:
 *    mutex.synchronize { ... }  -> result of the block
 *
 * Obtains a lock, runs the block, and releases the lock when the block
 * completes. See the example under Thread::Mutex.
 */
static VALUE
rb_mutex_synchronize_m(VALUE self)
{
    if (!rb_block_given_p()) {
        rb_raise(rb_eThreadError, "must be called with a block");
    }

    return rb_mutex_synchronize(self, rb_yield, Qundef);
}

void rb_mutex_allow_trap(VALUE self, int val)
{
    Check_TypedStruct(self, &mutex_data_type);

    if (val)
        FL_SET_RAW(self, MUTEX_ALLOW_TRAP);
    else
        FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP);
}
/* Queue */

#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
PACKED_STRUCT_UNALIGNED(struct rb_queue {
    struct list_head waitq;
    rb_serial_t fork_gen;
    const VALUE que;
    int num_waiting;
});

#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
    struct rb_queue q;
    int num_waiting_push;
    struct list_head pushq;
    long max;
});

static void
queue_mark(void *ptr)
{
    struct rb_queue *q = ptr;

    /* no need to mark threads in waitq, they are on stack */
    rb_gc_mark(q->que);
}

static size_t
queue_memsize(const void *ptr)
{
    return sizeof(struct rb_queue);
}

static const rb_data_type_t queue_data_type = {
    "queue",
    {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

static VALUE
queue_alloc(VALUE klass)
{
    VALUE obj;
    struct rb_queue *q;

    obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
    list_head_init(queue_waitq(q));
    return obj;
}

static int
queue_fork_check(struct rb_queue *q)
{
    rb_serial_t fork_gen = GET_VM()->fork_gen;

    if (q->fork_gen == fork_gen) {
        return 0;
    }
    /* forked children can't reach into parent thread stacks */
    q->fork_gen = fork_gen;
    list_head_init(queue_waitq(q));
    q->num_waiting = 0;
    return 1;
}

static struct rb_queue *
queue_ptr(VALUE obj)
{
    struct rb_queue *q;

    TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
    queue_fork_check(q);

    return q;
}

#define QUEUE_CLOSED FL_USER5
static void
szqueue_mark(void *ptr)
{
    struct rb_szqueue *sq = ptr;

    queue_mark(&sq->q);
}

static size_t
szqueue_memsize(const void *ptr)
{
    return sizeof(struct rb_szqueue);
}

static const rb_data_type_t szqueue_data_type = {
    "sized_queue",
    {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

static VALUE
szqueue_alloc(VALUE klass)
{
    struct rb_szqueue *sq;
    VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
                                      &szqueue_data_type, sq);
    list_head_init(szqueue_waitq(sq));
    list_head_init(szqueue_pushq(sq));
    return obj;
}

static struct rb_szqueue *
szqueue_ptr(VALUE obj)
{
    struct rb_szqueue *sq;

    TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
    if (queue_fork_check(&sq->q)) {
        list_head_init(szqueue_pushq(sq));
        sq->num_waiting_push = 0;
    }

    return sq;
}

static VALUE
ary_buf_new(void)
{
    return rb_ary_tmp_new(1);
}

static VALUE
check_array(VALUE obj, VALUE ary)
{
    if (!RB_TYPE_P(ary, T_ARRAY)) {
        rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
    }
    return ary;
}

static long
queue_length(VALUE self, struct rb_queue *q)
{
    return RARRAY_LEN(check_array(self, q->que));
}

static int
queue_closed_p(VALUE self)
{
    return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
}
/*
 *  Document-class: ClosedQueueError
 *
 *  The exception class raised when pushing into a closed Queue.
 *  See Thread::Queue#close and Thread::SizedQueue#close.
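 *
 *  Because ClosedQueueError inherits from StopIteration, Kernel#loop
 *  terminates cleanly when it is raised:
 *
 *    q = Thread::Queue.new
 *    q.close
 *    loop do
 *      q.push(1)  # raises ClosedQueueError, which ends the loop
 *    end
 */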
NORETURN(static void raise_closed_queue_error(VALUE self));

static void
raise_closed_queue_error(VALUE self)
{
    rb_raise(rb_eClosedQueueError, "queue closed");
}

static VALUE
queue_closed_result(VALUE self, struct rb_queue *q)
{
    assert(queue_length(self, q) == 0);
    return Qnil;
}
/*
 *  Document-class: Thread::Queue
 *
 *  The Thread::Queue class implements multi-producer, multi-consumer
 *  queues. It is especially useful in threaded programming when
 *  information must be exchanged safely between multiple threads. The
 *  Thread::Queue class implements all the required locking semantics.
 *
 *  The class implements a FIFO (first in, first out) type of queue.
 *  In a FIFO queue, the first tasks added are the first retrieved.
 *
 *  Example:
 *
 *    queue = Thread::Queue.new
 *
 *    producer = Thread.new do
 *      5.times do |i|
 *        sleep rand(i) # simulate expense
 *        queue << i
 *        puts "#{i} produced"
 *      end
 *    end
 *
 *    consumer = Thread.new do
 *      5.times do |i|
 *        value = queue.pop
 *        sleep rand(i/2) # simulate expense
 *        puts "consumed #{value}"
 *      end
 *    end
 *
 *    consumer.join
 *
 */
/*
 * Document-method: Queue::new
 *
 * call-seq:
 *    Thread::Queue.new -> empty_queue
 *    Thread::Queue.new(enumerable) -> queue
 *
 * Creates a new queue instance, optionally using the contents of an
 * +enumerable+ for its initial state.
 *
 * Example:
 *
 *    q = Thread::Queue.new
 *    #=> #<Thread::Queue:0x00007ff7501110d0>
 *    q.empty?
 *    #=> true
 *
 *    q = Thread::Queue.new([1, 2, 3])
 *    #=> #<Thread::Queue:0x00007ff7500ec500>
 *    q.empty?
 *    #=> false
 *    q.pop
 *    #=> 1
 */
static VALUE
rb_queue_initialize(int argc, VALUE *argv, VALUE self)
{
    VALUE initial;
    struct rb_queue *q = queue_ptr(self);
    if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
        initial = rb_to_array(initial);
    }
    RB_OBJ_WRITE(self, &q->que, ary_buf_new());
    list_head_init(queue_waitq(q));
    if (argc == 1) {
        rb_ary_concat(q->que, initial);
    }
    return self;
}
static VALUE
queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
{
    if (queue_closed_p(self)) {
        raise_closed_queue_error(self);
    }
    rb_ary_push(check_array(self, q->que), obj);
    wakeup_one(queue_waitq(q));
    return self;
}
/*
 * Document-method: Thread::Queue#close
 * call-seq:
 *   close
 *
 * Closes the queue. A closed queue cannot be re-opened.
 *
 * After the call to close completes, the following are true:
 *
 * - +closed?+ will return true.
 *
 * - +close+ will be ignored.
 *
 * - calling enq/push/<< will raise a +ClosedQueueError+.
 *
 * - when +empty?+ is false, calling deq/pop/shift will return an object
 *   from the queue as usual.
 * - when +empty?+ is true, deq(false) will not suspend the thread and will
 *   return nil. deq(true) will raise a +ThreadError+.
 *
 * ClosedQueueError inherits from StopIteration, so you can break out of a
 * loop block.
 *
 * Example:
 *
 *    q = Thread::Queue.new
 *    Thread.new{
 *      while e = q.deq # wait for nil to break loop
 *        # ...
 *      end
 *    }
 *    q.close
 */
static VALUE
rb_queue_close(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    if (!queue_closed_p(self)) {
        FL_SET(self, QUEUE_CLOSED);

        wakeup_all(queue_waitq(q));
    }

    return self;
}
/*
 * Document-method: Thread::Queue#closed?
 * call-seq: closed?
 *
 * Returns +true+ if the queue is closed.
 */
static VALUE
rb_queue_closed_p(VALUE self)
{
    return RBOOL(queue_closed_p(self));
}

/*
 * Document-method: Thread::Queue#push
 * call-seq:
 *   push(object)
 *   enq(object)
 *   <<(object)
 *
 * Pushes the given +object+ to the queue.
 */
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
    return queue_do_push(self, queue_ptr(self), obj);
}
static VALUE
queue_sleep(VALUE self)
{
    rb_thread_sleep_deadly_allow_spurious_wakeup(self);
    return Qnil;
}

struct queue_waiter {
    struct sync_waiter w;
    union {
        struct rb_queue *q;
        struct rb_szqueue *sq;
    } as;
};

static VALUE
queue_sleep_done(VALUE p)
{
    struct queue_waiter *qw = (struct queue_waiter *)p;

    list_del(&qw->w.node);
    qw->as.q->num_waiting--;

    return Qfalse;
}

static VALUE
szqueue_sleep_done(VALUE p)
{
    struct queue_waiter *qw = (struct queue_waiter *)p;

    list_del(&qw->w.node);
    qw->as.sq->num_waiting_push--;

    return Qfalse;
}

static VALUE
queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
{
    check_array(self, q->que);

    while (RARRAY_LEN(q->que) == 0) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue empty");
        }
        else if (queue_closed_p(self)) {
            return queue_closed_result(self, q);
        }
        else {
            rb_execution_context_t *ec = GET_EC();

            assert(RARRAY_LEN(q->que) == 0);
            assert(queue_closed_p(self) == 0);

            struct queue_waiter queue_waiter = {
                .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
                .as = {.q = q}
            };

            struct list_head *waitq = queue_waitq(q);

            list_add_tail(waitq, &queue_waiter.w.node);
            queue_waiter.as.q->num_waiting++;

            rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&queue_waiter);
        }
    }

    return rb_ary_shift(q->que);
}

static int
queue_pop_should_block(int argc, const VALUE *argv)
{
    int should_block = 1;
    rb_check_arity(argc, 0, 1);
    if (argc > 0) {
        should_block = !RTEST(argv[0]);
    }
    return should_block;
}
/*
 * Document-method: Thread::Queue#pop
 * call-seq:
 *   pop(non_block=false)
 *   deq(non_block=false)
 *   shift(non_block=false)
 *
 * Retrieves data from the queue.
 *
 * If the queue is empty, the calling thread is suspended until data is pushed
 * onto the queue. If +non_block+ is true, the thread isn't suspended, and
 * +ThreadError+ is raised.
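 *
 * For example:
 *
 *    q = Thread::Queue.new
 *    q.push(1)
 *    q.pop        #=> 1
 *    q.pop(true)  # queue is empty, so ThreadError is raised instead of blocking
 */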
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
    int should_block = queue_pop_should_block(argc, argv);
    return queue_do_pop(self, queue_ptr(self), should_block);
}

/*
 * Document-method: Thread::Queue#empty?
 * call-seq: empty?
 *
 * Returns +true+ if the queue is empty.
 */
static VALUE
rb_queue_empty_p(VALUE self)
{
    return RBOOL(queue_length(self, queue_ptr(self)) == 0);
}

/*
 * Document-method: Thread::Queue#clear
 *
 * Removes all objects from the queue.
 */
static VALUE
rb_queue_clear(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    rb_ary_clear(check_array(self, q->que));
    return self;
}

/*
 * Document-method: Thread::Queue#length
 * call-seq:
 *   length
 *   size
 *
 * Returns the length of the queue.
 */
static VALUE
rb_queue_length(VALUE self)
{
    return LONG2NUM(queue_length(self, queue_ptr(self)));
}

/*
 * Document-method: Thread::Queue#num_waiting
 *
 * Returns the number of threads waiting on the queue.
 */
static VALUE
rb_queue_num_waiting(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    return INT2NUM(q->num_waiting);
}
/*
 *  Document-class: Thread::SizedQueue
 *
 *  This class represents queues of specified size capacity. The push
 *  operation may block if the queue is full.
 *
 *  See Thread::Queue for an example of how a Thread::SizedQueue works.
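 *
 *  A minimal sketch of the blocking push, assuming a capacity of 2:
 *
 *    q = Thread::SizedQueue.new(2)
 *    producer = Thread.new do
 *      3.times { |i| q << i }  # the third push blocks until a pop makes room
 *    end
 *    q.pop          #=> 0, after which the producer can finish
 *    producer.join
 */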
/*
 * Document-method: SizedQueue::new
 * call-seq: new(max)
 *
 * Creates a fixed-length queue with a maximum size of +max+.
 */
static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
    long max;
    struct rb_szqueue *sq = szqueue_ptr(self);

    max = NUM2LONG(vmax);
    if (max <= 0) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }

    RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new());
    list_head_init(szqueue_waitq(sq));
    list_head_init(szqueue_pushq(sq));
    sq->max = max;

    return self;
}
/*
 * Document-method: Thread::SizedQueue#close
 * call-seq:
 *   close
 *
 * Similar to Thread::Queue#close.
 *
 * The difference is the behavior with waiting enqueuing threads: if there
 * are threads waiting to enqueue, they are interrupted by raising
 * ClosedQueueError('queue closed').
 */
static VALUE
rb_szqueue_close(VALUE self)
{
    if (!queue_closed_p(self)) {
        struct rb_szqueue *sq = szqueue_ptr(self);

        FL_SET(self, QUEUE_CLOSED);
        wakeup_all(szqueue_waitq(sq));
        wakeup_all(szqueue_pushq(sq));
    }
    return self;
}
/*
 * Document-method: Thread::SizedQueue#max
 *
 * Returns the maximum size of the queue.
 */
static VALUE
rb_szqueue_max_get(VALUE self)
{
    return LONG2NUM(szqueue_ptr(self)->max);
}

/*
 * Document-method: Thread::SizedQueue#max=
 * call-seq: max=(number)
 *
 * Sets the maximum size of the queue to the given +number+.
 */
static VALUE
rb_szqueue_max_set(VALUE self, VALUE vmax)
{
    long max = NUM2LONG(vmax);
    long diff = 0;
    struct rb_szqueue *sq = szqueue_ptr(self);

    if (max <= 0) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }
    if (max > sq->max) {
        diff = max - sq->max;
    }
    sq->max = max;
    sync_wakeup(szqueue_pushq(sq), diff);
    return vmax;
}

static int
szqueue_push_should_block(int argc, const VALUE *argv)
{
    int should_block = 1;
    rb_check_arity(argc, 1, 2);
    if (argc > 1) {
        should_block = !RTEST(argv[1]);
    }
    return should_block;
}
/*
 * Document-method: Thread::SizedQueue#push
 * call-seq:
 *   push(object, non_block=false)
 *   enq(object, non_block=false)
 *   <<(object)
 *
 * Pushes +object+ to the queue.
 *
 * If there is no space left in the queue, waits until space becomes
 * available, unless +non_block+ is true. If +non_block+ is true, the
 * thread isn't suspended, and +ThreadError+ is raised.
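 *
 * For example:
 *
 *    q = Thread::SizedQueue.new(1)
 *    q.push(:a)
 *    q.push(:b, true)  # queue is full, so ThreadError is raised instead of blocking
 */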
static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);
    int should_block = szqueue_push_should_block(argc, argv);

    while (queue_length(self, &sq->q) >= sq->max) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue full");
        }
        else if (queue_closed_p(self)) {
            break;
        }
        else {
            rb_execution_context_t *ec = GET_EC();
            struct queue_waiter queue_waiter = {
                .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
                .as = {.sq = sq}
            };

            struct list_head *pushq = szqueue_pushq(sq);

            list_add_tail(pushq, &queue_waiter.w.node);
            sq->num_waiting_push++;

            rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&queue_waiter);
        }
    }

    if (queue_closed_p(self)) {
        raise_closed_queue_error(self);
    }

    return queue_do_push(self, &sq->q, argv[0]);
}

static VALUE
szqueue_do_pop(VALUE self, int should_block)
{
    struct rb_szqueue *sq = szqueue_ptr(self);
    VALUE retval = queue_do_pop(self, &sq->q, should_block);

    if (queue_length(self, &sq->q) < sq->max) {
        wakeup_one(szqueue_pushq(sq));
    }

    return retval;
}
/*
 * Document-method: Thread::SizedQueue#pop
 * call-seq:
 *   pop(non_block=false)
 *   deq(non_block=false)
 *   shift(non_block=false)
 *
 * Retrieves data from the queue.
 *
 * If the queue is empty, the calling thread is suspended until data is pushed
 * onto the queue. If +non_block+ is true, the thread isn't suspended, and
 * +ThreadError+ is raised.
 */
static VALUE
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
{
    int should_block = queue_pop_should_block(argc, argv);
    return szqueue_do_pop(self, should_block);
}

/*
 * Document-method: Thread::SizedQueue#clear
 *
 * Removes all objects from the queue.
 */
static VALUE
rb_szqueue_clear(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    rb_ary_clear(check_array(self, sq->q.que));
    wakeup_all(szqueue_pushq(sq));
    return self;
}

/*
 * Document-method: Thread::SizedQueue#length
 * call-seq:
 *   length
 *   size
 *
 * Returns the length of the queue.
 */
static VALUE
rb_szqueue_length(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return LONG2NUM(queue_length(self, &sq->q));
}

/*
 * Document-method: Thread::SizedQueue#num_waiting
 *
 * Returns the number of threads waiting on the queue.
 */
static VALUE
rb_szqueue_num_waiting(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
}

/*
 * Document-method: Thread::SizedQueue#empty?
 * call-seq: empty?
 *
 * Returns +true+ if the queue is empty.
 */
static VALUE
rb_szqueue_empty_p(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return RBOOL(queue_length(self, &sq->q) == 0);
}
/* ConditionVariable */
struct rb_condvar {
    struct list_head waitq;
    rb_serial_t fork_gen;
};
/*
 *  Document-class: Thread::ConditionVariable
 *
 *  ConditionVariable objects augment class Mutex. Using condition variables,
 *  it is possible to suspend while in the middle of a critical section until a
 *  resource becomes available.
 *
 *  Example:
 *
 *    mutex = Thread::Mutex.new
 *    resource = Thread::ConditionVariable.new
 *
 *    a = Thread.new {
 *      mutex.synchronize {
 *        # Thread 'a' now needs the resource
 *        resource.wait(mutex)
 *        # 'a' can now have the resource
 *      }
 *    }
 *
 *    b = Thread.new {
 *      mutex.synchronize {
 *        # Thread 'b' has finished using the resource
 *        resource.signal
 *      }
 *    }
 */
static size_t
condvar_memsize(const void *ptr)
{
    return sizeof(struct rb_condvar);
}

static const rb_data_type_t cv_data_type = {
    "condvar",
    {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

static struct rb_condvar *
condvar_ptr(VALUE self)
{
    struct rb_condvar *cv;
    rb_serial_t fork_gen = GET_VM()->fork_gen;

    TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);

    /* forked children can't reach into parent thread stacks */
    if (cv->fork_gen != fork_gen) {
        cv->fork_gen = fork_gen;
        list_head_init(&cv->waitq);
    }

    return cv;
}

static VALUE
condvar_alloc(VALUE klass)
{
    struct rb_condvar *cv;
    VALUE obj;

    obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
    list_head_init(&cv->waitq);

    return obj;
}

/*
 * Document-method: ConditionVariable::new
 *
 * Creates a new condition variable instance.
 */
static VALUE
rb_condvar_initialize(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    list_head_init(&cv->waitq);
    return self;
}
struct sleep_call {
    VALUE mutex;
    VALUE timeout;
};

static ID id_sleep;

static VALUE
do_sleep(VALUE args)
{
    struct sleep_call *p = (struct sleep_call *)args;
    return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
}
/*
 * Document-method: Thread::ConditionVariable#wait
 * call-seq: wait(mutex, timeout=nil)
 *
 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
 *
 * If +timeout+ is given, this method returns after +timeout+ seconds have
 * passed, even if no other thread has signaled.
 *
 * Returns the result of sleeping on +mutex+ (see Mutex#sleep): the slept
 * time in seconds when woken, or +nil+ if the wait timed out.
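 *
 * For example:
 *
 *    mutex = Thread::Mutex.new
 *    cond = Thread::ConditionVariable.new
 *
 *    mutex.synchronize {
 *      cond.wait(mutex, 1)  #=> nil after about a second if nobody signals
 *    }
 */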
static VALUE
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
{
    rb_execution_context_t *ec = GET_EC();

    struct rb_condvar *cv = condvar_ptr(self);
    struct sleep_call args;

    rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);

    struct sync_waiter sync_waiter = {
        .self = args.mutex,
        .th = ec->thread_ptr,
        .fiber = ec->fiber_ptr
    };

    list_add_tail(&cv->waitq, &sync_waiter.node);
    return rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&sync_waiter);
}
/*
 * Document-method: Thread::ConditionVariable#signal
 *
 * Wakes up the first thread in line waiting for this lock.
 */
static VALUE
rb_condvar_signal(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    wakeup_one(&cv->waitq);
    return self;
}

/*
 * Document-method: Thread::ConditionVariable#broadcast
 *
 * Wakes up all threads waiting for this lock.
 */
static VALUE
rb_condvar_broadcast(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    wakeup_all(&cv->waitq);
    return self;
}

NORETURN(static VALUE undumpable(VALUE obj));
/* :nodoc: */
static VALUE
undumpable(VALUE obj)
{
    rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
    UNREACHABLE_RETURN(Qnil);
}

static VALUE
define_thread_class(VALUE outer, const ID name, VALUE super)
{
    VALUE klass = rb_define_class_id_under(outer, name, super);
    rb_const_set(rb_cObject, name, klass);
    return klass;
}
static void
Init_thread_sync(void)
{
#undef rb_intern
#if defined(TEACH_RDOC) && TEACH_RDOC == 42
    rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject);
    rb_cConditionVariable = rb_define_class_under(rb_cThread, "ConditionVariable", rb_cObject);
    rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
    rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cObject);
#endif

#define DEFINE_CLASS(name, super) \
    rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super)

    /* Mutex */
    DEFINE_CLASS(Mutex, Object);
    rb_define_alloc_func(rb_cMutex, mutex_alloc);
    rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
    rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
    rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
    rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
    rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
    rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
    rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
    rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);

    /* Queue */
    DEFINE_CLASS(Queue, Object);
    rb_define_alloc_func(rb_cQueue, queue_alloc);

    rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);

    rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, -1);
    rb_undef_method(rb_cQueue, "initialize_copy");
    rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
    rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
    rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
    rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);

    rb_define_alias(rb_cQueue, "enq", "push");
    rb_define_alias(rb_cQueue, "<<", "push");
    rb_define_alias(rb_cQueue, "deq", "pop");
    rb_define_alias(rb_cQueue, "shift", "pop");
    rb_define_alias(rb_cQueue, "size", "length");

    DEFINE_CLASS(SizedQueue, Queue);
    rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);

    rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
    rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
    rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
    rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
    rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
    rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
    rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
    rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
    rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);

    rb_define_alias(rb_cSizedQueue, "enq", "push");
    rb_define_alias(rb_cSizedQueue, "<<", "push");
    rb_define_alias(rb_cSizedQueue, "deq", "pop");
    rb_define_alias(rb_cSizedQueue, "shift", "pop");
    rb_define_alias(rb_cSizedQueue, "size", "length");

    /* CVar */
    DEFINE_CLASS(ConditionVariable, Object);
    rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);

    id_sleep = rb_intern("sleep");

    rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
    rb_undef_method(rb_cConditionVariable, "initialize_copy");
    rb_define_method(rb_cConditionVariable, "marshal_dump", undumpable, 0);
    rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
    rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
    rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);

    rb_provide("thread.rb");
}