1 // Ractor implementation
4 #include "ruby/thread.h"
5 #include "ruby/ractor.h"
6 #include "ruby/thread_native.h"
8 #include "eval_intern.h"
10 #include "ractor_core.h"
11 #include "internal/complex.h"
12 #include "internal/error.h"
13 #include "internal/gc.h"
14 #include "internal/hash.h"
15 #include "internal/rational.h"
16 #include "internal/struct.h"
17 #include "internal/thread.h"
23 static VALUE rb_cRactorSelector
;
25 VALUE rb_eRactorUnsafeError
;
26 VALUE rb_eRactorIsolationError
;
27 static VALUE rb_eRactorError
;
28 static VALUE rb_eRactorRemoteError
;
29 static VALUE rb_eRactorMovedError
;
30 static VALUE rb_eRactorClosedError
;
31 static VALUE rb_cRactorMovedObject
;
33 static void vm_ractor_blocking_cnt_inc(rb_vm_t
*vm
, rb_ractor_t
*r
, const char *file
, int line
);
38 ASSERT_ractor_unlocking(rb_ractor_t
*r
)
40 #if RACTOR_CHECK_MODE > 0
41 // GET_EC is NULL in an RJIT worker
42 if (rb_current_execution_context(false) != NULL
&& r
->sync
.locked_by
== rb_ractor_self(GET_RACTOR())) {
43 rb_bug("recursive ractor locking");
49 ASSERT_ractor_locking(rb_ractor_t
*r
)
51 #if RACTOR_CHECK_MODE > 0
52 // GET_EC is NULL in an RJIT worker
53 if (rb_current_execution_context(false) != NULL
&& r
->sync
.locked_by
!= rb_ractor_self(GET_RACTOR())) {
54 rp(r
->sync
.locked_by
);
55 rb_bug("ractor lock is not acquired.");
61 ractor_lock(rb_ractor_t
*r
, const char *file
, int line
)
63 RUBY_DEBUG_LOG2(file
, line
, "locking r:%u%s", r
->pub
.id
, rb_current_ractor_raw(false) == r
? " (self)" : "");
65 ASSERT_ractor_unlocking(r
);
66 rb_native_mutex_lock(&r
->sync
.lock
);
68 #if RACTOR_CHECK_MODE > 0
69 if (rb_current_execution_context(false) != NULL
) { // GET_EC is NULL in an RJIT worker
70 rb_ractor_t
*cr
= rb_current_ractor_raw(false);
71 r
->sync
.locked_by
= cr
? rb_ractor_self(cr
) : Qundef
;
75 RUBY_DEBUG_LOG2(file
, line
, "locked r:%u%s", r
->pub
.id
, rb_current_ractor_raw(false) == r
? " (self)" : "");
79 ractor_lock_self(rb_ractor_t
*cr
, const char *file
, int line
)
81 VM_ASSERT(cr
== GET_RACTOR());
82 #if RACTOR_CHECK_MODE > 0
83 VM_ASSERT(cr
->sync
.locked_by
!= cr
->pub
.self
);
85 ractor_lock(cr
, file
, line
);
89 ractor_unlock(rb_ractor_t
*r
, const char *file
, int line
)
91 ASSERT_ractor_locking(r
);
92 #if RACTOR_CHECK_MODE > 0
93 r
->sync
.locked_by
= Qnil
;
95 rb_native_mutex_unlock(&r
->sync
.lock
);
97 RUBY_DEBUG_LOG2(file
, line
, "r:%u%s", r
->pub
.id
, rb_current_ractor_raw(false) == r
? " (self)" : "");
101 ractor_unlock_self(rb_ractor_t
*cr
, const char *file
, int line
)
103 VM_ASSERT(cr
== GET_RACTOR());
104 #if RACTOR_CHECK_MODE > 0
105 VM_ASSERT(cr
->sync
.locked_by
== cr
->pub
.self
);
107 ractor_unlock(cr
, file
, line
);
110 #define RACTOR_LOCK(r) ractor_lock(r, __FILE__, __LINE__)
111 #define RACTOR_UNLOCK(r) ractor_unlock(r, __FILE__, __LINE__)
112 #define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__)
113 #define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__)
116 rb_ractor_lock_self(rb_ractor_t
*r
)
122 rb_ractor_unlock_self(rb_ractor_t
*r
)
124 RACTOR_UNLOCK_SELF(r
);
130 ractor_status_str(enum ractor_status status
)
133 case ractor_created
: return "created";
134 case ractor_running
: return "running";
135 case ractor_blocking
: return "blocking";
136 case ractor_terminated
: return "terminated";
138 rb_bug("unreachable");
142 ractor_status_set(rb_ractor_t
*r
, enum ractor_status status
)
144 RUBY_DEBUG_LOG("r:%u [%s]->[%s]", r
->pub
.id
, ractor_status_str(r
->status_
), ractor_status_str(status
));
147 if (r
->status_
!= ractor_created
) {
148 VM_ASSERT(r
== GET_RACTOR()); // only self-modification is allowed.
152 // check2: transition check. assume it will be vanished on non-debug build.
153 switch (r
->status_
) {
155 VM_ASSERT(status
== ractor_blocking
);
158 VM_ASSERT(status
== ractor_blocking
||
159 status
== ractor_terminated
);
161 case ractor_blocking
:
162 VM_ASSERT(status
== ractor_running
);
164 case ractor_terminated
:
165 rb_bug("unreachable");
173 ractor_status_p(rb_ractor_t
*r
, enum ractor_status status
)
175 return rb_ractor_status_p(r
, status
);
178 // Ractor data/mark/free
180 static struct rb_ractor_basket
*ractor_queue_at(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
, int i
);
181 static void ractor_local_storage_mark(rb_ractor_t
*r
);
182 static void ractor_local_storage_free(rb_ractor_t
*r
);
185 ractor_queue_mark(struct rb_ractor_queue
*rq
)
187 for (int i
=0; i
<rq
->cnt
; i
++) {
188 struct rb_ractor_basket
*b
= ractor_queue_at(NULL
, rq
, i
);
189 rb_gc_mark(b
->sender
);
192 case basket_type_yielding
:
193 case basket_type_take_basket
:
194 case basket_type_deleted
:
195 case basket_type_reserved
:
199 rb_gc_mark(b
->p
.send
.v
);
205 ractor_mark(void *ptr
)
207 rb_ractor_t
*r
= (rb_ractor_t
*)ptr
;
209 ractor_queue_mark(&r
->sync
.recv_queue
);
210 ractor_queue_mark(&r
->sync
.takers_queue
);
212 rb_gc_mark(r
->receiving_mutex
);
216 rb_gc_mark(r
->r_stdin
);
217 rb_gc_mark(r
->r_stdout
);
218 rb_gc_mark(r
->r_stderr
);
219 rb_hook_list_mark(&r
->pub
.hooks
);
221 if (r
->threads
.cnt
> 0) {
223 ccan_list_for_each(&r
->threads
.set
, th
, lt_node
) {
224 VM_ASSERT(th
!= NULL
);
225 rb_gc_mark(th
->self
);
229 ractor_local_storage_mark(r
);
233 ractor_queue_free(struct rb_ractor_queue
*rq
)
239 ractor_free(void *ptr
)
241 rb_ractor_t
*r
= (rb_ractor_t
*)ptr
;
242 RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r
));
243 rb_native_mutex_destroy(&r
->sync
.lock
);
244 #ifdef RUBY_THREAD_WIN32_H
245 rb_native_cond_destroy(&r
->sync
.cond
);
247 ractor_queue_free(&r
->sync
.recv_queue
);
248 ractor_queue_free(&r
->sync
.takers_queue
);
249 ractor_local_storage_free(r
);
250 rb_hook_list_free(&r
->pub
.hooks
);
255 ractor_queue_memsize(const struct rb_ractor_queue
*rq
)
257 return sizeof(struct rb_ractor_basket
) * rq
->size
;
261 ractor_memsize(const void *ptr
)
263 rb_ractor_t
*r
= (rb_ractor_t
*)ptr
;
265 // TODO: more correct?
266 return sizeof(rb_ractor_t
) +
267 ractor_queue_memsize(&r
->sync
.recv_queue
) +
268 ractor_queue_memsize(&r
->sync
.takers_queue
);
271 static const rb_data_type_t ractor_data_type
= {
279 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
/* | RUBY_TYPED_WB_PROTECTED */
283 rb_ractor_p(VALUE gv
)
285 if (rb_typeddata_is_kind_of(gv
, &ractor_data_type
)) {
293 static inline rb_ractor_t
*
294 RACTOR_PTR(VALUE self
)
296 VM_ASSERT(rb_ractor_p(self
));
297 rb_ractor_t
*r
= DATA_PTR(self
);
301 static rb_atomic_t ractor_last_id
;
#if RACTOR_CHECK_MODE > 0
// Id of the currently running ractor; 1 (main) when the thread has no
// ractor yet (early boot).
uint32_t
rb_ractor_current_id(void)
{
    if (GET_THREAD()->ractor == NULL) {
        return 1; // main ractor
    }
    else {
        return rb_ractor_id(GET_RACTOR());
    }
}
#endif
319 ractor_queue_setup(struct rb_ractor_queue
*rq
)
324 rq
->baskets
= malloc(sizeof(struct rb_ractor_basket
) * rq
->size
);
327 static struct rb_ractor_basket
*
328 ractor_queue_head(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
)
330 if (r
!= NULL
) ASSERT_ractor_locking(r
);
331 return &rq
->baskets
[rq
->start
];
334 static struct rb_ractor_basket
*
335 ractor_queue_at(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
, int i
)
337 if (r
!= NULL
) ASSERT_ractor_locking(r
);
338 return &rq
->baskets
[(rq
->start
+ i
) % rq
->size
];
342 ractor_queue_advance(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
)
344 ASSERT_ractor_locking(r
);
346 if (rq
->reserved_cnt
== 0) {
348 rq
->start
= (rq
->start
+ 1) % rq
->size
;
352 ractor_queue_at(r
, rq
, 0)->type
.e
= basket_type_deleted
;
357 ractor_queue_skip_p(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
, int i
)
359 struct rb_ractor_basket
*b
= ractor_queue_at(r
, rq
, i
);
360 return basket_type_p(b
, basket_type_deleted
) ||
361 basket_type_p(b
, basket_type_reserved
);
365 ractor_queue_compact(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
)
367 ASSERT_ractor_locking(r
);
369 while (rq
->cnt
> 0 && basket_type_p(ractor_queue_at(r
, rq
, 0), basket_type_deleted
)) {
370 ractor_queue_advance(r
, rq
);
375 ractor_queue_empty_p(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
)
377 ASSERT_ractor_locking(r
);
383 ractor_queue_compact(r
, rq
);
385 for (int i
=0; i
<rq
->cnt
; i
++) {
386 if (!ractor_queue_skip_p(r
, rq
, i
)) {
395 ractor_queue_deq(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
, struct rb_ractor_basket
*basket
)
397 ASSERT_ractor_locking(r
);
399 for (int i
=0; i
<rq
->cnt
; i
++) {
400 if (!ractor_queue_skip_p(r
, rq
, i
)) {
401 struct rb_ractor_basket
*b
= ractor_queue_at(r
, rq
, i
);
405 b
->type
.e
= basket_type_deleted
;
406 ractor_queue_compact(r
, rq
);
415 ractor_queue_enq(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
, struct rb_ractor_basket
*basket
)
417 ASSERT_ractor_locking(r
);
419 if (rq
->size
<= rq
->cnt
) {
420 rq
->baskets
= realloc(rq
->baskets
, sizeof(struct rb_ractor_basket
) * rq
->size
* 2);
421 for (int i
=rq
->size
- rq
->start
; i
<rq
->cnt
; i
++) {
422 rq
->baskets
[i
+ rq
->start
] = rq
->baskets
[i
+ rq
->start
- rq
->size
];
426 rq
->baskets
[(rq
->start
+ rq
->cnt
++) % rq
->size
] = *basket
;
427 // fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt);
431 ractor_queue_delete(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
, struct rb_ractor_basket
*basket
)
433 basket
->type
.e
= basket_type_deleted
;
438 static VALUE
ractor_reset_belonging(VALUE obj
); // in this file
441 ractor_basket_value(struct rb_ractor_basket
*b
)
444 case basket_type_ref
:
446 case basket_type_copy
:
447 case basket_type_move
:
448 case basket_type_will
:
449 b
->type
.e
= basket_type_ref
;
450 b
->p
.send
.v
= ractor_reset_belonging(b
->p
.send
.v
);
453 rb_bug("unreachable");
460 ractor_basket_accept(struct rb_ractor_basket
*b
)
462 VALUE v
= ractor_basket_value(b
);
464 if (b
->p
.send
.exception
) {
466 VALUE err
= rb_exc_new_cstr(rb_eRactorRemoteError
, "thrown by remote Ractor.");
467 rb_ivar_set(err
, rb_intern("@ractor"), b
->sender
);
468 rb_ec_setup_exception(NULL
, err
, cause
);
475 // Ractor synchronizations
477 #if USE_RUBY_DEBUG_LOG
479 wait_status_str(enum rb_ractor_wait_status wait_status
)
481 switch ((int)wait_status
) {
482 case wait_none
: return "none";
483 case wait_receiving
: return "receiving";
484 case wait_taking
: return "taking";
485 case wait_yielding
: return "yielding";
486 case wait_receiving
|wait_taking
: return "receiving|taking";
487 case wait_receiving
|wait_yielding
: return "receiving|yielding";
488 case wait_taking
|wait_yielding
: return "taking|yielding";
489 case wait_receiving
|wait_taking
|wait_yielding
: return "receiving|taking|yielding";
491 rb_bug("unreachable");
495 wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status
)
497 switch (wakeup_status
) {
498 case wakeup_none
: return "none";
499 case wakeup_by_send
: return "by_send";
500 case wakeup_by_yield
: return "by_yield";
501 case wakeup_by_take
: return "by_take";
502 case wakeup_by_close
: return "by_close";
503 case wakeup_by_interrupt
: return "by_interrupt";
504 case wakeup_by_retry
: return "by_retry";
506 rb_bug("unreachable");
510 basket_type_name(enum rb_ractor_basket_type type
)
513 case basket_type_none
: return "none";
514 case basket_type_ref
: return "ref";
515 case basket_type_copy
: return "copy";
516 case basket_type_move
: return "move";
517 case basket_type_will
: return "will";
518 case basket_type_deleted
: return "deleted";
519 case basket_type_reserved
: return "reserved";
520 case basket_type_take_basket
: return "take_basket";
521 case basket_type_yielding
: return "yielding";
526 #endif // USE_RUBY_DEBUG_LOG
529 ractor_sleeping_by(const rb_ractor_t
*r
, enum rb_ractor_wait_status wait_status
)
531 return (r
->sync
.wait
.status
& wait_status
) && r
->sync
.wait
.wakeup_status
== wakeup_none
;
534 #ifdef RUBY_THREAD_PTHREAD_H
536 void rb_ractor_sched_wakeup(rb_ractor_t
*r
);
540 rb_ractor_sched_wakeup(rb_ractor_t
*r
)
542 rb_native_cond_broadcast(&r
->sync
.cond
);
548 ractor_wakeup(rb_ractor_t
*r
, enum rb_ractor_wait_status wait_status
, enum rb_ractor_wakeup_status wakeup_status
)
550 ASSERT_ractor_locking(r
);
552 RUBY_DEBUG_LOG("r:%u wait_by:%s -> wait:%s wakeup:%s",
554 wait_status_str(r
->sync
.wait
.status
),
555 wait_status_str(wait_status
),
556 wakeup_status_str(wakeup_status
));
558 if (ractor_sleeping_by(r
, wait_status
)) {
559 r
->sync
.wait
.wakeup_status
= wakeup_status
;
560 rb_ractor_sched_wakeup(r
);
569 ractor_sleep_interrupt(void *ptr
)
571 rb_ractor_t
*r
= ptr
;
575 ractor_wakeup(r
, wait_receiving
| wait_taking
| wait_yielding
, wakeup_by_interrupt
);
580 typedef void (*ractor_sleep_cleanup_function
)(rb_ractor_t
*cr
, void *p
);
583 ractor_check_ints(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, ractor_sleep_cleanup_function cf_func
, void *cf_data
)
585 if (cr
->sync
.wait
.status
!= wait_none
) {
586 enum rb_ractor_wait_status prev_wait_status
= cr
->sync
.wait
.status
;
587 cr
->sync
.wait
.status
= wait_none
;
588 cr
->sync
.wait
.wakeup_status
= wakeup_by_interrupt
;
593 enum ruby_tag_type state
;
595 if ((state
= EC_EXEC_TAG()) == TAG_NONE
) {
596 rb_thread_check_ints();
601 (*cf_func
)(cr
, cf_data
);
602 EC_JUMP_TAG(ec
, state
);
606 rb_thread_check_ints();
612 cr
->sync
.wait
.status
= prev_wait_status
;
616 #ifdef RUBY_THREAD_PTHREAD_H
617 void rb_ractor_sched_sleep(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, rb_unblock_function_t
*ubf
);
622 ractor_cond_wait(rb_ractor_t
*r
)
624 #if RACTOR_CHECK_MODE > 0
625 VALUE locked_by
= r
->sync
.locked_by
;
626 r
->sync
.locked_by
= Qnil
;
628 rb_native_cond_wait(&r
->sync
.cond
, &r
->sync
.lock
);
630 #if RACTOR_CHECK_MODE > 0
631 r
->sync
.locked_by
= locked_by
;
636 ractor_sleep_wo_gvl(void *ptr
)
638 rb_ractor_t
*cr
= ptr
;
639 RACTOR_LOCK_SELF(cr
);
641 VM_ASSERT(cr
->sync
.wait
.status
!= wait_none
);
642 if (cr
->sync
.wait
.wakeup_status
== wakeup_none
) {
643 ractor_cond_wait(cr
);
645 cr
->sync
.wait
.status
= wait_none
;
647 RACTOR_UNLOCK_SELF(cr
);
652 rb_ractor_sched_sleep(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, rb_unblock_function_t
*ubf
)
656 rb_nogvl(ractor_sleep_wo_gvl
, cr
,
658 RB_NOGVL_UBF_ASYNC_SAFE
| RB_NOGVL_INTR_FAIL
);
664 static enum rb_ractor_wakeup_status
665 ractor_sleep_with_cleanup(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, enum rb_ractor_wait_status wait_status
,
666 ractor_sleep_cleanup_function cf_func
, void *cf_data
)
668 enum rb_ractor_wakeup_status wakeup_status
;
669 VM_ASSERT(GET_RACTOR() == cr
);
671 // TODO: multi-threads
672 VM_ASSERT(cr
->sync
.wait
.status
== wait_none
);
673 VM_ASSERT(wait_status
!= wait_none
);
674 cr
->sync
.wait
.status
= wait_status
;
675 cr
->sync
.wait
.wakeup_status
= wakeup_none
;
677 // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr,
678 // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
680 RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status
));
682 while (cr
->sync
.wait
.wakeup_status
== wakeup_none
) {
683 rb_ractor_sched_sleep(ec
, cr
, ractor_sleep_interrupt
);
684 ractor_check_ints(ec
, cr
, cf_func
, cf_data
);
687 cr
->sync
.wait
.status
= wait_none
;
689 // TODO: multi-thread
690 wakeup_status
= cr
->sync
.wait
.wakeup_status
;
691 cr
->sync
.wait
.wakeup_status
= wakeup_none
;
693 RUBY_DEBUG_LOG("wakeup %s", wakeup_status_str(wakeup_status
));
695 return wakeup_status
;
698 static enum rb_ractor_wakeup_status
699 ractor_sleep(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, enum rb_ractor_wait_status wait_status
)
701 return ractor_sleep_with_cleanup(ec
, cr
, wait_status
, 0, NULL
);
707 ractor_recursive_receive_if(rb_ractor_t
*r
)
709 if (r
->receiving_mutex
&& rb_mutex_owned_p(r
->receiving_mutex
)) {
710 rb_raise(rb_eRactorError
, "can not call receive/receive_if recursively");
715 ractor_try_receive(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, struct rb_ractor_queue
*rq
)
717 struct rb_ractor_basket basket
;
718 ractor_recursive_receive_if(cr
);
719 bool received
= false;
721 RACTOR_LOCK_SELF(cr
);
723 RUBY_DEBUG_LOG("rq->cnt:%d", rq
->cnt
);
724 received
= ractor_queue_deq(cr
, rq
, &basket
);
726 RACTOR_UNLOCK_SELF(cr
);
729 if (cr
->sync
.incoming_port_closed
) {
730 rb_raise(rb_eRactorClosedError
, "The incoming port is already closed");
735 return ractor_basket_accept(&basket
);
740 ractor_wait_receive(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, struct rb_ractor_queue
*rq
)
742 VM_ASSERT(cr
== rb_ec_ractor_ptr(ec
));
743 ractor_recursive_receive_if(cr
);
747 while (ractor_queue_empty_p(cr
, rq
)) {
748 ractor_sleep(ec
, cr
, wait_receiving
);
755 ractor_receive(rb_execution_context_t
*ec
, rb_ractor_t
*cr
)
757 VM_ASSERT(cr
== rb_ec_ractor_ptr(ec
));
759 struct rb_ractor_queue
*rq
= &cr
->sync
.recv_queue
;
761 while (UNDEF_P(v
= ractor_try_receive(ec
, cr
, rq
))) {
762 ractor_wait_receive(ec
, cr
, rq
);
770 rq_dump(struct rb_ractor_queue
*rq
)
773 for (int i
=0; i
<rq
->cnt
; i
++) {
774 struct rb_ractor_basket
*b
= ractor_queue_at(NULL
, rq
, i
);
775 fprintf(stderr
, "%d (start:%d) type:%s %p %s\n", i
, rq
->start
, basket_type_name(b
->type
),
776 (void *)b
, RSTRING_PTR(RARRAY_AREF(b
->v
, 1)));
777 if (basket_type_p(b
, basket_type_reserved
) bug
= true;
779 if (bug
) rb_bug("!!");
783 struct receive_block_data
{
785 struct rb_ractor_queue
*rq
;
792 ractor_receive_if_lock(rb_ractor_t
*cr
)
794 VALUE m
= cr
->receiving_mutex
;
796 m
= cr
->receiving_mutex
= rb_mutex_new();
802 receive_if_body(VALUE ptr
)
804 struct receive_block_data
*data
= (struct receive_block_data
*)ptr
;
806 ractor_receive_if_lock(data
->cr
);
807 VALUE block_result
= rb_yield(data
->v
);
808 rb_ractor_t
*cr
= data
->cr
;
810 RACTOR_LOCK_SELF(cr
);
812 struct rb_ractor_basket
*b
= ractor_queue_at(cr
, data
->rq
, data
->index
);
813 VM_ASSERT(basket_type_p(b
, basket_type_reserved
));
814 data
->rq
->reserved_cnt
--;
816 if (RTEST(block_result
)) {
817 ractor_queue_delete(cr
, data
->rq
, b
);
818 ractor_queue_compact(cr
, data
->rq
);
821 b
->type
.e
= basket_type_ref
;
824 RACTOR_UNLOCK_SELF(cr
);
826 data
->success
= true;
828 if (RTEST(block_result
)) {
837 receive_if_ensure(VALUE v
)
839 struct receive_block_data
*data
= (struct receive_block_data
*)v
;
840 rb_ractor_t
*cr
= data
->cr
;
842 if (!data
->success
) {
843 RACTOR_LOCK_SELF(cr
);
845 struct rb_ractor_basket
*b
= ractor_queue_at(cr
, data
->rq
, data
->index
);
846 VM_ASSERT(basket_type_p(b
, basket_type_reserved
));
847 b
->type
.e
= basket_type_deleted
;
848 data
->rq
->reserved_cnt
--;
850 RACTOR_UNLOCK_SELF(cr
);
853 rb_mutex_unlock(cr
->receiving_mutex
);
858 ractor_receive_if(rb_execution_context_t
*ec
, VALUE crv
, VALUE b
)
860 if (!RTEST(b
)) rb_raise(rb_eArgError
, "no block given");
862 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
863 unsigned int serial
= (unsigned int)-1;
865 struct rb_ractor_queue
*rq
= &cr
->sync
.recv_queue
;
870 ractor_wait_receive(ec
, cr
, rq
);
872 RACTOR_LOCK_SELF(cr
);
874 if (serial
!= rq
->serial
) {
879 // check newer version
880 for (int i
=index
; i
<rq
->cnt
; i
++) {
881 if (!ractor_queue_skip_p(cr
, rq
, i
)) {
882 struct rb_ractor_basket
*b
= ractor_queue_at(cr
, rq
, i
);
883 v
= ractor_basket_value(b
);
884 b
->type
.e
= basket_type_reserved
;
891 RACTOR_UNLOCK_SELF(cr
);
894 struct receive_block_data data
= {
902 VALUE result
= rb_ensure(receive_if_body
, (VALUE
)&data
,
903 receive_if_ensure
, (VALUE
)&data
);
905 if (!UNDEF_P(result
)) return result
;
909 RUBY_VM_CHECK_INTS(ec
);
914 ractor_send_basket(rb_execution_context_t
*ec
, rb_ractor_t
*r
, struct rb_ractor_basket
*b
)
920 if (r
->sync
.incoming_port_closed
) {
924 ractor_queue_enq(r
, &r
->sync
.recv_queue
, b
);
925 ractor_wakeup(r
, wait_receiving
, wakeup_by_send
);
931 rb_raise(rb_eRactorClosedError
, "The incoming-port is already closed");
937 static VALUE
ractor_move(VALUE obj
); // in this file
938 static VALUE
ractor_copy(VALUE obj
); // in this file
941 ractor_basket_prepare_contents(VALUE obj
, VALUE move
, volatile VALUE
*pobj
, enum rb_ractor_basket_type
*ptype
)
944 enum rb_ractor_basket_type type
;
946 if (rb_ractor_shareable_p(obj
)) {
947 type
= basket_type_ref
;
950 else if (!RTEST(move
)) {
951 v
= ractor_copy(obj
);
952 type
= basket_type_copy
;
955 type
= basket_type_move
;
956 v
= ractor_move(obj
);
964 ractor_basket_fill_(rb_ractor_t
*cr
, struct rb_ractor_basket
*basket
, VALUE obj
, bool exc
)
966 VM_ASSERT(cr
== GET_RACTOR());
968 basket
->sender
= cr
->pub
.self
;
969 basket
->p
.send
.exception
= exc
;
970 basket
->p
.send
.v
= obj
;
974 ractor_basket_fill(rb_ractor_t
*cr
, struct rb_ractor_basket
*basket
, VALUE obj
, VALUE move
, bool exc
)
977 enum rb_ractor_basket_type type
;
978 ractor_basket_prepare_contents(obj
, move
, &v
, &type
);
979 ractor_basket_fill_(cr
, basket
, v
, exc
);
980 basket
->type
.e
= type
;
984 ractor_basket_fill_will(rb_ractor_t
*cr
, struct rb_ractor_basket
*basket
, VALUE obj
, bool exc
)
986 ractor_basket_fill_(cr
, basket
, obj
, exc
);
987 basket
->type
.e
= basket_type_will
;
991 ractor_send(rb_execution_context_t
*ec
, rb_ractor_t
*r
, VALUE obj
, VALUE move
)
993 struct rb_ractor_basket basket
;
994 // TODO: Ractor local GC
995 ractor_basket_fill(rb_ec_ractor_ptr(ec
), &basket
, obj
, move
, false);
996 ractor_send_basket(ec
, r
, &basket
);
1003 ractor_take_has_will(rb_ractor_t
*r
)
1005 ASSERT_ractor_locking(r
);
1007 return basket_type_p(&r
->sync
.will_basket
, basket_type_will
);
1011 ractor_take_will(rb_ractor_t
*r
, struct rb_ractor_basket
*b
)
1013 ASSERT_ractor_locking(r
);
1015 if (ractor_take_has_will(r
)) {
1016 *b
= r
->sync
.will_basket
;
1017 r
->sync
.will_basket
.type
.e
= basket_type_none
;
1021 VM_ASSERT(basket_type_p(&r
->sync
.will_basket
, basket_type_none
));
1027 ractor_take_will_lock(rb_ractor_t
*r
, struct rb_ractor_basket
*b
)
1029 ASSERT_ractor_unlocking(r
);
1034 taken
= ractor_take_will(r
, b
);
1042 ractor_register_take(rb_ractor_t
*cr
, rb_ractor_t
*r
, struct rb_ractor_basket
*take_basket
,
1043 bool is_take
, struct rb_ractor_selector_take_config
*config
, bool ignore_error
)
1045 struct rb_ractor_basket b
= {
1046 .type
.e
= basket_type_take_basket
,
1047 .sender
= cr
->pub
.self
,
1050 .basket
= take_basket
,
1055 bool closed
= false;
1059 if (is_take
&& ractor_take_will(r
, take_basket
)) {
1060 RUBY_DEBUG_LOG("take over a will of r:%d", rb_ractor_id(r
));
1062 else if (!is_take
&& ractor_take_has_will(r
)) {
1063 RUBY_DEBUG_LOG("has_will");
1064 VM_ASSERT(config
!= NULL
);
1065 config
->closed
= true;
1067 else if (r
->sync
.outgoing_port_closed
) {
1071 RUBY_DEBUG_LOG("register in r:%d", rb_ractor_id(r
));
1072 ractor_queue_enq(r
, &r
->sync
.takers_queue
, &b
);
1074 if (basket_none_p(take_basket
)) {
1075 ractor_wakeup(r
, wait_yielding
, wakeup_by_take
);
1082 if (!ignore_error
) rb_raise(rb_eRactorClosedError
, "The outgoing-port is already closed");
1091 ractor_deregister_take(rb_ractor_t
*r
, struct rb_ractor_basket
*take_basket
)
1093 struct rb_ractor_queue
*ts
= &r
->sync
.takers_queue
;
1094 bool deleted
= false;
1098 if (r
->sync
.outgoing_port_closed
) {
1102 for (int i
=0; i
<ts
->cnt
; i
++) {
1103 struct rb_ractor_basket
*b
= ractor_queue_at(r
, ts
, i
);
1104 if (basket_type_p(b
, basket_type_take_basket
) && b
->p
.take
.basket
== take_basket
) {
1105 ractor_queue_delete(r
, ts
, b
);
1110 ractor_queue_compact(r
, ts
);
1120 ractor_try_take(rb_ractor_t
*cr
, rb_ractor_t
*r
, struct rb_ractor_basket
*take_basket
)
1124 RACTOR_LOCK_SELF(cr
);
1126 if (basket_none_p(take_basket
) || basket_type_p(take_basket
, basket_type_yielding
)) {
1133 RACTOR_UNLOCK_SELF(cr
);
1136 RUBY_DEBUG_LOG("taken");
1137 if (basket_type_p(take_basket
, basket_type_deleted
)) {
1138 VM_ASSERT(r
->sync
.outgoing_port_closed
);
1139 rb_raise(rb_eRactorClosedError
, "The outgoing-port is already closed");
1141 return ractor_basket_accept(take_basket
);
1144 RUBY_DEBUG_LOG("not taken");
#if VM_CHECK_MODE > 0
// Debug-only: is tb still registered in r's takers queue? Used from
// VM_ASSERTs to ensure cleanup really removed the entry.
static bool
ractor_check_specific_take_basket_lock(rb_ractor_t *r, struct rb_ractor_basket *tb)
{
    bool ret = false;
    struct rb_ractor_queue *ts = &r->sync.takers_queue;

    RACTOR_LOCK(r);
    {
        for (int i = 0; i < ts->cnt; i++) {
            struct rb_ractor_basket *b = ractor_queue_at(r, ts, i);
            if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == tb) {
                ret = true;
                break;
            }
        }
    }
    RACTOR_UNLOCK(r);

    return ret;
}
#endif
1174 ractor_take_cleanup(rb_ractor_t
*cr
, rb_ractor_t
*r
, struct rb_ractor_basket
*tb
)
1177 if (basket_none_p(tb
)) { // not yielded yet
1178 if (!ractor_deregister_take(r
, tb
)) {
1179 // not in r's takers queue
1185 VM_ASSERT(!ractor_check_specific_take_basket_lock(r
, tb
));
1189 struct take_wait_take_cleanup_data
{
1191 struct rb_ractor_basket
*tb
;
1195 ractor_wait_take_cleanup(rb_ractor_t
*cr
, void *ptr
)
1197 struct take_wait_take_cleanup_data
*data
= (struct take_wait_take_cleanup_data
*)ptr
;
1198 ractor_take_cleanup(cr
, data
->r
, data
->tb
);
1202 ractor_wait_take(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, rb_ractor_t
*r
, struct rb_ractor_basket
*take_basket
)
1204 struct take_wait_take_cleanup_data data
= {
1209 RACTOR_LOCK_SELF(cr
);
1211 if (basket_none_p(take_basket
) || basket_type_p(take_basket
, basket_type_yielding
)) {
1212 ractor_sleep_with_cleanup(ec
, cr
, wait_taking
, ractor_wait_take_cleanup
, &data
);
1215 RACTOR_UNLOCK_SELF(cr
);
1219 ractor_take(rb_execution_context_t
*ec
, rb_ractor_t
*r
)
1221 RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(r
));
1223 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
1225 struct rb_ractor_basket take_basket
= {
1226 .type
.e
= basket_type_none
,
1230 ractor_register_take(cr
, r
, &take_basket
, true, NULL
, false);
1232 while (UNDEF_P(v
= ractor_try_take(cr
, r
, &take_basket
))) {
1233 ractor_wait_take(ec
, cr
, r
, &take_basket
);
1236 VM_ASSERT(!basket_none_p(&take_basket
));
1237 VM_ASSERT(!ractor_check_specific_take_basket_lock(r
, &take_basket
));
1245 ractor_check_take_basket(rb_ractor_t
*cr
, struct rb_ractor_queue
*rs
)
1247 ASSERT_ractor_locking(cr
);
1249 for (int i
=0; i
<rs
->cnt
; i
++) {
1250 struct rb_ractor_basket
*b
= ractor_queue_at(cr
, rs
, i
);
1251 if (basket_type_p(b
, basket_type_take_basket
) &&
1252 basket_none_p(b
->p
.take
.basket
)) {
1261 ractor_deq_take_basket(rb_ractor_t
*cr
, struct rb_ractor_queue
*rs
, struct rb_ractor_basket
*b
)
1263 ASSERT_ractor_unlocking(cr
);
1264 struct rb_ractor_basket
*first_tb
= NULL
;
1267 RACTOR_LOCK_SELF(cr
);
1269 while (ractor_queue_deq(cr
, rs
, b
)) {
1270 if (basket_type_p(b
, basket_type_take_basket
)) {
1271 struct rb_ractor_basket
*tb
= b
->p
.take
.basket
;
1273 if (RUBY_ATOMIC_CAS(tb
->type
.atomic
, basket_type_none
, basket_type_yielding
) == basket_type_none
) {
1278 ractor_queue_enq(cr
, rs
, b
);
1279 if (first_tb
== NULL
) first_tb
= tb
;
1280 struct rb_ractor_basket
*head
= ractor_queue_head(cr
, rs
);
1281 VM_ASSERT(head
!= NULL
);
1282 if (basket_type_p(head
, basket_type_take_basket
) && head
->p
.take
.basket
== first_tb
) {
1283 break; // loop detected
1288 VM_ASSERT(basket_none_p(b
));
1292 if (found
&& b
->p
.take
.config
&& !b
->p
.take
.config
->oneshot
) {
1293 ractor_queue_enq(cr
, rs
, b
);
1296 RACTOR_UNLOCK_SELF(cr
);
1302 ractor_try_yield(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, struct rb_ractor_queue
*ts
, volatile VALUE obj
, VALUE move
, bool exc
, bool is_will
)
1304 ASSERT_ractor_unlocking(cr
);
1306 struct rb_ractor_basket b
;
1308 if (ractor_deq_take_basket(cr
, ts
, &b
)) {
1309 VM_ASSERT(basket_type_p(&b
, basket_type_take_basket
));
1310 VM_ASSERT(basket_type_p(b
.p
.take
.basket
, basket_type_yielding
));
1312 rb_ractor_t
*tr
= RACTOR_PTR(b
.sender
);
1313 struct rb_ractor_basket
*tb
= b
.p
.take
.basket
;
1314 enum rb_ractor_basket_type type
;
1316 RUBY_DEBUG_LOG("basket from r:%u", rb_ractor_id(tr
));
1319 type
= basket_type_will
;
1322 enum ruby_tag_type state
;
1326 if ((state
= EC_EXEC_TAG()) == TAG_NONE
) {
1327 // TODO: Ractor local GC
1328 ractor_basket_prepare_contents(obj
, move
, &obj
, &type
);
1333 RACTOR_LOCK_SELF(cr
);
1335 b
.p
.take
.basket
->type
.e
= basket_type_none
;
1336 ractor_queue_enq(cr
, ts
, &b
);
1338 RACTOR_UNLOCK_SELF(cr
);
1339 EC_JUMP_TAG(ec
, state
);
1345 VM_ASSERT(basket_type_p(tb
, basket_type_yielding
));
1347 RUBY_DEBUG_LOG("fill %sbasket from r:%u", is_will
? "will " : "", rb_ractor_id(tr
));
1348 ractor_basket_fill_(cr
, tb
, obj
, exc
);
1349 if (RUBY_ATOMIC_CAS(tb
->type
.atomic
, basket_type_yielding
, type
) != basket_type_yielding
) {
1350 rb_bug("unreachable");
1352 ractor_wakeup(tr
, wait_taking
, wakeup_by_yield
);
1359 RUBY_DEBUG_LOG("no take basket");
1365 ractor_wait_yield(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, struct rb_ractor_queue
*ts
)
1367 RACTOR_LOCK_SELF(cr
);
1369 while (!ractor_check_take_basket(cr
, ts
)) {
1370 ractor_sleep(ec
, cr
, wait_yielding
);
1373 RACTOR_UNLOCK_SELF(cr
);
1377 ractor_yield(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, VALUE obj
, VALUE move
)
1379 struct rb_ractor_queue
*ts
= &cr
->sync
.takers_queue
;
1381 while (!ractor_try_yield(ec
, cr
, ts
, obj
, move
, false, false)) {
1382 ractor_wait_yield(ec
, cr
, ts
);
1390 struct rb_ractor_selector
{
1392 struct rb_ractor_basket take_basket
;
1393 st_table
*take_ractors
; // rb_ractor_t * => (struct rb_ractor_selector_take_config *)
1397 ractor_selector_mark_ractors_i(st_data_t key
, st_data_t value
, st_data_t data
)
1399 const rb_ractor_t
*r
= (rb_ractor_t
*)key
;
1400 rb_gc_mark(r
->pub
.self
);
1405 ractor_selector_mark(void *ptr
)
1407 struct rb_ractor_selector
*s
= ptr
;
1409 if (s
->take_ractors
) {
1410 st_foreach(s
->take_ractors
, ractor_selector_mark_ractors_i
, 0);
1413 switch (s
->take_basket
.type
.e
) {
1414 case basket_type_ref
:
1415 case basket_type_copy
:
1416 case basket_type_move
:
1417 case basket_type_will
:
1418 rb_gc_mark(s
->take_basket
.sender
);
1419 rb_gc_mark(s
->take_basket
.p
.send
.v
);
1427 ractor_selector_release_i(st_data_t key
, st_data_t val
, st_data_t data
)
1429 struct rb_ractor_selector
*s
= (struct rb_ractor_selector
*)data
;
1430 struct rb_ractor_selector_take_config
*config
= (struct rb_ractor_selector_take_config
*)val
;
1432 if (!config
->closed
) {
1433 ractor_deregister_take((rb_ractor_t
*)key
, &s
->take_basket
);
1440 ractor_selector_free(void *ptr
)
1442 struct rb_ractor_selector
*s
= ptr
;
1443 st_foreach(s
->take_ractors
, ractor_selector_release_i
, (st_data_t
)s
);
1444 st_free_table(s
->take_ractors
);
1449 ractor_selector_memsize(const void *ptr
)
1451 const struct rb_ractor_selector
*s
= ptr
;
1452 return sizeof(struct rb_ractor_selector
) +
1453 st_memsize(s
->take_ractors
) +
1454 s
->take_ractors
->num_entries
* sizeof(struct rb_ractor_selector_take_config
);
1457 static const rb_data_type_t ractor_selector_data_type
= {
1460 ractor_selector_mark
,
1461 ractor_selector_free
,
1462 ractor_selector_memsize
,
1465 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
,
1468 static struct rb_ractor_selector
*
1469 RACTOR_SELECTOR_PTR(VALUE selv
)
1471 VM_ASSERT(rb_typeddata_is_kind_of(selv
, &ractor_selector_data_type
));
1473 return (struct rb_ractor_selector
*)DATA_PTR(selv
);
1476 // Ractor::Selector.new
1479 ractor_selector_create(VALUE klass
)
1481 struct rb_ractor_selector
*s
;
1482 VALUE selv
= TypedData_Make_Struct(klass
, struct rb_ractor_selector
, &ractor_selector_data_type
, s
);
1483 s
->take_basket
.type
.e
= basket_type_reserved
;
1484 s
->take_ractors
= st_init_numtable(); // ractor (ptr) -> take_config
1488 // Ractor::Selector#add(r)
1491 ractor_selector_add(VALUE selv
, VALUE rv
)
1493 if (!rb_ractor_p(rv
)) {
1494 rb_raise(rb_eArgError
, "Not a ractor object");
1497 rb_ractor_t
*r
= RACTOR_PTR(rv
);
1498 struct rb_ractor_selector
*s
= RACTOR_SELECTOR_PTR(selv
);
1500 if (st_lookup(s
->take_ractors
, (st_data_t
)r
, NULL
)) {
1501 rb_raise(rb_eArgError
, "already added");
1504 struct rb_ractor_selector_take_config
*config
= malloc(sizeof(struct rb_ractor_selector_take_config
));
1505 VM_ASSERT(config
!= NULL
);
1506 config
->closed
= false;
1507 config
->oneshot
= false;
1509 if (ractor_register_take(GET_RACTOR(), r
, &s
->take_basket
, false, config
, true)) {
1510 st_insert(s
->take_ractors
, (st_data_t
)r
, (st_data_t
)config
);
1516 // Ractor::Selector#remove(r)
// Deregisters ractor rv from this selector: validates the argument,
// requires it to have been added, detaches the shared take_basket from
// the target and drops the table entry.
1519 ractor_selector_remove(VALUE selv
, VALUE rv
)
1521 if (!rb_ractor_p(rv
)) {
1522 rb_raise(rb_eArgError
, "Not a ractor object");
1525 rb_ractor_t
*r
= RACTOR_PTR(rv
);
1526 struct rb_ractor_selector
*s
= RACTOR_SELECTOR_PTR(selv
);
1528 RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r
));
1530 if (!st_lookup(s
->take_ractors
, (st_data_t
)r
, NULL
)) {
1531 rb_raise(rb_eArgError
, "not added yet");
1534 ractor_deregister_take(r
, &s
->take_basket
);
// retrieve the heap-allocated config while deleting the entry
// NOTE(review): the free() of config is not visible in this extract —
// verify it is released after st_delete in the full source.
1535 struct rb_ractor_selector_take_config
*config
;
1536 st_delete(s
->take_ractors
, (st_data_t
*)&r
, (st_data_t
*)&config
);
1542 // Ractor::Selector#clear
1544 struct ractor_selector_clear_data
{
1546 rb_execution_context_t
*ec
;
// st_foreach callback for Ractor::Selector#clear: data carries the
// selector VALUE, key is the registered rb_ractor_t*. Each entry is
// removed through the normal #remove path so deregistration happens.
1550 ractor_selector_clear_i(st_data_t key
, st_data_t val
, st_data_t data
)
1552 VALUE selv
= (VALUE
)data
;
1553 rb_ractor_t
*r
= (rb_ractor_t
*)key
;
// r->pub.self is the Ruby-level Ractor object expected by #remove
1554 ractor_selector_remove(selv
, r
->pub
.self
);
// Ractor::Selector#clear — remove every registered ractor (each via
// ractor_selector_clear_i -> ractor_selector_remove), then reset the
// table. st_clear is belt-and-braces: entries were already deleted.
1559 ractor_selector_clear(VALUE selv
)
1561 struct rb_ractor_selector
*s
= RACTOR_SELECTOR_PTR(selv
);
1563 st_foreach(s
->take_ractors
, ractor_selector_clear_i
, (st_data_t
)selv
);
1564 st_clear(s
->take_ractors
);
1569 ractor_selector_empty_p(VALUE selv
)
1571 struct rb_ractor_selector
*s
= RACTOR_SELECTOR_PTR(selv
);
1572 return s
->take_ractors
->num_entries
== 0 ? Qtrue
: Qfalse
;
// st_foreach callback run once per registered take-target ractor at the
// start of Selector#wait: key is the target rb_ractor_t*, dat is the
// selector's shared take_basket. Competing fillers race on the basket's
// type field with atomic CAS from basket_type_none, so only one source
// can claim it. Finally the target is woken so a yielding ractor can
// fill the basket itself.
1576 ractor_selector_wait_i(st_data_t key
, st_data_t val
, st_data_t dat
)
1578 rb_ractor_t
*r
= (rb_ractor_t
*)key
;
1579 struct rb_ractor_basket
*tb
= (struct rb_ractor_basket
*)dat
;
// another target already filled the shared basket; nothing to do here
1582 if (!basket_none_p(tb
)) {
1583 RUBY_DEBUG_LOG("already taken:%s", basket_type_name(tb
->type
.e
));
// case 1: target left a "will" (its atexit value) — try to claim the
// basket for it via CAS none -> will, then copy the will over
1589 if (basket_type_p(&r
->sync
.will_basket
, basket_type_will
)) {
1590 RUBY_DEBUG_LOG("r:%u has will", rb_ractor_id(r
));
1592 if (RUBY_ATOMIC_CAS(tb
->type
.atomic
, basket_type_none
, basket_type_will
) == basket_type_none
) {
1593 ractor_take_will(r
, tb
);
1597 RUBY_DEBUG_LOG("has will, but already taken (%s)", basket_type_name(tb
->type
.e
));
// case 2: target's outgoing port is closed — mark the basket deleted
// (CAS-protected) and record the sender so wait can drop this target
1601 else if (r
->sync
.outgoing_port_closed
) {
1602 RUBY_DEBUG_LOG("r:%u is closed", rb_ractor_id(r
));
1604 if (RUBY_ATOMIC_CAS(tb
->type
.atomic
, basket_type_none
, basket_type_deleted
) == basket_type_none
) {
1605 tb
->sender
= r
->pub
.self
;
1609 RUBY_DEBUG_LOG("closed, but already taken (%s)", basket_type_name(tb
->type
.e
));
// case 3: target is alive — wake it if it is blocked in a yield so it
// can serve this take
1614 RUBY_DEBUG_LOG("wakeup r:%u", rb_ractor_id(r
));
1615 ractor_wakeup(r
, wait_yielding
, wakeup_by_take
);
1624 // Ractor::Selector#wait
// Interrupt-cleanup hook passed to ractor_sleep_with_cleanup from
// Selector#wait: under the current ractor's lock, spin until any
// in-flight fill (basket_type_yielding) settles, then reset the shared
// basket to "reserved" so a later wait can reuse it.
// NOTE(review): "cleaup" is a typo for "cleanup"; renaming is safe only
// together with the ractor_sleep_with_cleanup call site.
1627 ractor_selector_wait_cleaup(rb_ractor_t
*cr
, void *ptr
)
1629 struct rb_ractor_basket
*tb
= (struct rb_ractor_basket
*)ptr
;
1631 RACTOR_LOCK_SELF(cr
);
// busy-wait (yielding the timeslice) for a concurrent filler to finish
1633 while (basket_type_p(tb
, basket_type_yielding
)) rb_thread_sleep(0);
1634 // if tb->type is not none, the take actually succeeded, but this interruption path unfortunately discards that result.
1635 tb
->type
.e
= basket_type_reserved
;
1637 RACTOR_UNLOCK_SELF(cr
);
1641 ractor_selector__wait(VALUE selv
, VALUE do_receivev
, VALUE do_yieldv
, VALUE yield_value
, VALUE move
)
1643 rb_execution_context_t
*ec
= GET_EC();
1644 struct rb_ractor_selector
*s
= RACTOR_SELECTOR_PTR(selv
);
1645 struct rb_ractor_basket
*tb
= &s
->take_basket
;
1646 struct rb_ractor_basket taken_basket
;
1647 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
1648 bool do_receive
= !!RTEST(do_receivev
);
1649 bool do_yield
= !!RTEST(do_yieldv
);
1651 enum rb_ractor_wait_status wait_status
;
1652 struct rb_ractor_queue
*rq
= &cr
->sync
.recv_queue
;
1653 struct rb_ractor_queue
*ts
= &cr
->sync
.takers_queue
;
1655 RUBY_DEBUG_LOG("start");
1658 RUBY_DEBUG_LOG("takers:%ld", s
->take_ractors
->num_entries
);
1660 // setup wait_status
1661 wait_status
= wait_none
;
1662 if (s
->take_ractors
->num_entries
> 0) wait_status
|= wait_taking
;
1663 if (do_receive
) wait_status
|= wait_receiving
;
1664 if (do_yield
) wait_status
|= wait_yielding
;
1666 RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status
));
1668 if (wait_status
== wait_none
) {
1669 rb_raise(rb_eRactorError
, "no taking ractors");
1673 if (do_receive
&& !UNDEF_P(ret_v
= ractor_try_receive(ec
, cr
, rq
))) {
1674 ret_r
= ID2SYM(rb_intern("receive"));
1679 if (do_yield
&& ractor_try_yield(ec
, cr
, ts
, yield_value
, move
, false, false)) {
1681 ret_r
= ID2SYM(rb_intern("yield"));
1685 // check take_basket
1686 VM_ASSERT(basket_type_p(&s
->take_basket
, basket_type_reserved
));
1687 s
->take_basket
.type
.e
= basket_type_none
;
1688 // kick all take target ractors
1689 st_foreach(s
->take_ractors
, ractor_selector_wait_i
, (st_data_t
)tb
);
1691 RACTOR_LOCK_SELF(cr
);
1695 if (!basket_none_p(tb
)) {
1696 RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb
->type
.e
),
1697 tb
->sender
? rb_ractor_id(RACTOR_PTR(tb
->sender
)) : 0);
1700 if (do_receive
&& !ractor_queue_empty_p(cr
, rq
)) {
1701 RUBY_DEBUG_LOG("can receive (%d)", rq
->cnt
);
1704 if (do_yield
&& ractor_check_take_basket(cr
, ts
)) {
1705 RUBY_DEBUG_LOG("can yield");
1709 ractor_sleep_with_cleanup(ec
, cr
, wait_status
, ractor_selector_wait_cleaup
, tb
);
1715 // tb->type.e = basket_type_reserved # do it atomic in the following code
1716 if (taken_basket
.type
.e
== basket_type_yielding
||
1717 RUBY_ATOMIC_CAS(tb
->type
.atomic
, taken_basket
.type
.e
, basket_type_reserved
) != taken_basket
.type
.e
) {
1719 if (basket_type_p(tb
, basket_type_yielding
)) {
1720 RACTOR_UNLOCK_SELF(cr
);
1724 RACTOR_LOCK_SELF(cr
);
1729 RACTOR_UNLOCK_SELF(cr
);
1731 // check the taken result
1732 switch (taken_basket
.type
.e
) {
1733 case basket_type_none
:
1734 VM_ASSERT(do_receive
|| do_yield
);
1736 case basket_type_yielding
:
1737 rb_bug("unreachable");
1738 case basket_type_deleted
: {
1739 ractor_selector_remove(selv
, taken_basket
.sender
);
1741 rb_ractor_t
*r
= RACTOR_PTR(taken_basket
.sender
);
1742 if (ractor_take_will_lock(r
, &taken_basket
)) {
1743 RUBY_DEBUG_LOG("has_will");
1746 RUBY_DEBUG_LOG("no will");
1747 // rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
1748 // remove and retry wait
1753 case basket_type_will
:
1755 ractor_selector_remove(selv
, taken_basket
.sender
);
1761 RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket
.type
.e
));
1763 ret_v
= ractor_basket_accept(&taken_basket
);
1764 ret_r
= taken_basket
.sender
;
1766 return rb_ary_new_from_args(2, ret_r
, ret_v
);
// Ractor::Selector#wait(receive:, yield_value:, move:) — parse the
// keyword arguments and delegate to ractor_selector__wait.
1770 ractor_selector_wait(int argc
, VALUE
*argv
, VALUE selector
)
1776 keywords
[0] = rb_intern("receive");
1777 keywords
[1] = rb_intern("yield_value");
1778 keywords
[2] = rb_intern("move");
// "0:" = no positional args, optional keyword hash
1780 rb_scan_args(argc
, argv
, "0:", &options
);
1781 rb_get_kwargs(options
, keywords
, 0, numberof(values
), values
);
// do_receive: absent keyword -> Qfalse; do_yield: truthy iff
// yield_value: was given at all. NOTE(review): the C int results of
// RTEST / != are passed where VALUEs are expected — works because the
// callee only re-applies RTEST, but it relies on that coercion.
1782 return ractor_selector__wait(selector
,
1783 values
[0] == Qundef
? Qfalse
: RTEST(values
[0]),
1784 values
[1] != Qundef
, values
[1], values
[2]);
// Ractor::Selector.new(*ractors) — create an empty selector, then add
// each given ractor in argument order (duplicates raise in #add).
1788 ractor_selector_new(int argc
, VALUE
*ractors
, VALUE klass
)
1790 VALUE selector
= ractor_selector_create(klass
);
1792 for (int i
=0; i
<argc
; i
++) {
1793 ractor_selector_add(selector
, ractors
[i
]);
1800 ractor_select_internal(rb_execution_context_t
*ec
, VALUE self
, VALUE ractors
, VALUE do_receive
, VALUE do_yield
, VALUE yield_value
, VALUE move
)
1802 VALUE selector
= ractor_selector_new(RARRAY_LENINT(ractors
), (VALUE
*)RARRAY_CONST_PTR(ractors
), rb_cRactorSelector
);
1807 if ((state
= EC_EXEC_TAG() == TAG_NONE
)) {
1808 result
= ractor_selector__wait(selector
, do_receive
, do_yield
, yield_value
, move
);
1812 ractor_selector_clear(selector
);
1815 EC_JUMP_TAG(ec
, state
);
1819 RB_GC_GUARD(ractors
);
1823 // Ractor#close_incoming
// Close r's incoming port (idempotent). If a thread of r was blocked
// in receive, wake it with wakeup_by_close so it raises ClosedError;
// the debug assert documents that a waiter implies an empty recv queue.
1826 ractor_close_incoming(rb_execution_context_t
*ec
, rb_ractor_t
*r
)
1832 if (!r
->sync
.incoming_port_closed
) {
1834 r
->sync
.incoming_port_closed
= true;
1835 if (ractor_wakeup(r
, wait_receiving
, wakeup_by_close
)) {
1836 VM_ASSERT(ractor_queue_empty_p(r
, &r
->sync
.recv_queue
));
1837 RUBY_DEBUG_LOG("cancel receiving");
1848 // Ractor#close_outgoing
1851 ractor_close_outgoing(rb_execution_context_t
*ec
, rb_ractor_t
*r
)
1857 struct rb_ractor_queue
*ts
= &r
->sync
.takers_queue
;
1859 struct rb_ractor_basket b
;
1861 if (!r
->sync
.outgoing_port_closed
) {
1863 r
->sync
.outgoing_port_closed
= true;
1866 VM_ASSERT(ractor_queue_empty_p(r
, ts
));
1870 // wakeup all taking ractors
1871 while (ractor_queue_deq(r
, ts
, &b
)) {
1872 if (basket_type_p(&b
, basket_type_take_basket
)) {
1873 tr
= RACTOR_PTR(b
.sender
);
1874 struct rb_ractor_basket
*tb
= b
.p
.take
.basket
;
1876 if (RUBY_ATOMIC_CAS(tb
->type
.atomic
, basket_type_none
, basket_type_yielding
) == basket_type_none
) {
1877 b
.p
.take
.basket
->sender
= r
->pub
.self
;
1878 if (RUBY_ATOMIC_CAS(tb
->type
.atomic
, basket_type_yielding
, basket_type_deleted
) != basket_type_yielding
) {
1879 rb_bug("unreachable");
1881 RUBY_DEBUG_LOG("set delete for r:%u", rb_ractor_id(RACTOR_PTR(b
.sender
)));
1884 if (b
.p
.take
.config
) {
1885 b
.p
.take
.config
->closed
= true;
1888 // TODO: deadlock-able?
1891 ractor_wakeup(tr
, wait_taking
, wakeup_by_close
);
1897 // raising yielding Ractor
1898 ractor_wakeup(r
, wait_yielding
, wakeup_by_close
);
1900 VM_ASSERT(ractor_queue_empty_p(r
, ts
));
1906 // creation/termination
// Allocate the next ractor id: atomic post-increment of the global
// counter, +1 so ids start above the pre-incremented main-ractor id.
1909 ractor_next_id(void)
1913 id
= (uint32_t)(RUBY_ATOMIC_FETCH_ADD(ractor_last_id
, 1) + 1);
1919 vm_insert_ractor0(rb_vm_t
*vm
, rb_ractor_t
*r
, bool single_ractor_mode
)
1921 RUBY_DEBUG_LOG("r:%u ractor.cnt:%u++", r
->pub
.id
, vm
->ractor
.cnt
);
1922 VM_ASSERT(single_ractor_mode
|| RB_VM_LOCKED_P());
1924 ccan_list_add_tail(&vm
->ractor
.set
, &r
->vmlr_node
);
// Leave single-ractor fast-path mode (called when the second ractor is
// inserted). Clearing ruby_single_main_ractor makes rb_multi_ractor_p()
// true from here on.
// NOTE(review): the use of was_disabled (rb_gc_enable's previous state)
// is not visible in this extract — confirm against the full source.
1929 cancel_single_ractor_mode(void)
1931 // enable multi-ractor mode
1932 RUBY_DEBUG_LOG("enable multi-ractor mode");
1934 VALUE was_disabled
= rb_gc_enable();
1942 ruby_single_main_ractor
= NULL
;
1946 vm_insert_ractor(rb_vm_t
*vm
, rb_ractor_t
*r
)
1948 VM_ASSERT(ractor_status_p(r
, ractor_created
));
1950 if (rb_multi_ractor_p()) {
1953 vm_insert_ractor0(vm
, r
, false);
1954 vm_ractor_blocking_cnt_inc(vm
, r
, __FILE__
, __LINE__
);
1959 if (vm
->ractor
.cnt
== 0) {
1961 vm_insert_ractor0(vm
, r
, true);
1962 ractor_status_set(r
, ractor_blocking
);
1963 ractor_status_set(r
, ractor_running
);
1966 cancel_single_ractor_mode();
1967 vm_insert_ractor0(vm
, r
, true);
1968 vm_ractor_blocking_cnt_inc(vm
, r
, __FILE__
, __LINE__
);
1974 vm_remove_ractor(rb_vm_t
*vm
, rb_ractor_t
*cr
)
1976 VM_ASSERT(ractor_status_p(cr
, ractor_running
));
1977 VM_ASSERT(vm
->ractor
.cnt
> 1);
1978 VM_ASSERT(cr
->threads
.cnt
== 1);
1982 RUBY_DEBUG_LOG("ractor.cnt:%u-- terminate_waiting:%d",
1983 vm
->ractor
.cnt
, vm
->ractor
.sync
.terminate_waiting
);
1985 VM_ASSERT(vm
->ractor
.cnt
> 0);
1986 ccan_list_del(&cr
->vmlr_node
);
1988 if (vm
->ractor
.cnt
<= 2 && vm
->ractor
.sync
.terminate_waiting
) {
1989 rb_native_cond_signal(&vm
->ractor
.sync
.terminate_cond
);
1993 /* Clear the cached freelist to prevent a memory leak. */
1994 rb_gc_ractor_newobj_cache_clear(&cr
->newobj_cache
);
1996 ractor_status_set(cr
, ractor_terminated
);
2002 ractor_alloc(VALUE klass
)
2005 VALUE rv
= TypedData_Make_Struct(klass
, rb_ractor_t
, &ractor_data_type
, r
);
2006 FL_SET_RAW(rv
, RUBY_FL_SHAREABLE
);
2008 VM_ASSERT(ractor_status_p(r
, ractor_created
));
2013 rb_ractor_main_alloc(void)
2015 rb_ractor_t
*r
= ruby_mimcalloc(1, sizeof(rb_ractor_t
));
2017 fprintf(stderr
, "[FATAL] failed to allocate memory for main ractor\n");
2020 r
->pub
.id
= ++ractor_last_id
;
2024 ruby_single_main_ractor
= r
;
2029 #if defined(HAVE_WORKING_FORK)
2031 rb_ractor_atfork(rb_vm_t
*vm
, rb_thread_t
*th
)
2033 // initialize as a main ractor
2035 vm
->ractor
.blocking_cnt
= 0;
2036 ruby_single_main_ractor
= th
->ractor
;
2037 th
->ractor
->status_
= ractor_created
;
2039 rb_ractor_living_threads_init(th
->ractor
);
2040 rb_ractor_living_threads_insert(th
->ractor
, th
);
2042 VM_ASSERT(vm
->ractor
.blocking_cnt
== 0);
2043 VM_ASSERT(vm
->ractor
.cnt
== 1);
2047 void rb_thread_sched_init(struct rb_thread_sched
*, bool atfork
);
2050 rb_ractor_living_threads_init(rb_ractor_t
*r
)
2052 ccan_list_head_init(&r
->threads
.set
);
2054 r
->threads
.blocking_cnt
= 0;
2058 ractor_init(rb_ractor_t
*r
, VALUE name
, VALUE loc
)
2060 ractor_queue_setup(&r
->sync
.recv_queue
);
2061 ractor_queue_setup(&r
->sync
.takers_queue
);
2062 rb_native_mutex_initialize(&r
->sync
.lock
);
2063 rb_native_cond_initialize(&r
->barrier_wait_cond
);
2065 #ifdef RUBY_THREAD_WIN32_H
2066 rb_native_cond_initialize(&r
->sync
.cond
);
2067 rb_native_cond_initialize(&r
->barrier_wait_cond
);
2070 // thread management
2071 rb_thread_sched_init(&r
->threads
.sched
, false);
2072 rb_ractor_living_threads_init(r
);
2077 StringValueCStr(name
);
2078 enc
= rb_enc_get(name
);
2079 if (!rb_enc_asciicompat(enc
)) {
2080 rb_raise(rb_eArgError
, "ASCII incompatible encoding (%s)",
2083 name
= rb_str_new_frozen(name
);
2090 rb_ractor_main_setup(rb_vm_t
*vm
, rb_ractor_t
*r
, rb_thread_t
*th
)
2092 r
->pub
.self
= TypedData_Wrap_Struct(rb_cRactor
, &ractor_data_type
, r
);
2093 FL_SET_RAW(r
->pub
.self
, RUBY_FL_SHAREABLE
);
2094 ractor_init(r
, Qnil
, Qnil
);
2095 r
->threads
.main
= th
;
2096 rb_ractor_living_threads_insert(r
, th
);
2100 ractor_create(rb_execution_context_t
*ec
, VALUE self
, VALUE loc
, VALUE name
, VALUE args
, VALUE block
)
2102 VALUE rv
= ractor_alloc(self
);
2103 rb_ractor_t
*r
= RACTOR_PTR(rv
);
2104 ractor_init(r
, name
, loc
);
2107 r
->pub
.id
= ractor_next_id();
2108 RUBY_DEBUG_LOG("r:%u", r
->pub
.id
);
2110 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
2111 r
->verbose
= cr
->verbose
;
2112 r
->debug
= cr
->debug
;
2114 rb_yjit_before_ractor_spawn();
2115 rb_rjit_before_ractor_spawn();
2116 rb_thread_create_ractor(r
, args
, block
);
2123 ractor_yield_atexit(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, VALUE v
, bool exc
)
2125 if (cr
->sync
.outgoing_port_closed
) {
2129 ASSERT_ractor_unlocking(cr
);
2131 struct rb_ractor_queue
*ts
= &cr
->sync
.takers_queue
;
2134 if (ractor_try_yield(ec
, cr
, ts
, v
, Qfalse
, exc
, true)) {
2141 if (!ractor_check_take_basket(cr
, ts
)) {
2142 VM_ASSERT(cr
->sync
.wait
.status
== wait_none
);
2143 RUBY_DEBUG_LOG("leave a will");
2144 ractor_basket_fill_will(cr
, &cr
->sync
.will_basket
, v
, exc
);
2147 RUBY_DEBUG_LOG("rare timing!");
2148 retry
= true; // another ractor is waiting for the yield.
2153 if (retry
) goto retry
;
// Normal-termination hook: yield the ractor's final value (block
// result) through the atexit path; exc=false marks a non-error exit.
2158 rb_ractor_atexit(rb_execution_context_t
*ec
, VALUE result
)
2160 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
2161 ractor_yield_atexit(ec
, cr
, result
, false);
// Error-termination hook: yield the pending exception (ec->errinfo)
// through the atexit path; exc=true so takers get Ractor::RemoteError.
2165 rb_ractor_atexit_exception(rb_execution_context_t
*ec
)
2167 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
2168 ractor_yield_atexit(ec
, cr
, ec
->errinfo
, true);
2172 rb_ractor_teardown(rb_execution_context_t
*ec
)
2174 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
2175 ractor_close_incoming(ec
, cr
);
2176 ractor_close_outgoing(ec
, cr
);
2178 // sync with rb_ractor_terminate_interrupt_main_thread()
2181 VM_ASSERT(cr
->threads
.main
!= NULL
);
2182 cr
->threads
.main
= NULL
;
// Receive len startup parameters for ractor r, storing them into ptr[]
// in arrival order. Each slot blocks in ractor_receive until a value
// is sent to r's incoming port.
2188 rb_ractor_receive_parameters(rb_execution_context_t
*ec
, rb_ractor_t
*r
, int len
, VALUE
*ptr
)
2190 for (int i
=0; i
<len
; i
++) {
2191 ptr
[i
] = ractor_receive(ec
, r
);
// Send every element of the args array to ractor r, in order, via the
// normal send path (move=false, i.e. copy/share semantics).
2196 rb_ractor_send_parameters(rb_execution_context_t
*ec
, rb_ractor_t
*r
, VALUE args
)
2198 int len
= RARRAY_LENINT(args
);
2199 for (int i
=0; i
<len
; i
++) {
2200 ractor_send(ec
, r
, RARRAY_AREF(args
, i
), false);
// Slow path of "am I on the main ractor?" — only called once
// multi-ractor mode is active (hence the assert); compares the current
// ractor against the VM's recorded main ractor.
2205 rb_ractor_main_p_(void)
2207 VM_ASSERT(rb_multi_ractor_p());
2208 rb_execution_context_t
*ec
= GET_EC();
2209 return rb_ec_ractor_ptr(ec
) == rb_ec_vm_ptr(ec
)->ractor
.main_ractor
;
// True iff gv is the Ruby object wrapping the VM's main ractor.
// Non-ractor objects return false rather than raising.
2213 rb_obj_is_main_ractor(VALUE gv
)
2215 if (!rb_ractor_p(gv
)) return false;
2216 rb_ractor_t
*r
= DATA_PTR(gv
);
2217 return r
== GET_VM()->ractor
.main_ractor
;
// Number of living threads belonging to ractor r (its threads.cnt).
2221 rb_ractor_living_thread_num(const rb_ractor_t
*r
)
2223 return r
->threads
.cnt
;
2226 // only for current ractor
2228 rb_ractor_thread_list(void)
2230 rb_ractor_t
*r
= GET_RACTOR();
2231 rb_thread_t
*th
= 0;
2232 VALUE ary
= rb_ary_new();
2234 ccan_list_for_each(&r
->threads
.set
, th
, lt_node
) {
2235 switch (th
->status
) {
2236 case THREAD_RUNNABLE
:
2237 case THREAD_STOPPED
:
2238 case THREAD_STOPPED_FOREVER
:
2239 rb_ary_push(ary
, th
->self
);
2249 rb_ractor_living_threads_insert(rb_ractor_t
*r
, rb_thread_t
*th
)
2251 VM_ASSERT(th
!= NULL
);
2255 RUBY_DEBUG_LOG("r(%d)->threads.cnt:%d++", r
->pub
.id
, r
->threads
.cnt
);
2256 ccan_list_add_tail(&r
->threads
.set
, &th
->lt_node
);
2261 // first thread for a ractor
2262 if (r
->threads
.cnt
== 1) {
2263 VM_ASSERT(ractor_status_p(r
, ractor_created
));
2264 vm_insert_ractor(th
->vm
, r
);
2269 vm_ractor_blocking_cnt_inc(rb_vm_t
*vm
, rb_ractor_t
*r
, const char *file
, int line
)
2271 ractor_status_set(r
, ractor_blocking
);
2273 RUBY_DEBUG_LOG2(file
, line
, "vm->ractor.blocking_cnt:%d++", vm
->ractor
.blocking_cnt
);
2274 vm
->ractor
.blocking_cnt
++;
2275 VM_ASSERT(vm
->ractor
.blocking_cnt
<= vm
->ractor
.cnt
);
2279 rb_vm_ractor_blocking_cnt_inc(rb_vm_t
*vm
, rb_ractor_t
*cr
, const char *file
, int line
)
2281 ASSERT_vm_locking();
2282 VM_ASSERT(GET_RACTOR() == cr
);
2283 vm_ractor_blocking_cnt_inc(vm
, cr
, file
, line
);
2287 rb_vm_ractor_blocking_cnt_dec(rb_vm_t
*vm
, rb_ractor_t
*cr
, const char *file
, int line
)
2289 ASSERT_vm_locking();
2290 VM_ASSERT(GET_RACTOR() == cr
);
2292 RUBY_DEBUG_LOG2(file
, line
, "vm->ractor.blocking_cnt:%d--", vm
->ractor
.blocking_cnt
);
2293 VM_ASSERT(vm
->ractor
.blocking_cnt
> 0);
2294 vm
->ractor
.blocking_cnt
--;
2296 ractor_status_set(cr
, ractor_running
);
2300 ractor_check_blocking(rb_ractor_t
*cr
, unsigned int remained_thread_cnt
, const char *file
, int line
)
2302 VM_ASSERT(cr
== GET_RACTOR());
2304 RUBY_DEBUG_LOG2(file
, line
,
2305 "cr->threads.cnt:%u cr->threads.blocking_cnt:%u vm->ractor.cnt:%u vm->ractor.blocking_cnt:%u",
2306 cr
->threads
.cnt
, cr
->threads
.blocking_cnt
,
2307 GET_VM()->ractor
.cnt
, GET_VM()->ractor
.blocking_cnt
);
2309 VM_ASSERT(cr
->threads
.cnt
>= cr
->threads
.blocking_cnt
+ 1);
2311 if (remained_thread_cnt
> 0 &&
2313 cr
->threads
.cnt
== cr
->threads
.blocking_cnt
+ 1) {
2314 // change ractor status: running -> blocking
2315 rb_vm_t
*vm
= GET_VM();
2316 ASSERT_vm_unlocking();
2320 rb_vm_ractor_blocking_cnt_inc(vm
, cr
, file
, line
);
2326 void rb_threadptr_remove(rb_thread_t
*th
);
2329 rb_ractor_living_threads_remove(rb_ractor_t
*cr
, rb_thread_t
*th
)
2331 VM_ASSERT(cr
== GET_RACTOR());
2332 RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr
->threads
.cnt
);
2333 ractor_check_blocking(cr
, cr
->threads
.cnt
- 1, __FILE__
, __LINE__
);
2335 rb_threadptr_remove(th
);
2337 if (cr
->threads
.cnt
== 1) {
2338 vm_remove_ractor(th
->vm
, cr
);
2343 ccan_list_del(&th
->lt_node
);
2351 rb_ractor_blocking_threads_inc(rb_ractor_t
*cr
, const char *file
, int line
)
2353 RUBY_DEBUG_LOG2(file
, line
, "cr->threads.blocking_cnt:%d++", cr
->threads
.blocking_cnt
);
2355 VM_ASSERT(cr
->threads
.cnt
> 0);
2356 VM_ASSERT(cr
== GET_RACTOR());
2358 ractor_check_blocking(cr
, cr
->threads
.cnt
, __FILE__
, __LINE__
);
2359 cr
->threads
.blocking_cnt
++;
2363 rb_ractor_blocking_threads_dec(rb_ractor_t
*cr
, const char *file
, int line
)
2365 RUBY_DEBUG_LOG2(file
, line
,
2366 "r->threads.blocking_cnt:%d--, r->threads.cnt:%u",
2367 cr
->threads
.blocking_cnt
, cr
->threads
.cnt
);
2369 VM_ASSERT(cr
== GET_RACTOR());
2371 if (cr
->threads
.cnt
== cr
->threads
.blocking_cnt
) {
2372 rb_vm_t
*vm
= GET_VM();
2376 rb_vm_ractor_blocking_cnt_dec(vm
, cr
, __FILE__
, __LINE__
);
2381 cr
->threads
.blocking_cnt
--;
2385 rb_ractor_vm_barrier_interrupt_running_thread(rb_ractor_t
*r
)
2387 VM_ASSERT(r
!= GET_RACTOR());
2388 ASSERT_ractor_unlocking(r
);
2389 ASSERT_vm_locking();
2393 if (ractor_status_p(r
, ractor_running
)) {
2394 rb_execution_context_t
*ec
= r
->threads
.running_ec
;
2396 RUBY_VM_SET_VM_BARRIER_INTERRUPT(ec
);
2404 rb_ractor_terminate_interrupt_main_thread(rb_ractor_t
*r
)
2406 VM_ASSERT(r
!= GET_RACTOR());
2407 ASSERT_ractor_unlocking(r
);
2408 ASSERT_vm_locking();
2410 rb_thread_t
*main_th
= r
->threads
.main
;
2412 if (main_th
->status
!= THREAD_KILLED
) {
2413 RUBY_VM_SET_TERMINATE_INTERRUPT(main_th
->ec
);
2414 rb_threadptr_interrupt(main_th
);
2417 RUBY_DEBUG_LOG("killed (%p)", (void *)main_th
);
2422 void rb_thread_terminate_all(rb_thread_t
*th
); // thread.c
2425 ractor_terminal_interrupt_all(rb_vm_t
*vm
)
2427 if (vm
->ractor
.cnt
> 1) {
2428 // send terminate notification to all ractors
2430 ccan_list_for_each(&vm
->ractor
.set
, r
, vmlr_node
) {
2431 if (r
!= vm
->ractor
.main_ractor
) {
2432 RUBY_DEBUG_LOG("r:%d", rb_ractor_id(r
));
2433 rb_ractor_terminate_interrupt_main_thread(r
);
2439 void rb_add_running_thread(rb_thread_t
*th
);
2440 void rb_del_running_thread(rb_thread_t
*th
);
2443 rb_ractor_terminate_all(void)
2445 rb_vm_t
*vm
= GET_VM();
2446 rb_ractor_t
*cr
= vm
->ractor
.main_ractor
;
2448 RUBY_DEBUG_LOG("ractor.cnt:%d", (int)vm
->ractor
.cnt
);
2450 VM_ASSERT(cr
== GET_RACTOR()); // only main-ractor's main-thread should kick it.
2452 if (vm
->ractor
.cnt
> 1) {
2455 ractor_terminal_interrupt_all(vm
); // kill all ractors
2459 rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait
2463 while (vm
->ractor
.cnt
> 1) {
2464 RUBY_DEBUG_LOG("terminate_waiting:%d", vm
->ractor
.sync
.terminate_waiting
);
2465 vm
->ractor
.sync
.terminate_waiting
= true;
2468 rb_vm_ractor_blocking_cnt_inc(vm
, cr
, __FILE__
, __LINE__
);
2469 rb_del_running_thread(rb_ec_thread_ptr(cr
->threads
.running_ec
));
2470 rb_vm_cond_timedwait(vm
, &vm
->ractor
.sync
.terminate_cond
, 1000 /* ms */);
2471 rb_add_running_thread(rb_ec_thread_ptr(cr
->threads
.running_ec
));
2472 rb_vm_ractor_blocking_cnt_dec(vm
, cr
, __FILE__
, __LINE__
);
2474 ractor_terminal_interrupt_all(vm
);
2480 rb_execution_context_t
*
2481 rb_vm_main_ractor_ec(rb_vm_t
*vm
)
2483 /* This code needs to carefully work around two bugs:
2484 * - Bug #20016: When M:N threading is enabled, running_ec is NULL if no thread is
2485 * actually currently running (as opposed to without M:N threading, when
2486 * running_ec will still point to the _last_ thread which ran)
2487 * - Bug #20197: If the main thread is sleeping, setting its postponed job
2488 * interrupt flag is pointless; it won't look at the flag until it stops sleeping
2489 * for some reason. It would be better to set the flag on the running ec, which
2490 * will presumably look at it soon.
2492 * Solution: use running_ec if it's set, otherwise fall back to the main thread ec.
2493 * This is still susceptible to some rare race conditions (what if the last thread
2494 * to run just entered a long-running sleep?), but seems like the best balance of
2495 * robustness and complexity.
2497 rb_execution_context_t
*running_ec
= vm
->ractor
.main_ractor
->threads
.running_ec
;
2498 if (running_ec
) { return running_ec
; }
2499 return vm
->ractor
.main_thread
->ec
;
2503 ractor_moved_missing(int argc
, VALUE
*argv
, VALUE self
)
2505 rb_raise(rb_eRactorMovedError
, "can not send any methods to a moved object");
2508 #ifndef USE_RACTOR_SELECTOR
2509 #define USE_RACTOR_SELECTOR 0
2512 RUBY_SYMBOL_EXPORT_BEGIN
2513 void rb_init_ractor_selector(void);
2514 RUBY_SYMBOL_EXPORT_END
2517 rb_init_ractor_selector(void)
2519 rb_cRactorSelector
= rb_define_class_under(rb_cRactor
, "Selector", rb_cObject
);
2520 rb_undef_alloc_func(rb_cRactorSelector
);
2522 rb_define_singleton_method(rb_cRactorSelector
, "new", ractor_selector_new
, -1);
2523 rb_define_method(rb_cRactorSelector
, "add", ractor_selector_add
, 1);
2524 rb_define_method(rb_cRactorSelector
, "remove", ractor_selector_remove
, 1);
2525 rb_define_method(rb_cRactorSelector
, "clear", ractor_selector_clear
, 0);
2526 rb_define_method(rb_cRactorSelector
, "empty?", ractor_selector_empty_p
, 0);
2527 rb_define_method(rb_cRactorSelector
, "wait", ractor_selector_wait
, -1);
2528 rb_define_method(rb_cRactorSelector
, "_wait", ractor_selector__wait
, 4);
2532 * Document-class: Ractor::ClosedError
2534 * Raised when an attempt is made to send a message to a closed port,
2535 * or to retrieve a message from a closed and empty port.
2536 * Ports may be closed explicitly with Ractor#close_outgoing/close_incoming
2537 * and are closed implicitly when a Ractor terminates.
2539 * r = Ractor.new { sleep(500) }
2541 * r.take # Ractor::ClosedError
2543 * ClosedError is a descendant of StopIteration, so the closing of the ractor will break
2544 * the loops without propagating the error:
2548 * msg = receive # raises ClosedError and loop traps it
2549 * puts "Received: #{msg}"
2551 * puts "loop exited"
2554 * 3.times{|i| r << i}
2557 * puts "Continue successfully"
2565 * Continue successfully
2569 * Document-class: Ractor::RemoteError
2571 * Raised on attempt to Ractor#take if there was an uncaught exception in the Ractor.
2572 * Its +cause+ will contain the original exception, and +ractor+ is the original ractor
2575 * r = Ractor.new { raise "Something weird happened" }
2580 * p e # => #<Ractor::RemoteError: thrown by remote Ractor.>
2581 * p e.ractor == r # => true
2582 * p e.cause # => #<RuntimeError: Something weird happened>
2588 * Document-class: Ractor::MovedError
2590 * Raised on an attempt to access an object which was moved in Ractor#send or Ractor.yield.
2592 * r = Ractor.new { sleep }
2595 * r.send(ary, move: true)
2597 * # Ractor::MovedError (can not send any methods to a moved object)
2602 * Document-class: Ractor::MovedObject
2604 * A special object which replaces any value that was moved to another ractor in Ractor#send
2605 * or Ractor.yield. Any attempt to access the object results in Ractor::MovedError.
2607 * r = Ractor.new { receive }
2610 * r.send(ary, move: true)
2611 * p Ractor::MovedObject === ary
2614 * # Ractor::MovedError (can not send any methods to a moved object)
2617 // Main docs are in ractor.rb, but without this clause there are weird artifacts
2618 // in their rendering.
2620 * Document-class: Ractor
2627 rb_cRactor
= rb_define_class("Ractor", rb_cObject
);
2628 rb_undef_alloc_func(rb_cRactor
);
2630 rb_eRactorError
= rb_define_class_under(rb_cRactor
, "Error", rb_eRuntimeError
);
2631 rb_eRactorIsolationError
= rb_define_class_under(rb_cRactor
, "IsolationError", rb_eRactorError
);
2632 rb_eRactorRemoteError
= rb_define_class_under(rb_cRactor
, "RemoteError", rb_eRactorError
);
2633 rb_eRactorMovedError
= rb_define_class_under(rb_cRactor
, "MovedError", rb_eRactorError
);
2634 rb_eRactorClosedError
= rb_define_class_under(rb_cRactor
, "ClosedError", rb_eStopIteration
);
2635 rb_eRactorUnsafeError
= rb_define_class_under(rb_cRactor
, "UnsafeError", rb_eRactorError
);
2637 rb_cRactorMovedObject
= rb_define_class_under(rb_cRactor
, "MovedObject", rb_cBasicObject
);
2638 rb_undef_alloc_func(rb_cRactorMovedObject
);
2639 rb_define_method(rb_cRactorMovedObject
, "method_missing", ractor_moved_missing
, -1);
2641 // override methods defined in BasicObject
2642 rb_define_method(rb_cRactorMovedObject
, "__send__", ractor_moved_missing
, -1);
2643 rb_define_method(rb_cRactorMovedObject
, "!", ractor_moved_missing
, -1);
2644 rb_define_method(rb_cRactorMovedObject
, "==", ractor_moved_missing
, -1);
2645 rb_define_method(rb_cRactorMovedObject
, "!=", ractor_moved_missing
, -1);
2646 rb_define_method(rb_cRactorMovedObject
, "__id__", ractor_moved_missing
, -1);
2647 rb_define_method(rb_cRactorMovedObject
, "equal?", ractor_moved_missing
, -1);
2648 rb_define_method(rb_cRactorMovedObject
, "instance_eval", ractor_moved_missing
, -1);
2649 rb_define_method(rb_cRactorMovedObject
, "instance_exec", ractor_moved_missing
, -1);
2651 #if USE_RACTOR_SELECTOR
2652 rb_init_ractor_selector();
2657 rb_ractor_dump(void)
2659 rb_vm_t
*vm
= GET_VM();
2662 ccan_list_for_each(&vm
->ractor
.set
, r
, vmlr_node
) {
2663 if (r
!= vm
->ractor
.main_ractor
) {
2664 fprintf(stderr
, "r:%u (%s)\n", r
->pub
.id
, ractor_status_str(r
->status_
));
2670 rb_ractor_stdin(void)
2672 if (rb_ractor_main_p()) {
2676 rb_ractor_t
*cr
= GET_RACTOR();
2682 rb_ractor_stdout(void)
2684 if (rb_ractor_main_p()) {
2688 rb_ractor_t
*cr
= GET_RACTOR();
2689 return cr
->r_stdout
;
2694 rb_ractor_stderr(void)
2696 if (rb_ractor_main_p()) {
2700 rb_ractor_t
*cr
= GET_RACTOR();
2701 return cr
->r_stderr
;
2706 rb_ractor_stdin_set(VALUE in
)
2708 if (rb_ractor_main_p()) {
2712 rb_ractor_t
*cr
= GET_RACTOR();
2713 RB_OBJ_WRITE(cr
->pub
.self
, &cr
->r_stdin
, in
);
2718 rb_ractor_stdout_set(VALUE out
)
2720 if (rb_ractor_main_p()) {
2724 rb_ractor_t
*cr
= GET_RACTOR();
2725 RB_OBJ_WRITE(cr
->pub
.self
, &cr
->r_stdout
, out
);
2730 rb_ractor_stderr_set(VALUE err
)
2732 if (rb_ractor_main_p()) {
2736 rb_ractor_t
*cr
= GET_RACTOR();
2737 RB_OBJ_WRITE(cr
->pub
.self
, &cr
->r_stderr
, err
);
// Accessor for the per-ractor event-hook list (cr->pub.hooks).
2742 rb_ractor_hooks(rb_ractor_t
*cr
)
2744 return &cr
->pub
.hooks
;
2747 /// traverse function
2753 enum obj_traverse_iterator_result
{
2759 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_enter_func
)(VALUE obj
);
2760 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_leave_func
)(VALUE obj
);
2761 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_final_func
)(VALUE obj
);
2763 static enum obj_traverse_iterator_result
null_leave(VALUE obj
);
2765 struct obj_traverse_data
{
2766 rb_obj_traverse_enter_func enter_func
;
2767 rb_obj_traverse_leave_func leave_func
;
2774 struct obj_traverse_callback_data
{
2776 struct obj_traverse_data
*data
;
2779 static int obj_traverse_i(VALUE obj
, struct obj_traverse_data
*data
);
2782 obj_hash_traverse_i(VALUE key
, VALUE val
, VALUE ptr
)
2784 struct obj_traverse_callback_data
*d
= (struct obj_traverse_callback_data
*)ptr
;
2786 if (obj_traverse_i(key
, d
->data
)) {
2791 if (obj_traverse_i(val
, d
->data
)) {
2800 obj_traverse_reachable_i(VALUE obj
, void *ptr
)
2802 struct obj_traverse_callback_data
*d
= (struct obj_traverse_callback_data
*)ptr
;
2804 if (obj_traverse_i(obj
, d
->data
)) {
2809 static struct st_table
*
2810 obj_traverse_rec(struct obj_traverse_data
*data
)
2812 if (UNLIKELY(!data
->rec
)) {
2813 data
->rec_hash
= rb_ident_hash_new();
2814 data
->rec
= RHASH_ST_TABLE(data
->rec_hash
);
2820 obj_traverse_ivar_foreach_i(ID key
, VALUE val
, st_data_t ptr
)
2822 struct obj_traverse_callback_data
*d
= (struct obj_traverse_callback_data
*)ptr
;
2824 if (obj_traverse_i(val
, d
->data
)) {
2833 obj_traverse_i(VALUE obj
, struct obj_traverse_data
*data
)
2835 if (RB_SPECIAL_CONST_P(obj
)) return 0;
2837 switch (data
->enter_func(obj
)) {
2838 case traverse_cont
: break;
2839 case traverse_skip
: return 0; // skip children
2840 case traverse_stop
: return 1; // stop search
2843 if (UNLIKELY(st_insert(obj_traverse_rec(data
), obj
, 1))) {
2844 // already traversed
2848 struct obj_traverse_callback_data d
= {
2852 rb_ivar_foreach(obj
, obj_traverse_ivar_foreach_i
, (st_data_t
)&d
);
2853 if (d
.stop
) return 1;
2855 switch (BUILTIN_TYPE(obj
)) {
2867 /* Instance variables already traversed. */
2872 for (int i
= 0; i
< RARRAY_LENINT(obj
); i
++) {
2873 VALUE e
= rb_ary_entry(obj
, i
);
2874 if (obj_traverse_i(e
, data
)) return 1;
2881 if (obj_traverse_i(RHASH_IFNONE(obj
), data
)) return 1;
2883 struct obj_traverse_callback_data d
= {
2887 rb_hash_foreach(obj
, obj_hash_traverse_i
, (VALUE
)&d
);
2888 if (d
.stop
) return 1;
2894 long len
= RSTRUCT_LEN(obj
);
2895 const VALUE
*ptr
= RSTRUCT_CONST_PTR(obj
);
2897 for (long i
=0; i
<len
; i
++) {
2898 if (obj_traverse_i(ptr
[i
], data
)) return 1;
2904 if (obj_traverse_i(RRATIONAL(obj
)->num
, data
)) return 1;
2905 if (obj_traverse_i(RRATIONAL(obj
)->den
, data
)) return 1;
2908 if (obj_traverse_i(RCOMPLEX(obj
)->real
, data
)) return 1;
2909 if (obj_traverse_i(RCOMPLEX(obj
)->imag
, data
)) return 1;
2915 struct obj_traverse_callback_data d
= {
2919 RB_VM_LOCK_ENTER_NO_BARRIER();
2921 rb_objspace_reachable_objects_from(obj
, obj_traverse_reachable_i
, &d
);
2923 RB_VM_LOCK_LEAVE_NO_BARRIER();
2924 if (d
.stop
) return 1;
2934 rb_bug("unreachable");
2937 if (data
->leave_func(obj
) == traverse_stop
) {
2945 struct rb_obj_traverse_final_data
{
2946 rb_obj_traverse_final_func final_func
;
2951 obj_traverse_final_i(st_data_t key
, st_data_t val
, st_data_t arg
)
2953 struct rb_obj_traverse_final_data
*data
= (void *)arg
;
2954 if (data
->final_func(key
)) {
2964 rb_obj_traverse(VALUE obj
,
2965 rb_obj_traverse_enter_func enter_func
,
2966 rb_obj_traverse_leave_func leave_func
,
2967 rb_obj_traverse_final_func final_func
)
2969 struct obj_traverse_data data
= {
2970 .enter_func
= enter_func
,
2971 .leave_func
= leave_func
,
2975 if (obj_traverse_i(obj
, &data
)) return 1;
2976 if (final_func
&& data
.rec
) {
2977 struct rb_obj_traverse_final_data f
= {final_func
, 0};
2978 st_foreach(data
.rec
, obj_traverse_final_i
, (st_data_t
)&f
);
2985 frozen_shareable_p(VALUE obj
, bool *made_shareable
)
2987 if (CHILLED_STRING_P(obj
)) {
2990 else if (!RB_TYPE_P(obj
, T_DATA
)) {
2993 else if (RTYPEDDATA_P(obj
)) {
2994 const rb_data_type_t
*type
= RTYPEDDATA_TYPE(obj
);
2995 if (type
->flags
& RUBY_TYPED_FROZEN_SHAREABLE
) {
2998 else if (made_shareable
&& rb_obj_is_proc(obj
)) {
2999 // special path to make shareable Proc.
3000 rb_proc_ractor_make_shareable(obj
);
3001 *made_shareable
= true;
3002 VM_ASSERT(RB_OBJ_SHAREABLE_P(obj
));
3010 static enum obj_traverse_iterator_result
3011 make_shareable_check_shareable(VALUE obj
)
3013 VM_ASSERT(!SPECIAL_CONST_P(obj
));
3014 bool made_shareable
= false;
3016 if (rb_ractor_shareable_p(obj
)) {
3017 return traverse_skip
;
3019 else if (CHILLED_STRING_P(obj
)) {
3020 rb_funcall(obj
, idFreeze
, 0);
3022 if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj
))) {
3023 rb_raise(rb_eRactorError
, "#freeze does not freeze object correctly");
3026 if (RB_OBJ_SHAREABLE_P(obj
)) {
3027 return traverse_skip
;
3030 else if (!frozen_shareable_p(obj
, &made_shareable
)) {
3031 if (made_shareable
) {
3032 return traverse_skip
;
3035 rb_raise(rb_eRactorError
, "can not make shareable object for %"PRIsVALUE
, obj
);
3039 if (!RB_OBJ_FROZEN_RAW(obj
)) {
3040 rb_funcall(obj
, idFreeze
, 0);
3042 if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj
))) {
3043 rb_raise(rb_eRactorError
, "#freeze does not freeze object correctly");
3046 if (RB_OBJ_SHAREABLE_P(obj
)) {
3047 return traverse_skip
;
3051 return traverse_cont
;
3054 static enum obj_traverse_iterator_result
3055 mark_shareable(VALUE obj
)
3057 FL_SET_RAW(obj
, RUBY_FL_SHAREABLE
);
3058 return traverse_cont
;
3062 rb_ractor_make_shareable(VALUE obj
)
3064 rb_obj_traverse(obj
,
3065 make_shareable_check_shareable
,
3066 null_leave
, mark_shareable
);
3071 rb_ractor_make_shareable_copy(VALUE obj
)
3073 VALUE copy
= ractor_copy(obj
);
3074 return rb_ractor_make_shareable(copy
);
3078 rb_ractor_ensure_shareable(VALUE obj
, VALUE name
)
3080 if (!rb_ractor_shareable_p(obj
)) {
3081 VALUE message
= rb_sprintf("cannot assign unshareable object to %"PRIsVALUE
,
3083 rb_exc_raise(rb_exc_new_str(rb_eRactorIsolationError
, message
));
3089 rb_ractor_ensure_main_ractor(const char *msg
)
3091 if (!rb_ractor_main_p()) {
3092 rb_raise(rb_eRactorIsolationError
, "%s", msg
);
3096 static enum obj_traverse_iterator_result
3097 shareable_p_enter(VALUE obj
)
3099 if (RB_OBJ_SHAREABLE_P(obj
)) {
3100 return traverse_skip
;
3102 else if (RB_TYPE_P(obj
, T_CLASS
) ||
3103 RB_TYPE_P(obj
, T_MODULE
) ||
3104 RB_TYPE_P(obj
, T_ICLASS
)) {
3106 mark_shareable(obj
);
3107 return traverse_skip
;
3109 else if (RB_OBJ_FROZEN_RAW(obj
) &&
3110 frozen_shareable_p(obj
, NULL
)) {
3111 return traverse_cont
;
3114 return traverse_stop
; // fail
3118 rb_ractor_shareable_p_continue(VALUE obj
)
3120 if (rb_obj_traverse(obj
,
3121 shareable_p_enter
, null_leave
,
#if RACTOR_CHECK_MODE > 0
// enter hook (debug builds only): re-assign the current Ractor as the
// owner of every unshareable object in the graph.
static enum obj_traverse_iterator_result
reset_belonging_enter(VALUE obj)
{
    if (rb_ractor_shareable_p(obj)) {
        return traverse_skip;
    }
    else {
        rb_ractor_setup_belonging(obj);
        return traverse_cont;
    }
}
#endif
3144 static enum obj_traverse_iterator_result
3145 null_leave(VALUE obj
)
3147 return traverse_cont
;
3151 ractor_reset_belonging(VALUE obj
)
3153 #if RACTOR_CHECK_MODE > 0
3154 rb_obj_traverse(obj
, reset_belonging_enter
, null_leave
, NULL
);
3160 /// traverse and replace function
3166 struct obj_traverse_replace_data
;
3167 static int obj_traverse_replace_i(VALUE obj
, struct obj_traverse_replace_data
*data
);
3168 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_enter_func
)(VALUE obj
, struct obj_traverse_replace_data
*data
);
3169 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_leave_func
)(VALUE obj
, struct obj_traverse_replace_data
*data
);
3171 struct obj_traverse_replace_data
{
3172 rb_obj_traverse_replace_enter_func enter_func
;
3173 rb_obj_traverse_replace_leave_func leave_func
;
3182 struct obj_traverse_replace_callback_data
{
3185 struct obj_traverse_replace_data
*data
;
3189 obj_hash_traverse_replace_foreach_i(st_data_t key
, st_data_t value
, st_data_t argp
, int error
)
3195 obj_hash_traverse_replace_i(st_data_t
*key
, st_data_t
*val
, st_data_t ptr
, int exists
)
3197 struct obj_traverse_replace_callback_data
*d
= (struct obj_traverse_replace_callback_data
*)ptr
;
3198 struct obj_traverse_replace_data
*data
= d
->data
;
3200 if (obj_traverse_replace_i(*key
, data
)) {
3204 else if (*key
!= data
->replacement
) {
3205 VALUE v
= *key
= data
->replacement
;
3206 RB_OBJ_WRITTEN(d
->src
, Qundef
, v
);
3209 if (obj_traverse_replace_i(*val
, data
)) {
3213 else if (*val
!= data
->replacement
) {
3214 VALUE v
= *val
= data
->replacement
;
3215 RB_OBJ_WRITTEN(d
->src
, Qundef
, v
);
3222 obj_iv_hash_traverse_replace_foreach_i(st_data_t _key
, st_data_t _val
, st_data_t _data
, int _x
)
3228 obj_iv_hash_traverse_replace_i(st_data_t
* _key
, st_data_t
* val
, st_data_t ptr
, int exists
)
3230 struct obj_traverse_replace_callback_data
*d
= (struct obj_traverse_replace_callback_data
*)ptr
;
3231 struct obj_traverse_replace_data
*data
= d
->data
;
3233 if (obj_traverse_replace_i(*(VALUE
*)val
, data
)) {
3237 else if (*(VALUE
*)val
!= data
->replacement
) {
3238 VALUE v
= *(VALUE
*)val
= data
->replacement
;
3239 RB_OBJ_WRITTEN(d
->src
, Qundef
, v
);
3245 static struct st_table
*
3246 obj_traverse_replace_rec(struct obj_traverse_replace_data
*data
)
3248 if (UNLIKELY(!data
->rec
)) {
3249 data
->rec_hash
= rb_ident_hash_new();
3250 data
->rec
= RHASH_ST_TABLE(data
->rec_hash
);
3256 obj_refer_only_shareables_p_i(VALUE obj
, void *ptr
)
3258 int *pcnt
= (int *)ptr
;
3260 if (!rb_ractor_shareable_p(obj
)) {
3266 obj_refer_only_shareables_p(VALUE obj
)
3269 RB_VM_LOCK_ENTER_NO_BARRIER();
3271 rb_objspace_reachable_objects_from(obj
, obj_refer_only_shareables_p_i
, &cnt
);
3273 RB_VM_LOCK_LEAVE_NO_BARRIER();
3278 obj_traverse_replace_i(VALUE obj
, struct obj_traverse_replace_data
*data
)
3280 st_data_t replacement
;
3282 if (RB_SPECIAL_CONST_P(obj
)) {
3283 data
->replacement
= obj
;
3287 switch (data
->enter_func(obj
, data
)) {
3288 case traverse_cont
: break;
3289 case traverse_skip
: return 0; // skip children
3290 case traverse_stop
: return 1; // stop search
3293 replacement
= (st_data_t
)data
->replacement
;
3295 if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data
), (st_data_t
)obj
, &replacement
))) {
3296 data
->replacement
= (VALUE
)replacement
;
3300 st_insert(obj_traverse_replace_rec(data
), (st_data_t
)obj
, replacement
);
3307 #define CHECK_AND_REPLACE(v) do { \
3309 if (obj_traverse_replace_i(_val, data)) { return 1; } \
3310 else if (data->replacement != _val) { RB_OBJ_WRITE(obj, &v, data->replacement); } \
3313 if (UNLIKELY(FL_TEST_RAW(obj
, FL_EXIVAR
))) {
3314 struct gen_ivtbl
*ivtbl
;
3315 rb_ivar_generic_ivtbl_lookup(obj
, &ivtbl
);
3317 if (UNLIKELY(rb_shape_obj_too_complex(obj
))) {
3318 struct obj_traverse_replace_callback_data d
= {
3323 rb_st_foreach_with_replace(
3324 ivtbl
->as
.complex.table
,
3325 obj_iv_hash_traverse_replace_foreach_i
,
3326 obj_iv_hash_traverse_replace_i
,
3329 if (d
.stop
) return 1;
3332 for (uint32_t i
= 0; i
< ivtbl
->as
.shape
.numiv
; i
++) {
3333 if (!UNDEF_P(ivtbl
->as
.shape
.ivptr
[i
])) {
3334 CHECK_AND_REPLACE(ivtbl
->as
.shape
.ivptr
[i
]);
3340 switch (BUILTIN_TYPE(obj
)) {
3350 rb_str_make_independent(obj
);
3355 if (rb_shape_obj_too_complex(obj
)) {
3356 struct obj_traverse_replace_callback_data d
= {
3361 rb_st_foreach_with_replace(
3362 ROBJECT_IV_HASH(obj
),
3363 obj_iv_hash_traverse_replace_foreach_i
,
3364 obj_iv_hash_traverse_replace_i
,
3367 if (d
.stop
) return 1;
3370 uint32_t len
= ROBJECT_IV_COUNT(obj
);
3371 VALUE
*ptr
= ROBJECT_IVPTR(obj
);
3373 for (uint32_t i
= 0; i
< len
; i
++) {
3374 CHECK_AND_REPLACE(ptr
[i
]);
3382 rb_ary_cancel_sharing(obj
);
3384 for (int i
= 0; i
< RARRAY_LENINT(obj
); i
++) {
3385 VALUE e
= rb_ary_entry(obj
, i
);
3387 if (obj_traverse_replace_i(e
, data
)) {
3390 else if (e
!= data
->replacement
) {
3391 RARRAY_ASET(obj
, i
, data
->replacement
);
3399 struct obj_traverse_replace_callback_data d
= {
3404 rb_hash_stlike_foreach_with_replace(obj
,
3405 obj_hash_traverse_replace_foreach_i
,
3406 obj_hash_traverse_replace_i
,
3408 if (d
.stop
) return 1;
3409 // TODO: rehash here?
3411 VALUE ifnone
= RHASH_IFNONE(obj
);
3412 if (obj_traverse_replace_i(ifnone
, data
)) {
3415 else if (ifnone
!= data
->replacement
) {
3416 RHASH_SET_IFNONE(obj
, data
->replacement
);
3423 long len
= RSTRUCT_LEN(obj
);
3424 const VALUE
*ptr
= RSTRUCT_CONST_PTR(obj
);
3426 for (long i
=0; i
<len
; i
++) {
3427 CHECK_AND_REPLACE(ptr
[i
]);
3433 CHECK_AND_REPLACE(RRATIONAL(obj
)->num
);
3434 CHECK_AND_REPLACE(RRATIONAL(obj
)->den
);
3437 CHECK_AND_REPLACE(RCOMPLEX(obj
)->real
);
3438 CHECK_AND_REPLACE(RCOMPLEX(obj
)->imag
);
3442 if (!data
->move
&& obj_refer_only_shareables_p(obj
)) {
3446 rb_raise(rb_eRactorError
, "can not %s %"PRIsVALUE
" object.",
3447 data
->move
? "move" : "copy", rb_class_of(obj
));
3451 // not supported yet
3460 rb_bug("unreachable");
3463 data
->replacement
= (VALUE
)replacement
;
3465 if (data
->leave_func(obj
, data
) == traverse_stop
) {
3476 rb_obj_traverse_replace(VALUE obj
,
3477 rb_obj_traverse_replace_enter_func enter_func
,
3478 rb_obj_traverse_replace_leave_func leave_func
,
3481 struct obj_traverse_replace_data data
= {
3482 .enter_func
= enter_func
,
3483 .leave_func
= leave_func
,
3485 .replacement
= Qundef
,
3489 if (obj_traverse_replace_i(obj
, &data
)) {
3493 return data
.replacement
;
3505 static const VALUE fl_users
= FL_USER1
| FL_USER2
| FL_USER3
|
3506 FL_USER4
| FL_USER5
| FL_USER6
| FL_USER7
|
3507 FL_USER8
| FL_USER9
| FL_USER10
| FL_USER11
|
3508 FL_USER12
| FL_USER13
| FL_USER14
| FL_USER15
|
3509 FL_USER16
| FL_USER17
| FL_USER18
| FL_USER19
;
3512 ractor_moved_bang(VALUE obj
)
3514 // invalidate src object
3515 struct RVALUE
*rv
= (void *)obj
;
3517 rv
->klass
= rb_cRactorMovedObject
;
3521 rv
->flags
= rv
->flags
& ~fl_users
;
3523 if (BUILTIN_TYPE(obj
) == T_OBJECT
) ROBJECT_SET_SHAPE_ID(obj
, ROOT_SHAPE_ID
);
3525 // TODO: record moved location
3528 static enum obj_traverse_iterator_result
3529 move_enter(VALUE obj
, struct obj_traverse_replace_data
*data
)
3531 if (rb_ractor_shareable_p(obj
)) {
3532 data
->replacement
= obj
;
3533 return traverse_skip
;
3536 VALUE moved
= rb_obj_alloc(RBASIC_CLASS(obj
));
3537 rb_shape_set_shape(moved
, rb_shape_get_shape(obj
));
3538 data
->replacement
= moved
;
3539 return traverse_cont
;
3543 void rb_replace_generic_ivar(VALUE clone
, VALUE obj
); // variable.c
3545 static enum obj_traverse_iterator_result
3546 move_leave(VALUE obj
, struct obj_traverse_replace_data
*data
)
3548 VALUE v
= data
->replacement
;
3549 struct RVALUE
*dst
= (struct RVALUE
*)v
;
3550 struct RVALUE
*src
= (struct RVALUE
*)obj
;
3552 dst
->flags
= (dst
->flags
& ~fl_users
) | (src
->flags
& fl_users
);
3558 if (UNLIKELY(FL_TEST_RAW(obj
, FL_EXIVAR
))) {
3559 rb_replace_generic_ivar(v
, obj
);
3562 // TODO: generic_ivar
3564 ractor_moved_bang(obj
);
3565 return traverse_cont
;
3569 ractor_move(VALUE obj
)
3571 VALUE val
= rb_obj_traverse_replace(obj
, move_enter
, move_leave
, true);
3572 if (!UNDEF_P(val
)) {
3576 rb_raise(rb_eRactorError
, "can not move the object");
3580 static enum obj_traverse_iterator_result
3581 copy_enter(VALUE obj
, struct obj_traverse_replace_data
*data
)
3583 if (rb_ractor_shareable_p(obj
)) {
3584 data
->replacement
= obj
;
3585 return traverse_skip
;
3588 data
->replacement
= rb_obj_clone(obj
);
3589 return traverse_cont
;
3593 static enum obj_traverse_iterator_result
3594 copy_leave(VALUE obj
, struct obj_traverse_replace_data
*data
)
3596 return traverse_cont
;
3600 ractor_copy(VALUE obj
)
3602 VALUE val
= rb_obj_traverse_replace(obj
, copy_enter
, copy_leave
, false);
3603 if (!UNDEF_P(val
)) {
3607 rb_raise(rb_eRactorError
, "can not copy the object");
3611 // Ractor local storage
3613 struct rb_ractor_local_key_struct
{
3614 const struct rb_ractor_local_storage_type
*type
;
3618 static struct freed_ractor_local_keys_struct
{
3621 rb_ractor_local_key_t
*keys
;
3622 } freed_ractor_local_keys
;
3625 ractor_local_storage_mark_i(st_data_t key
, st_data_t val
, st_data_t dmy
)
3627 struct rb_ractor_local_key_struct
*k
= (struct rb_ractor_local_key_struct
*)key
;
3628 if (k
->type
->mark
) (*k
->type
->mark
)((void *)val
);
3632 static enum rb_id_table_iterator_result
3633 idkey_local_storage_mark_i(ID id
, VALUE val
, void *dmy
)
3636 return ID_TABLE_CONTINUE
;
3640 ractor_local_storage_mark(rb_ractor_t
*r
)
3642 if (r
->local_storage
) {
3643 st_foreach(r
->local_storage
, ractor_local_storage_mark_i
, 0);
3645 for (int i
=0; i
<freed_ractor_local_keys
.cnt
; i
++) {
3646 rb_ractor_local_key_t key
= freed_ractor_local_keys
.keys
[i
];
3647 st_data_t val
, k
= (st_data_t
)key
;
3648 if (st_delete(r
->local_storage
, &k
, &val
) &&
3649 (key
= (rb_ractor_local_key_t
)k
)->type
->free
) {
3650 (*key
->type
->free
)((void *)val
);
3655 if (r
->idkey_local_storage
) {
3656 rb_id_table_foreach(r
->idkey_local_storage
, idkey_local_storage_mark_i
, NULL
);
3661 ractor_local_storage_free_i(st_data_t key
, st_data_t val
, st_data_t dmy
)
3663 struct rb_ractor_local_key_struct
*k
= (struct rb_ractor_local_key_struct
*)key
;
3664 if (k
->type
->free
) (*k
->type
->free
)((void *)val
);
3669 ractor_local_storage_free(rb_ractor_t
*r
)
3671 if (r
->local_storage
) {
3672 st_foreach(r
->local_storage
, ractor_local_storage_free_i
, 0);
3673 st_free_table(r
->local_storage
);
3676 if (r
->idkey_local_storage
) {
3677 rb_id_table_free(r
->idkey_local_storage
);
3682 rb_ractor_local_storage_value_mark(void *ptr
)
3684 rb_gc_mark((VALUE
)ptr
);
3687 static const struct rb_ractor_local_storage_type ractor_local_storage_type_null
= {
3692 const struct rb_ractor_local_storage_type rb_ractor_local_storage_type_free
= {
3697 static const struct rb_ractor_local_storage_type ractor_local_storage_type_value
= {
3698 rb_ractor_local_storage_value_mark
,
3702 rb_ractor_local_key_t
3703 rb_ractor_local_storage_ptr_newkey(const struct rb_ractor_local_storage_type
*type
)
3705 rb_ractor_local_key_t key
= ALLOC(struct rb_ractor_local_key_struct
);
3706 key
->type
= type
? type
: &ractor_local_storage_type_null
;
3707 key
->main_cache
= (void *)Qundef
;
3711 rb_ractor_local_key_t
3712 rb_ractor_local_storage_value_newkey(void)
3714 return rb_ractor_local_storage_ptr_newkey(&ractor_local_storage_type_value
);
3718 rb_ractor_local_storage_delkey(rb_ractor_local_key_t key
)
3722 if (freed_ractor_local_keys
.cnt
== freed_ractor_local_keys
.capa
) {
3723 freed_ractor_local_keys
.capa
= freed_ractor_local_keys
.capa
? freed_ractor_local_keys
.capa
* 2 : 4;
3724 REALLOC_N(freed_ractor_local_keys
.keys
, rb_ractor_local_key_t
, freed_ractor_local_keys
.capa
);
3726 freed_ractor_local_keys
.keys
[freed_ractor_local_keys
.cnt
++] = key
;
3732 ractor_local_ref(rb_ractor_local_key_t key
, void **pret
)
3734 if (rb_ractor_main_p()) {
3735 if (!UNDEF_P((VALUE
)key
->main_cache
)) {
3736 *pret
= key
->main_cache
;
3744 rb_ractor_t
*cr
= GET_RACTOR();
3746 if (cr
->local_storage
&& st_lookup(cr
->local_storage
, (st_data_t
)key
, (st_data_t
*)pret
)) {
3756 ractor_local_set(rb_ractor_local_key_t key
, void *ptr
)
3758 rb_ractor_t
*cr
= GET_RACTOR();
3760 if (cr
->local_storage
== NULL
) {
3761 cr
->local_storage
= st_init_numtable();
3764 st_insert(cr
->local_storage
, (st_data_t
)key
, (st_data_t
)ptr
);
3766 if (rb_ractor_main_p()) {
3767 key
->main_cache
= ptr
;
3772 rb_ractor_local_storage_value(rb_ractor_local_key_t key
)
3775 if (ractor_local_ref(key
, &val
)) {
3784 rb_ractor_local_storage_value_lookup(rb_ractor_local_key_t key
, VALUE
*val
)
3786 if (ractor_local_ref(key
, (void **)val
)) {
3795 rb_ractor_local_storage_value_set(rb_ractor_local_key_t key
, VALUE val
)
3797 ractor_local_set(key
, (void *)val
);
3801 rb_ractor_local_storage_ptr(rb_ractor_local_key_t key
)
3804 if (ractor_local_ref(key
, &ret
)) {
3813 rb_ractor_local_storage_ptr_set(rb_ractor_local_key_t key
, void *ptr
)
3815 ractor_local_set(key
, ptr
);
3818 #define DEFAULT_KEYS_CAPA 0x10
3821 rb_ractor_finish_marking(void)
3823 for (int i
=0; i
<freed_ractor_local_keys
.cnt
; i
++) {
3824 ruby_xfree(freed_ractor_local_keys
.keys
[i
]);
3826 freed_ractor_local_keys
.cnt
= 0;
3827 if (freed_ractor_local_keys
.capa
> DEFAULT_KEYS_CAPA
) {
3828 freed_ractor_local_keys
.capa
= DEFAULT_KEYS_CAPA
;
3829 REALLOC_N(freed_ractor_local_keys
.keys
, rb_ractor_local_key_t
, DEFAULT_KEYS_CAPA
);
3834 ractor_local_value(rb_execution_context_t
*ec
, VALUE self
, VALUE sym
)
3836 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
3837 ID id
= rb_check_id(&sym
);
3838 struct rb_id_table
*tbl
= cr
->idkey_local_storage
;
3841 if (id
&& tbl
&& rb_id_table_lookup(tbl
, id
, &val
)) {
3850 ractor_local_value_set(rb_execution_context_t
*ec
, VALUE self
, VALUE sym
, VALUE val
)
3852 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
3853 ID id
= SYM2ID(rb_to_symbol(sym
));
3854 struct rb_id_table
*tbl
= cr
->idkey_local_storage
;
3857 tbl
= cr
->idkey_local_storage
= rb_id_table_create(2);
3859 rb_id_table_insert(tbl
, id
, val
);
3863 #include "ractor.rbinc"