1 // Ractor implementation
4 #include "ruby/thread.h"
5 #include "ruby/ractor.h"
6 #include "ruby/thread_native.h"
9 #include "ractor_core.h"
10 #include "internal/complex.h"
11 #include "internal/error.h"
12 #include "internal/hash.h"
13 #include "internal/rational.h"
14 #include "internal/struct.h"
15 #include "internal/thread.h"
18 #include "transient_heap.h"
// Ractor error/class objects. The first two have external linkage (referenced
// from other files); the rest are file-local.
VALUE rb_eRactorUnsafeError;
VALUE rb_eRactorIsolationError;
static VALUE rb_eRactorError;
static VALUE rb_eRactorRemoteError;
static VALUE rb_eRactorMovedError;
static VALUE rb_eRactorClosedError;
static VALUE rb_cRactorMovedObject;

// Forward declaration; the definition appears later in this file.
static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line);
34 ASSERT_ractor_unlocking(rb_ractor_t
*r
)
36 #if RACTOR_CHECK_MODE > 0
37 // GET_EC is NULL in an MJIT worker
38 if (rb_current_execution_context(false) != NULL
&& r
->sync
.locked_by
== rb_ractor_self(GET_RACTOR())) {
39 rb_bug("recursive ractor locking");
45 ASSERT_ractor_locking(rb_ractor_t
*r
)
47 #if RACTOR_CHECK_MODE > 0
48 // GET_EC is NULL in an MJIT worker
49 if (rb_current_execution_context(false) != NULL
&& r
->sync
.locked_by
!= rb_ractor_self(GET_RACTOR())) {
50 rp(r
->sync
.locked_by
);
51 rb_bug("ractor lock is not acquired.");
57 ractor_lock(rb_ractor_t
*r
, const char *file
, int line
)
59 RUBY_DEBUG_LOG2(file
, line
, "locking r:%u%s", r
->pub
.id
, GET_RACTOR() == r
? " (self)" : "");
61 ASSERT_ractor_unlocking(r
);
62 rb_native_mutex_lock(&r
->sync
.lock
);
64 #if RACTOR_CHECK_MODE > 0
65 if (rb_current_execution_context(false) != NULL
) { // GET_EC is NULL in an MJIT worker
66 r
->sync
.locked_by
= rb_ractor_self(GET_RACTOR());
70 RUBY_DEBUG_LOG2(file
, line
, "locked r:%u%s", r
->pub
.id
, GET_RACTOR() == r
? " (self)" : "");
74 ractor_lock_self(rb_ractor_t
*cr
, const char *file
, int line
)
76 VM_ASSERT(cr
== GET_RACTOR());
77 VM_ASSERT(cr
->sync
.locked_by
!= cr
->pub
.self
);
78 ractor_lock(cr
, file
, line
);
82 ractor_unlock(rb_ractor_t
*r
, const char *file
, int line
)
84 ASSERT_ractor_locking(r
);
85 #if RACTOR_CHECK_MODE > 0
86 r
->sync
.locked_by
= Qnil
;
88 rb_native_mutex_unlock(&r
->sync
.lock
);
90 RUBY_DEBUG_LOG2(file
, line
, "r:%u%s", r
->pub
.id
, GET_RACTOR() == r
? " (self)" : "");
94 ractor_unlock_self(rb_ractor_t
*cr
, const char *file
, int line
)
96 VM_ASSERT(cr
== GET_RACTOR());
97 VM_ASSERT(cr
->sync
.locked_by
== cr
->pub
.self
);
98 ractor_unlock(cr
, file
, line
);
// Convenience wrappers that pass the call site to the lock helpers
// for debug logging.
#define RACTOR_LOCK(r) ractor_lock(r, __FILE__, __LINE__)
#define RACTOR_UNLOCK(r) ractor_unlock(r, __FILE__, __LINE__)
#define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__)
#define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__)
107 ractor_cond_wait(rb_ractor_t
*r
)
109 #if RACTOR_CHECK_MODE > 0
110 VALUE locked_by
= r
->sync
.locked_by
;
111 r
->sync
.locked_by
= Qnil
;
113 rb_native_cond_wait(&r
->sync
.cond
, &r
->sync
.lock
);
115 #if RACTOR_CHECK_MODE > 0
116 r
->sync
.locked_by
= locked_by
;
121 ractor_status_str(enum ractor_status status
)
124 case ractor_created
: return "created";
125 case ractor_running
: return "running";
126 case ractor_blocking
: return "blocking";
127 case ractor_terminated
: return "terminated";
129 rb_bug("unreachable");
133 ractor_status_set(rb_ractor_t
*r
, enum ractor_status status
)
135 RUBY_DEBUG_LOG("r:%u [%s]->[%s]", r
->pub
.id
, ractor_status_str(r
->status_
), ractor_status_str(status
));
138 if (r
->status_
!= ractor_created
) {
139 VM_ASSERT(r
== GET_RACTOR()); // only self-modification is allowed.
143 // check2: transition check. assume it will be vanished on non-debug build.
144 switch (r
->status_
) {
146 VM_ASSERT(status
== ractor_blocking
);
149 VM_ASSERT(status
== ractor_blocking
||
150 status
== ractor_terminated
);
152 case ractor_blocking
:
153 VM_ASSERT(status
== ractor_running
);
155 case ractor_terminated
:
156 VM_ASSERT(0); // unreachable
164 ractor_status_p(rb_ractor_t
*r
, enum ractor_status status
)
166 return rb_ractor_status_p(r
, status
);
// Forward declaration: random access into a ractor message queue
// (defined later in this file).
static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i);
172 ractor_queue_mark(struct rb_ractor_queue
*rq
)
174 for (int i
=0; i
<rq
->cnt
; i
++) {
175 struct rb_ractor_basket
*b
= ractor_queue_at(rq
, i
);
177 rb_gc_mark(b
->sender
);
// Forward declarations: ractor-local storage GC hooks (defined later in this file).
static void ractor_local_storage_mark(rb_ractor_t *r);
static void ractor_local_storage_free(rb_ractor_t *r);
185 ractor_mark(void *ptr
)
187 rb_ractor_t
*r
= (rb_ractor_t
*)ptr
;
189 ractor_queue_mark(&r
->sync
.incoming_queue
);
190 rb_gc_mark(r
->sync
.wait
.taken_basket
.v
);
191 rb_gc_mark(r
->sync
.wait
.taken_basket
.sender
);
192 rb_gc_mark(r
->sync
.wait
.yielded_basket
.v
);
193 rb_gc_mark(r
->sync
.wait
.yielded_basket
.sender
);
194 rb_gc_mark(r
->receiving_mutex
);
198 rb_gc_mark(r
->r_stdin
);
199 rb_gc_mark(r
->r_stdout
);
200 rb_gc_mark(r
->r_stderr
);
201 rb_hook_list_mark(&r
->pub
.hooks
);
203 if (r
->threads
.cnt
> 0) {
205 list_for_each(&r
->threads
.set
, th
, lt_node
) {
206 VM_ASSERT(th
!= NULL
);
207 rb_gc_mark(th
->self
);
211 ractor_local_storage_mark(r
);
215 ractor_queue_free(struct rb_ractor_queue
*rq
)
221 ractor_waiting_list_free(struct rb_ractor_waiting_list
*wl
)
227 ractor_free(void *ptr
)
229 rb_ractor_t
*r
= (rb_ractor_t
*)ptr
;
230 rb_native_mutex_destroy(&r
->sync
.lock
);
231 rb_native_cond_destroy(&r
->sync
.cond
);
232 ractor_queue_free(&r
->sync
.incoming_queue
);
233 ractor_waiting_list_free(&r
->sync
.taking_ractors
);
234 ractor_local_storage_free(r
);
235 rb_hook_list_free(&r
->pub
.hooks
);
240 ractor_queue_memsize(const struct rb_ractor_queue
*rq
)
242 return sizeof(struct rb_ractor_basket
) * rq
->size
;
246 ractor_waiting_list_memsize(const struct rb_ractor_waiting_list
*wl
)
248 return sizeof(rb_ractor_t
*) * wl
->size
;
252 ractor_memsize(const void *ptr
)
254 rb_ractor_t
*r
= (rb_ractor_t
*)ptr
;
257 return sizeof(rb_ractor_t
) +
258 ractor_queue_memsize(&r
->sync
.incoming_queue
) +
259 ractor_waiting_list_memsize(&r
->sync
.taking_ractors
);
262 static const rb_data_type_t ractor_data_type
= {
270 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
/* | RUBY_TYPED_WB_PROTECTED */
274 rb_ractor_p(VALUE gv
)
276 if (rb_typeddata_is_kind_of(gv
, &ractor_data_type
)) {
284 static inline rb_ractor_t
*
285 RACTOR_PTR(VALUE self
)
287 VM_ASSERT(rb_ractor_p(self
));
289 rb_ractor_t
*r
= DATA_PTR(self
);
// Source of monotonically increasing ractor ids; bumped with
// RUBY_ATOMIC_FETCH_ADD in ractor_next_id().
static rb_atomic_t ractor_last_id;
#if RACTOR_CHECK_MODE > 0
// Current ractor's id; returns 1 (the main ractor) when the thread has no
// ractor attached yet.
MJIT_FUNC_EXPORTED uint32_t
rb_ractor_current_id(void)
{
    if (GET_THREAD()->ractor == NULL) {
        return 1; // main ractor
    }
    else {
        return rb_ractor_id(GET_RACTOR());
    }
}
#endif
310 ractor_queue_setup(struct rb_ractor_queue
*rq
)
315 rq
->baskets
= malloc(sizeof(struct rb_ractor_basket
) * rq
->size
);
318 static struct rb_ractor_basket
*
319 ractor_queue_at(struct rb_ractor_queue
*rq
, int i
)
321 return &rq
->baskets
[(rq
->start
+ i
) % rq
->size
];
325 ractor_queue_advance(struct rb_ractor_queue
*rq
)
327 ASSERT_ractor_locking(GET_RACTOR());
329 if (rq
->reserved_cnt
== 0) {
331 rq
->start
= (rq
->start
+ 1) % rq
->size
;
335 ractor_queue_at(rq
, 0)->type
= basket_type_deleted
;
340 ractor_queue_skip_p(struct rb_ractor_queue
*rq
, int i
)
342 struct rb_ractor_basket
*b
= ractor_queue_at(rq
, i
);
343 return b
->type
== basket_type_deleted
||
344 b
->type
== basket_type_reserved
;
348 ractor_queue_compact(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
)
350 ASSERT_ractor_locking(r
);
352 while (rq
->cnt
> 0 && ractor_queue_at(rq
, 0)->type
== basket_type_deleted
) {
353 ractor_queue_advance(rq
);
358 ractor_queue_empty_p(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
)
360 ASSERT_ractor_locking(r
);
366 ractor_queue_compact(r
, rq
);
368 for (int i
=0; i
<rq
->cnt
; i
++) {
369 if (!ractor_queue_skip_p(rq
, i
)) {
378 ractor_queue_deq(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
, struct rb_ractor_basket
*basket
)
384 if (!ractor_queue_empty_p(r
, rq
)) {
385 for (int i
=0; i
<rq
->cnt
; i
++) {
386 if (!ractor_queue_skip_p(rq
, i
)) {
387 struct rb_ractor_basket
*b
= ractor_queue_at(rq
, i
);
391 b
->type
= basket_type_deleted
;
392 ractor_queue_compact(r
, rq
);
405 ractor_queue_enq(rb_ractor_t
*r
, struct rb_ractor_queue
*rq
, struct rb_ractor_basket
*basket
)
407 ASSERT_ractor_locking(r
);
409 if (rq
->size
<= rq
->cnt
) {
410 rq
->baskets
= realloc(rq
->baskets
, sizeof(struct rb_ractor_basket
) * rq
->size
* 2);
411 for (int i
=rq
->size
- rq
->start
; i
<rq
->cnt
; i
++) {
412 rq
->baskets
[i
+ rq
->start
] = rq
->baskets
[i
+ rq
->start
- rq
->size
];
416 rq
->baskets
[(rq
->start
+ rq
->cnt
++) % rq
->size
] = *basket
;
417 // fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt);
421 ractor_basket_clear(struct rb_ractor_basket
*b
)
423 b
->type
= basket_type_none
;
// Forward declaration; defined later in this file.
static VALUE ractor_reset_belonging(VALUE obj); // in this file
431 ractor_basket_value(struct rb_ractor_basket
*b
)
434 case basket_type_ref
:
436 case basket_type_copy
:
437 case basket_type_move
:
438 case basket_type_will
:
439 b
->type
= basket_type_ref
;
440 b
->v
= ractor_reset_belonging(b
->v
);
443 rb_bug("unreachable");
450 ractor_basket_accept(struct rb_ractor_basket
*b
)
452 VALUE v
= ractor_basket_value(b
);
456 VALUE err
= rb_exc_new_cstr(rb_eRactorRemoteError
, "thrown by remote Ractor.");
457 rb_ivar_set(err
, rb_intern("@ractor"), b
->sender
);
458 ractor_basket_clear(b
);
459 rb_ec_setup_exception(NULL
, err
, cause
);
463 ractor_basket_clear(b
);
468 ractor_recursive_receive_if(rb_ractor_t
*r
)
470 if (r
->receiving_mutex
&& rb_mutex_owned_p(r
->receiving_mutex
)) {
471 rb_raise(rb_eRactorError
, "can not call receive/receive_if recursively");
476 ractor_try_receive(rb_execution_context_t
*ec
, rb_ractor_t
*r
)
478 struct rb_ractor_queue
*rq
= &r
->sync
.incoming_queue
;
479 struct rb_ractor_basket basket
;
481 ractor_recursive_receive_if(r
);
483 if (ractor_queue_deq(r
, rq
, &basket
) == false) {
484 if (r
->sync
.incoming_port_closed
) {
485 rb_raise(rb_eRactorClosedError
, "The incoming port is already closed");
492 return ractor_basket_accept(&basket
);
496 ractor_sleeping_by(const rb_ractor_t
*r
, enum ractor_wait_status wait_status
)
498 return (r
->sync
.wait
.status
& wait_status
) && r
->sync
.wait
.wakeup_status
== wakeup_none
;
502 ractor_wakeup(rb_ractor_t
*r
, enum ractor_wait_status wait_status
, enum ractor_wakeup_status wakeup_status
)
504 ASSERT_ractor_locking(r
);
506 // fprintf(stderr, "%s r:%p status:%s/%s wakeup_status:%s/%s\n", RUBY_FUNCTION_NAME_STRING, (void *)r,
507 // wait_status_str(r->sync.wait.status), wait_status_str(wait_status),
508 // wakeup_status_str(r->sync.wait.wakeup_status), wakeup_status_str(wakeup_status));
510 if (ractor_sleeping_by(r
, wait_status
)) {
511 r
->sync
.wait
.wakeup_status
= wakeup_status
;
512 rb_native_cond_signal(&r
->sync
.cond
);
521 ractor_sleep_wo_gvl(void *ptr
)
523 rb_ractor_t
*cr
= ptr
;
524 RACTOR_LOCK_SELF(cr
);
526 VM_ASSERT(cr
->sync
.wait
.status
!= wait_none
);
527 if (cr
->sync
.wait
.wakeup_status
== wakeup_none
) {
528 ractor_cond_wait(cr
);
530 cr
->sync
.wait
.status
= wait_none
;
532 RACTOR_UNLOCK_SELF(cr
);
537 ractor_sleep_interrupt(void *ptr
)
539 rb_ractor_t
*r
= ptr
;
543 ractor_wakeup(r
, wait_receiving
| wait_taking
| wait_yielding
, wakeup_by_interrupt
);
548 #if USE_RUBY_DEBUG_LOG
550 wait_status_str(enum ractor_wait_status wait_status
)
552 switch ((int)wait_status
) {
553 case wait_none
: return "none";
554 case wait_receiving
: return "receiving";
555 case wait_taking
: return "taking";
556 case wait_yielding
: return "yielding";
557 case wait_receiving
|wait_taking
: return "receiving|taking";
558 case wait_receiving
|wait_yielding
: return "receiving|yielding";
559 case wait_taking
|wait_yielding
: return "taking|yielding";
560 case wait_receiving
|wait_taking
|wait_yielding
: return "receiving|taking|yielding";
562 rb_bug("unreachable");
566 wakeup_status_str(enum ractor_wakeup_status wakeup_status
)
568 switch (wakeup_status
) {
569 case wakeup_none
: return "none";
570 case wakeup_by_send
: return "by_send";
571 case wakeup_by_yield
: return "by_yield";
572 case wakeup_by_take
: return "by_take";
573 case wakeup_by_close
: return "by_close";
574 case wakeup_by_interrupt
: return "by_interrupt";
575 case wakeup_by_retry
: return "by_retry";
577 rb_bug("unreachable");
579 #endif // USE_RUBY_DEBUG_LOG
582 ractor_sleep(rb_execution_context_t
*ec
, rb_ractor_t
*cr
)
584 VM_ASSERT(GET_RACTOR() == cr
);
585 VM_ASSERT(cr
->sync
.wait
.status
!= wait_none
);
586 // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr,
587 // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
591 rb_nogvl(ractor_sleep_wo_gvl
, cr
,
592 ractor_sleep_interrupt
, cr
,
593 RB_NOGVL_UBF_ASYNC_SAFE
| RB_NOGVL_INTR_FAIL
);
597 // rb_nogvl() can be canceled by interrupts
598 if (cr
->sync
.wait
.status
!= wait_none
) {
599 cr
->sync
.wait
.status
= wait_none
;
600 cr
->sync
.wait
.wakeup_status
= wakeup_by_interrupt
;
603 rb_thread_check_ints();
604 RACTOR_LOCK(cr
); // reachable?
609 ractor_register_taking(rb_ractor_t
*r
, rb_ractor_t
*cr
)
611 VM_ASSERT(cr
== GET_RACTOR());
612 bool retry_try
= false;
616 if (ractor_sleeping_by(r
, wait_yielding
)) {
617 // already waiting for yielding. retry try_take.
621 // insert cr into taking list
622 struct rb_ractor_waiting_list
*wl
= &r
->sync
.taking_ractors
;
624 for (int i
=0; i
<wl
->cnt
; i
++) {
625 if (wl
->ractors
[i
] == cr
) {
626 // TODO: make it clean code.
627 rb_native_mutex_unlock(&r
->sync
.lock
);
628 rb_raise(rb_eRuntimeError
, "Already another thread of same ractor is waiting.");
634 wl
->ractors
= malloc(sizeof(rb_ractor_t
*) * wl
->size
);
635 if (wl
->ractors
== NULL
) rb_bug("can't allocate buffer");
637 else if (wl
->size
<= wl
->cnt
+ 1) {
639 wl
->ractors
= realloc(wl
->ractors
, sizeof(rb_ractor_t
*) * wl
->size
);
640 if (wl
->ractors
== NULL
) rb_bug("can't re-allocate buffer");
642 wl
->ractors
[wl
->cnt
++] = cr
;
650 if (cr
->sync
.wait
.wakeup_status
== wakeup_none
) {
651 VM_ASSERT(cr
->sync
.wait
.status
!= wait_none
);
653 cr
->sync
.wait
.wakeup_status
= wakeup_by_retry
;
654 cr
->sync
.wait
.status
= wait_none
;
662 ractor_waiting_list_del(rb_ractor_t
*r
, struct rb_ractor_waiting_list
*wl
, rb_ractor_t
*wr
)
667 for (int i
=0; i
<wl
->cnt
; i
++) {
668 if (wl
->ractors
[i
] == wr
) {
673 if (pos
>= 0) { // found
675 for (int i
=pos
; i
<wl
->cnt
; i
++) {
676 wl
->ractors
[i
] = wl
->ractors
[i
+1];
684 ractor_waiting_list_shift(rb_ractor_t
*r
, struct rb_ractor_waiting_list
*wl
)
686 ASSERT_ractor_locking(r
);
687 VM_ASSERT(&r
->sync
.taking_ractors
== wl
);
690 rb_ractor_t
*tr
= wl
->ractors
[0];
691 for (int i
=1; i
<wl
->cnt
; i
++) {
692 wl
->ractors
[i
-1] = wl
->ractors
[i
];
703 ractor_receive_wait(rb_execution_context_t
*ec
, rb_ractor_t
*cr
)
705 VM_ASSERT(cr
== rb_ec_ractor_ptr(ec
));
706 ractor_recursive_receive_if(cr
);
710 if (ractor_queue_empty_p(cr
, &cr
->sync
.incoming_queue
)) {
711 VM_ASSERT(cr
->sync
.wait
.status
== wait_none
);
712 cr
->sync
.wait
.status
= wait_receiving
;
713 cr
->sync
.wait
.wakeup_status
= wakeup_none
;
714 ractor_sleep(ec
, cr
);
715 cr
->sync
.wait
.wakeup_status
= wakeup_none
;
722 ractor_receive(rb_execution_context_t
*ec
, rb_ractor_t
*cr
)
724 VM_ASSERT(cr
== rb_ec_ractor_ptr(ec
));
727 while ((v
= ractor_try_receive(ec
, cr
)) == Qundef
) {
728 ractor_receive_wait(ec
, cr
);
737 basket_type_name(enum rb_ractor_basket_type type
)
740 #define T(t) case basket_type_##t: return #t
748 default: rb_bug("unreachable");
753 rq_dump(struct rb_ractor_queue
*rq
)
756 for (int i
=0; i
<rq
->cnt
; i
++) {
757 struct rb_ractor_basket
*b
= ractor_queue_at(rq
, i
);
758 fprintf(stderr
, "%d (start:%d) type:%s %p %s\n", i
, rq
->start
, basket_type_name(b
->type
),
759 (void *)b
, RSTRING_PTR(RARRAY_AREF(b
->v
, 1)));
760 if (b
->type
== basket_type_reserved
) bug
= true;
762 if (bug
) rb_bug("!!");
766 struct receive_block_data
{
768 struct rb_ractor_queue
*rq
;
775 ractor_receive_if_lock(rb_ractor_t
*cr
)
777 VALUE m
= cr
->receiving_mutex
;
779 m
= cr
->receiving_mutex
= rb_mutex_new();
785 receive_if_body(VALUE ptr
)
787 struct receive_block_data
*data
= (struct receive_block_data
*)ptr
;
789 ractor_receive_if_lock(data
->cr
);
790 VALUE block_result
= rb_yield(data
->v
);
792 RACTOR_LOCK_SELF(data
->cr
);
794 struct rb_ractor_basket
*b
= ractor_queue_at(data
->rq
, data
->index
);
795 VM_ASSERT(b
->type
== basket_type_reserved
);
796 data
->rq
->reserved_cnt
--;
798 if (RTEST(block_result
)) {
799 b
->type
= basket_type_deleted
;
800 ractor_queue_compact(data
->cr
, data
->rq
);
803 b
->type
= basket_type_ref
;
806 RACTOR_UNLOCK_SELF(data
->cr
);
808 data
->success
= true;
810 if (RTEST(block_result
)) {
819 receive_if_ensure(VALUE v
)
821 struct receive_block_data
*data
= (struct receive_block_data
*)v
;
823 if (!data
->success
) {
824 RACTOR_LOCK_SELF(data
->cr
);
826 struct rb_ractor_basket
*b
= ractor_queue_at(data
->rq
, data
->index
);
827 VM_ASSERT(b
->type
== basket_type_reserved
);
828 b
->type
= basket_type_deleted
;
829 data
->rq
->reserved_cnt
--;
831 RACTOR_UNLOCK_SELF(data
->cr
);
834 rb_mutex_unlock(data
->cr
->receiving_mutex
);
839 ractor_receive_if(rb_execution_context_t
*ec
, VALUE crv
, VALUE b
)
841 if (!RTEST(b
)) rb_raise(rb_eArgError
, "no block given");
843 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
844 unsigned int serial
= (unsigned int)-1;
846 struct rb_ractor_queue
*rq
= &cr
->sync
.incoming_queue
;
851 ractor_receive_wait(ec
, cr
);
853 RACTOR_LOCK_SELF(cr
);
855 if (serial
!= rq
->serial
) {
860 // check newer version
861 for (int i
=index
; i
<rq
->cnt
; i
++) {
862 if (!ractor_queue_skip_p(rq
, i
)) {
863 struct rb_ractor_basket
*b
= ractor_queue_at(rq
, i
);
864 v
= ractor_basket_value(b
);
865 b
->type
= basket_type_reserved
;
872 RACTOR_UNLOCK_SELF(cr
);
875 struct receive_block_data data
= {
883 VALUE result
= rb_ensure(receive_if_body
, (VALUE
)&data
,
884 receive_if_ensure
, (VALUE
)&data
);
886 if (result
!= Qundef
) return result
;
893 ractor_send_basket(rb_execution_context_t
*ec
, rb_ractor_t
*r
, struct rb_ractor_basket
*b
)
896 struct rb_ractor_queue
*rq
= &r
->sync
.incoming_queue
;
900 if (r
->sync
.incoming_port_closed
) {
904 ractor_queue_enq(r
, rq
, b
);
905 if (ractor_wakeup(r
, wait_receiving
, wakeup_by_send
)) {
906 RUBY_DEBUG_LOG("wakeup");
913 rb_raise(rb_eRactorClosedError
, "The incoming-port is already closed");
// Forward declarations for the object move/copy machinery defined later
// in this file.
static VALUE ractor_move(VALUE obj); // in this file
static VALUE ractor_copy(VALUE obj); // in this file
921 ractor_basket_setup(rb_execution_context_t
*ec
, struct rb_ractor_basket
*basket
, VALUE obj
, VALUE move
, bool exc
, bool is_will
, bool is_yield
)
923 basket
->sender
= rb_ec_ractor_ptr(ec
)->pub
.self
;
924 basket
->exception
= exc
;
927 basket
->type
= basket_type_will
;
930 else if (rb_ractor_shareable_p(obj
)) {
931 basket
->type
= basket_type_ref
;
934 else if (!RTEST(move
)) {
935 basket
->v
= ractor_copy(obj
);
936 basket
->type
= basket_type_copy
;
939 basket
->type
= basket_type_move
;
942 basket
->v
= obj
; // call ractor_move() when yielding timing.
945 basket
->v
= ractor_move(obj
);
951 ractor_send(rb_execution_context_t
*ec
, rb_ractor_t
*r
, VALUE obj
, VALUE move
)
953 struct rb_ractor_basket basket
;
954 ractor_basket_setup(ec
, &basket
, obj
, move
, false, false, false);
955 ractor_send_basket(ec
, r
, &basket
);
960 ractor_try_take(rb_execution_context_t
*ec
, rb_ractor_t
*r
)
962 struct rb_ractor_basket basket
= {
963 .type
= basket_type_none
,
969 if (ractor_sleeping_by(r
, wait_yielding
)) {
970 MAYBE_UNUSED(bool) wakeup_result
;
971 VM_ASSERT(r
->sync
.wait
.yielded_basket
.type
!= basket_type_none
);
973 if (r
->sync
.wait
.yielded_basket
.type
== basket_type_move
) {
974 wakeup_result
= ractor_wakeup(r
, wait_yielding
, wakeup_by_retry
);
977 wakeup_result
= ractor_wakeup(r
, wait_yielding
, wakeup_by_take
);
978 basket
= r
->sync
.wait
.yielded_basket
;
979 ractor_basket_clear(&r
->sync
.wait
.yielded_basket
);
981 VM_ASSERT(wakeup_result
);
983 else if (r
->sync
.outgoing_port_closed
) {
989 if (basket
.type
== basket_type_none
) {
991 rb_raise(rb_eRactorClosedError
, "The outgoing-port is already closed");
998 return ractor_basket_accept(&basket
);
1003 ractor_yield_move_body(VALUE v
)
1005 return ractor_move(v
);
1009 ractor_try_yield(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, struct rb_ractor_basket
*basket
)
1011 ASSERT_ractor_unlocking(cr
);
1012 VM_ASSERT(basket
->type
!= basket_type_none
);
1014 if (cr
->sync
.outgoing_port_closed
) {
1015 rb_raise(rb_eRactorClosedError
, "The outgoing-port is already closed");
1023 r
= ractor_waiting_list_shift(cr
, &cr
->sync
.taking_ractors
);
1028 bool retry_shift
= false;
1032 if (ractor_sleeping_by(r
, wait_taking
)) {
1033 VM_ASSERT(r
->sync
.wait
.taken_basket
.type
== basket_type_none
);
1035 if (basket
->type
== basket_type_move
) {
1036 enum ractor_wait_status prev_wait_status
= r
->sync
.wait
.status
;
1037 r
->sync
.wait
.status
= wait_moving
;
1042 VALUE moved_value
= rb_protect(ractor_yield_move_body
, basket
->v
, &state
);
1044 r
->sync
.wait
.status
= prev_wait_status
;
1048 basket
->v
= moved_value
;
1053 if (!ractor_wakeup(r
, wait_moving
, wakeup_by_yield
)) {
1058 ractor_wakeup(r
, wait_taking
, wakeup_by_yield
);
1060 r
->sync
.wait
.taken_basket
= *basket
;
1069 // get candidate take-waiting ractor, but already woke up by another reason.
1070 // retry to check another ractor.
1082 // select(r1, r2, r3, receive: true, yield: obj)
1084 ractor_select(rb_execution_context_t
*ec
, const VALUE
*rs
, const int rs_len
, VALUE yielded_value
, bool move
, VALUE
*ret_r
)
1086 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
1087 VALUE crv
= cr
->pub
.self
;
1090 bool interrupted
= false;
1091 enum ractor_wait_status wait_status
= 0;
1092 bool yield_p
= (yielded_value
!= Qundef
) ? true : false;
1093 const int alen
= rs_len
+ (yield_p
? 1 : 0);
1095 struct ractor_select_action
{
1096 enum ractor_select_action_type
{
1097 ractor_select_action_take
,
1098 ractor_select_action_receive
,
1099 ractor_select_action_yield
,
1102 } *actions
= ALLOCA_N(struct ractor_select_action
, alen
);
1104 VM_ASSERT(cr
->sync
.wait
.status
== wait_none
);
1105 VM_ASSERT(cr
->sync
.wait
.wakeup_status
== wakeup_none
);
1106 VM_ASSERT(cr
->sync
.wait
.taken_basket
.type
== basket_type_none
);
1107 VM_ASSERT(cr
->sync
.wait
.yielded_basket
.type
== basket_type_none
);
1110 for (i
=0; i
<rs_len
; i
++) {
1114 actions
[i
].type
= ractor_select_action_receive
;
1115 actions
[i
].v
= Qnil
;
1116 wait_status
|= wait_receiving
;
1118 else if (rb_ractor_p(v
)) {
1119 actions
[i
].type
= ractor_select_action_take
;
1121 wait_status
|= wait_taking
;
1124 rb_raise(rb_eArgError
, "should be a ractor object, but %"PRIsVALUE
, v
);
1132 actions
[rs_len
].type
= ractor_select_action_yield
;
1133 actions
[rs_len
].v
= Qundef
;
1134 wait_status
|= wait_yielding
;
1135 ractor_basket_setup(ec
, &cr
->sync
.wait
.yielded_basket
, yielded_value
, move
, false, false, true);
1138 // TODO: shuffle actions
1141 RUBY_DEBUG_LOG("try actions (%s)", wait_status_str(wait_status
));
1143 for (i
=0; i
<alen
; i
++) {
1145 switch (actions
[i
].type
) {
1146 case ractor_select_action_take
:
1148 v
= ractor_try_take(ec
, RACTOR_PTR(rv
));
1155 case ractor_select_action_receive
:
1156 v
= ractor_try_receive(ec
, cr
);
1158 *ret_r
= ID2SYM(rb_intern("receive"));
1163 case ractor_select_action_yield
:
1165 if (ractor_try_yield(ec
, cr
, &cr
->sync
.wait
.yielded_basket
)) {
1166 *ret_r
= ID2SYM(rb_intern("yield"));
1175 RUBY_DEBUG_LOG("wait actions (%s)", wait_status_str(wait_status
));
1179 VM_ASSERT(cr
->sync
.wait
.status
== wait_none
);
1180 cr
->sync
.wait
.status
= wait_status
;
1181 cr
->sync
.wait
.wakeup_status
= wakeup_none
;
1186 for (i
=0; i
<alen
; i
++) {
1188 switch (actions
[i
].type
) {
1189 case ractor_select_action_take
:
1190 r
= RACTOR_PTR(actions
[i
].v
);
1191 ractor_register_taking(r
, cr
);
1193 case ractor_select_action_yield
:
1194 case ractor_select_action_receive
:
1202 if (cr
->sync
.wait
.wakeup_status
== wakeup_none
) {
1203 for (i
=0; i
<alen
; i
++) {
1206 switch (actions
[i
].type
) {
1207 case ractor_select_action_take
:
1208 r
= RACTOR_PTR(actions
[i
].v
);
1209 if (ractor_sleeping_by(r
, wait_yielding
)) {
1210 RUBY_DEBUG_LOG("wakeup_none, but r:%u is waiting for yielding", r
->pub
.id
);
1211 cr
->sync
.wait
.wakeup_status
= wakeup_by_retry
;
1215 case ractor_select_action_receive
:
1216 if (cr
->sync
.incoming_queue
.cnt
> 0) {
1217 RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr
->sync
.incoming_queue
.cnt
);
1218 cr
->sync
.wait
.wakeup_status
= wakeup_by_retry
;
1222 case ractor_select_action_yield
:
1223 if (cr
->sync
.taking_ractors
.cnt
> 0) {
1224 RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr
->sync
.taking_ractors
.cnt
);
1225 cr
->sync
.wait
.wakeup_status
= wakeup_by_retry
;
1228 else if (cr
->sync
.outgoing_port_closed
) {
1229 cr
->sync
.wait
.wakeup_status
= wakeup_by_close
;
1236 RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr
->sync
.wait
.status
));
1237 ractor_sleep(ec
, cr
);
1238 RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr
->sync
.wait
.wakeup_status
));
1242 RUBY_DEBUG_LOG("no need to sleep %s->%s",
1243 wait_status_str(cr
->sync
.wait
.status
),
1244 wakeup_status_str(cr
->sync
.wait
.wakeup_status
));
1245 cr
->sync
.wait
.status
= wait_none
;
1251 for (i
=0; i
<alen
; i
++) {
1253 switch (actions
[i
].type
) {
1254 case ractor_select_action_take
:
1255 r
= RACTOR_PTR(actions
[i
].v
);
1256 ractor_waiting_list_del(r
, &r
->sync
.taking_ractors
, cr
);
1258 case ractor_select_action_receive
:
1259 case ractor_select_action_yield
:
1265 enum ractor_wakeup_status wakeup_status
= cr
->sync
.wait
.wakeup_status
;
1266 cr
->sync
.wait
.wakeup_status
= wakeup_none
;
1268 switch (wakeup_status
) {
1270 // OK. something happens.
1273 case wakeup_by_retry
:
1276 case wakeup_by_send
:
1278 // retry loop and try_receive will succss.
1280 case wakeup_by_yield
:
1281 // take was succeeded!
1282 // cr.wait.taken_basket contains passed block
1283 VM_ASSERT(cr
->sync
.wait
.taken_basket
.type
!= basket_type_none
);
1284 *ret_r
= cr
->sync
.wait
.taken_basket
.sender
;
1285 VM_ASSERT(rb_ractor_p(*ret_r
));
1286 ret
= ractor_basket_accept(&cr
->sync
.wait
.taken_basket
);
1288 case wakeup_by_take
:
1289 *ret_r
= ID2SYM(rb_intern("yield"));
1292 case wakeup_by_close
:
1294 // retry loop and will get CloseError.
1296 case wakeup_by_interrupt
:
1304 RUBY_DEBUG_LOG("cleanup actions (%s)", wait_status_str(wait_status
));
1306 if (cr
->sync
.wait
.yielded_basket
.type
!= basket_type_none
) {
1307 ractor_basket_clear(&cr
->sync
.wait
.yielded_basket
);
1310 VM_ASSERT(cr
->sync
.wait
.status
== wait_none
);
1311 VM_ASSERT(cr
->sync
.wait
.wakeup_status
== wakeup_none
);
1312 VM_ASSERT(cr
->sync
.wait
.taken_basket
.type
== basket_type_none
);
1313 VM_ASSERT(cr
->sync
.wait
.yielded_basket
.type
== basket_type_none
);
1316 rb_vm_check_ints_blocking(ec
);
1317 interrupted
= false;
1321 VM_ASSERT(ret
!= Qundef
);
1326 ractor_yield(rb_execution_context_t
*ec
, rb_ractor_t
*r
, VALUE obj
, VALUE move
)
1329 ractor_select(ec
, NULL
, 0, obj
, RTEST(move
) ? true : false, &ret_r
);
1334 ractor_take(rb_execution_context_t
*ec
, rb_ractor_t
*r
)
1337 VALUE v
= ractor_select(ec
, &r
->pub
.self
, 1, Qundef
, false, &ret_r
);
1342 ractor_close_incoming(rb_execution_context_t
*ec
, rb_ractor_t
*r
)
1348 if (!r
->sync
.incoming_port_closed
) {
1350 r
->sync
.incoming_port_closed
= true;
1351 if (ractor_wakeup(r
, wait_receiving
, wakeup_by_close
)) {
1352 VM_ASSERT(r
->sync
.incoming_queue
.cnt
== 0);
1353 RUBY_DEBUG_LOG("cancel receiving");
1365 ractor_close_outgoing(rb_execution_context_t
*ec
, rb_ractor_t
*r
)
1371 if (!r
->sync
.outgoing_port_closed
) {
1373 r
->sync
.outgoing_port_closed
= true;
1379 // wakeup all taking ractors
1380 rb_ractor_t
*taking_ractor
;
1381 while ((taking_ractor
= ractor_waiting_list_shift(r
, &r
->sync
.taking_ractors
)) != NULL
) {
1382 RACTOR_LOCK(taking_ractor
);
1383 ractor_wakeup(taking_ractor
, wait_taking
, wakeup_by_close
);
1384 RACTOR_UNLOCK(taking_ractor
);
1387 // raising yielding Ractor
1388 if (!r
->yield_atexit
&&
1389 ractor_wakeup(r
, wait_yielding
, wakeup_by_close
)) {
1390 RUBY_DEBUG_LOG("cancel yielding");
1397 // creation/termination
1400 ractor_next_id(void)
1404 id
= (uint32_t)(RUBY_ATOMIC_FETCH_ADD(ractor_last_id
, 1) + 1);
1410 vm_insert_ractor0(rb_vm_t
*vm
, rb_ractor_t
*r
, bool single_ractor_mode
)
1412 RUBY_DEBUG_LOG("r:%u ractor.cnt:%u++", r
->pub
.id
, vm
->ractor
.cnt
);
1413 VM_ASSERT(single_ractor_mode
|| RB_VM_LOCKED_P());
1415 list_add_tail(&vm
->ractor
.set
, &r
->vmlr_node
);
1420 cancel_single_ractor_mode(void)
1422 // enable multi-ractor mode
1423 RUBY_DEBUG_LOG("enable multi-ractor mode");
1425 VALUE was_disabled
= rb_gc_enable();
1428 rb_transient_heap_evacuate();
1434 ruby_single_main_ractor
= NULL
;
1436 if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL
)) {
1437 rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL
,
1438 "Ractor is experimental, and the behavior may change in future versions of Ruby! "
1439 "Also there are many implementation issues.");
1444 vm_insert_ractor(rb_vm_t
*vm
, rb_ractor_t
*r
)
1446 VM_ASSERT(ractor_status_p(r
, ractor_created
));
1448 if (rb_multi_ractor_p()) {
1451 vm_insert_ractor0(vm
, r
, false);
1452 vm_ractor_blocking_cnt_inc(vm
, r
, __FILE__
, __LINE__
);
1457 if (vm
->ractor
.cnt
== 0) {
1459 vm_insert_ractor0(vm
, r
, true);
1460 ractor_status_set(r
, ractor_blocking
);
1461 ractor_status_set(r
, ractor_running
);
1464 cancel_single_ractor_mode();
1465 vm_insert_ractor0(vm
, r
, true);
1466 vm_ractor_blocking_cnt_inc(vm
, r
, __FILE__
, __LINE__
);
1472 vm_remove_ractor(rb_vm_t
*vm
, rb_ractor_t
*cr
)
1474 VM_ASSERT(ractor_status_p(cr
, ractor_running
));
1475 VM_ASSERT(vm
->ractor
.cnt
> 1);
1476 VM_ASSERT(cr
->threads
.cnt
== 1);
1480 RUBY_DEBUG_LOG("ractor.cnt:%u-- terminate_waiting:%d",
1481 vm
->ractor
.cnt
, vm
->ractor
.sync
.terminate_waiting
);
1483 VM_ASSERT(vm
->ractor
.cnt
> 0);
1484 list_del(&cr
->vmlr_node
);
1486 if (vm
->ractor
.cnt
<= 2 && vm
->ractor
.sync
.terminate_waiting
) {
1487 rb_native_cond_signal(&vm
->ractor
.sync
.terminate_cond
);
1491 /* Clear the cached freelist to prevent a memory leak. */
1492 rb_gc_ractor_newobj_cache_clear(&cr
->newobj_cache
);
1494 ractor_status_set(cr
, ractor_terminated
);
1500 ractor_alloc(VALUE klass
)
1503 VALUE rv
= TypedData_Make_Struct(klass
, rb_ractor_t
, &ractor_data_type
, r
);
1504 FL_SET_RAW(rv
, RUBY_FL_SHAREABLE
);
1506 VM_ASSERT(ractor_status_p(r
, ractor_created
));
1511 rb_ractor_main_alloc(void)
1513 rb_ractor_t
*r
= ruby_mimmalloc(sizeof(rb_ractor_t
));
1515 fprintf(stderr
, "[FATAL] failed to allocate memory for main ractor\n");
1518 MEMZERO(r
, rb_ractor_t
, 1);
1519 r
->pub
.id
= ++ractor_last_id
;
1523 ruby_single_main_ractor
= r
;
1528 #if defined(HAVE_WORKING_FORK)
1530 rb_ractor_atfork(rb_vm_t
*vm
, rb_thread_t
*th
)
1532 // initialize as a main ractor
1534 vm
->ractor
.blocking_cnt
= 0;
1535 ruby_single_main_ractor
= th
->ractor
;
1536 th
->ractor
->status_
= ractor_created
;
1538 rb_ractor_living_threads_init(th
->ractor
);
1539 rb_ractor_living_threads_insert(th
->ractor
, th
);
1541 VM_ASSERT(vm
->ractor
.blocking_cnt
== 0);
1542 VM_ASSERT(vm
->ractor
.cnt
== 1);
1546 void rb_gvl_init(rb_global_vm_lock_t
*gvl
);
1549 rb_ractor_living_threads_init(rb_ractor_t
*r
)
1551 list_head_init(&r
->threads
.set
);
1553 r
->threads
.blocking_cnt
= 0;
1557 ractor_init(rb_ractor_t
*r
, VALUE name
, VALUE loc
)
1559 ractor_queue_setup(&r
->sync
.incoming_queue
);
1560 rb_native_mutex_initialize(&r
->sync
.lock
);
1561 rb_native_cond_initialize(&r
->sync
.cond
);
1562 rb_native_cond_initialize(&r
->barrier_wait_cond
);
1564 // thread management
1565 rb_gvl_init(&r
->threads
.gvl
);
1566 rb_ractor_living_threads_init(r
);
1571 StringValueCStr(name
);
1572 enc
= rb_enc_get(name
);
1573 if (!rb_enc_asciicompat(enc
)) {
1574 rb_raise(rb_eArgError
, "ASCII incompatible encoding (%s)",
1577 name
= rb_str_new_frozen(name
);
1584 rb_ractor_main_setup(rb_vm_t
*vm
, rb_ractor_t
*r
, rb_thread_t
*th
)
1586 r
->pub
.self
= TypedData_Wrap_Struct(rb_cRactor
, &ractor_data_type
, r
);
1587 FL_SET_RAW(r
->pub
.self
, RUBY_FL_SHAREABLE
);
1588 ractor_init(r
, Qnil
, Qnil
);
1589 r
->threads
.main
= th
;
1590 rb_ractor_living_threads_insert(r
, th
);
1594 ractor_create(rb_execution_context_t
*ec
, VALUE self
, VALUE loc
, VALUE name
, VALUE args
, VALUE block
)
1596 VALUE rv
= ractor_alloc(self
);
1597 rb_ractor_t
*r
= RACTOR_PTR(rv
);
1598 ractor_init(r
, name
, loc
);
1601 r
->pub
.id
= ractor_next_id();
1602 RUBY_DEBUG_LOG("r:%u", r
->pub
.id
);
1604 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
1605 r
->verbose
= cr
->verbose
;
1606 r
->debug
= cr
->debug
;
1608 rb_yjit_before_ractor_spawn();
1609 rb_thread_create_ractor(r
, args
, block
);
1616 ractor_yield_atexit(rb_execution_context_t
*ec
, rb_ractor_t
*cr
, VALUE v
, bool exc
)
1618 if (cr
->sync
.outgoing_port_closed
) {
1622 ASSERT_ractor_unlocking(cr
);
1624 struct rb_ractor_basket basket
;
1625 ractor_basket_setup(ec
, &basket
, v
, Qfalse
, exc
, true, true /* this flag is ignored because move is Qfalse */);
1628 if (ractor_try_yield(ec
, cr
, &basket
)) {
1635 if (cr
->sync
.taking_ractors
.cnt
== 0) {
1636 cr
->sync
.wait
.yielded_basket
= basket
;
1638 VM_ASSERT(cr
->sync
.wait
.status
== wait_none
);
1639 cr
->sync
.wait
.status
= wait_yielding
;
1640 cr
->sync
.wait
.wakeup_status
= wakeup_none
;
1642 VM_ASSERT(cr
->yield_atexit
== false);
1643 cr
->yield_atexit
= true;
1646 retry
= true; // another ractor is waiting for the yield.
1651 if (retry
) goto retry
;
1656 rb_ractor_teardown(rb_execution_context_t
*ec
)
1658 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
1659 ractor_close_incoming(ec
, cr
);
1660 ractor_close_outgoing(ec
, cr
);
1662 // sync with rb_ractor_terminate_interrupt_main_thread()
1665 VM_ASSERT(cr
->threads
.main
!= NULL
);
1666 cr
->threads
.main
= NULL
;
1672 rb_ractor_atexit(rb_execution_context_t
*ec
, VALUE result
)
1674 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
1675 ractor_yield_atexit(ec
, cr
, result
, false);
1679 rb_ractor_atexit_exception(rb_execution_context_t
*ec
)
1681 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
1682 ractor_yield_atexit(ec
, cr
, ec
->errinfo
, true);
1686 rb_ractor_receive_parameters(rb_execution_context_t
*ec
, rb_ractor_t
*r
, int len
, VALUE
*ptr
)
1688 for (int i
=0; i
<len
; i
++) {
1689 ptr
[i
] = ractor_receive(ec
, r
);
1694 rb_ractor_send_parameters(rb_execution_context_t
*ec
, rb_ractor_t
*r
, VALUE args
)
1696 int len
= RARRAY_LENINT(args
);
1697 for (int i
=0; i
<len
; i
++) {
1698 ractor_send(ec
, r
, RARRAY_AREF(args
, i
), false);
1702 MJIT_FUNC_EXPORTED
bool
1703 rb_ractor_main_p_(void)
1705 VM_ASSERT(rb_multi_ractor_p());
1706 rb_execution_context_t
*ec
= GET_EC();
1707 return rb_ec_ractor_ptr(ec
) == rb_ec_vm_ptr(ec
)->ractor
.main_ractor
;
1711 rb_obj_is_main_ractor(VALUE gv
)
1713 if (!rb_ractor_p(gv
)) return false;
1714 rb_ractor_t
*r
= DATA_PTR(gv
);
1715 return r
== GET_VM()->ractor
.main_ractor
;
1718 rb_global_vm_lock_t
*
1719 rb_ractor_gvl(rb_ractor_t
*r
)
1721 return &r
->threads
.gvl
;
1725 rb_ractor_living_thread_num(const rb_ractor_t
*r
)
1727 return r
->threads
.cnt
;
1731 rb_ractor_thread_list(rb_ractor_t
*r
)
1733 rb_thread_t
*th
= 0;
1739 ts
= ALLOCA_N(VALUE
, r
->threads
.cnt
);
1742 list_for_each(&r
->threads
.set
, th
, lt_node
) {
1743 switch (th
->status
) {
1744 case THREAD_RUNNABLE
:
1745 case THREAD_STOPPED
:
1746 case THREAD_STOPPED_FOREVER
:
1747 ts
[ts_cnt
++] = th
->self
;
1755 VALUE ary
= rb_ary_new();
1756 for (int i
=0; i
<ts_cnt
; i
++) {
1757 rb_ary_push(ary
, ts
[i
]);
1764 rb_ractor_living_threads_insert(rb_ractor_t
*r
, rb_thread_t
*th
)
1766 VM_ASSERT(th
!= NULL
);
1770 RUBY_DEBUG_LOG("r(%d)->threads.cnt:%d++", r
->pub
.id
, r
->threads
.cnt
);
1771 list_add_tail(&r
->threads
.set
, &th
->lt_node
);
1776 // first thread for a ractor
1777 if (r
->threads
.cnt
== 1) {
1778 VM_ASSERT(ractor_status_p(r
, ractor_created
));
1779 vm_insert_ractor(th
->vm
, r
);
1784 vm_ractor_blocking_cnt_inc(rb_vm_t
*vm
, rb_ractor_t
*r
, const char *file
, int line
)
1786 ractor_status_set(r
, ractor_blocking
);
1788 RUBY_DEBUG_LOG2(file
, line
, "vm->ractor.blocking_cnt:%d++", vm
->ractor
.blocking_cnt
);
1789 vm
->ractor
.blocking_cnt
++;
1790 VM_ASSERT(vm
->ractor
.blocking_cnt
<= vm
->ractor
.cnt
);
1794 rb_vm_ractor_blocking_cnt_inc(rb_vm_t
*vm
, rb_ractor_t
*cr
, const char *file
, int line
)
1796 ASSERT_vm_locking();
1797 VM_ASSERT(GET_RACTOR() == cr
);
1798 vm_ractor_blocking_cnt_inc(vm
, cr
, file
, line
);
1802 rb_vm_ractor_blocking_cnt_dec(rb_vm_t
*vm
, rb_ractor_t
*cr
, const char *file
, int line
)
1804 ASSERT_vm_locking();
1805 VM_ASSERT(GET_RACTOR() == cr
);
1807 RUBY_DEBUG_LOG2(file
, line
, "vm->ractor.blocking_cnt:%d--", vm
->ractor
.blocking_cnt
);
1808 VM_ASSERT(vm
->ractor
.blocking_cnt
> 0);
1809 vm
->ractor
.blocking_cnt
--;
1811 ractor_status_set(cr
, ractor_running
);
1815 ractor_check_blocking(rb_ractor_t
*cr
, unsigned int remained_thread_cnt
, const char *file
, int line
)
1817 VM_ASSERT(cr
== GET_RACTOR());
1819 RUBY_DEBUG_LOG2(file
, line
,
1820 "cr->threads.cnt:%u cr->threads.blocking_cnt:%u vm->ractor.cnt:%u vm->ractor.blocking_cnt:%u",
1821 cr
->threads
.cnt
, cr
->threads
.blocking_cnt
,
1822 GET_VM()->ractor
.cnt
, GET_VM()->ractor
.blocking_cnt
);
1824 VM_ASSERT(cr
->threads
.cnt
>= cr
->threads
.blocking_cnt
+ 1);
1826 if (remained_thread_cnt
> 0 &&
1828 cr
->threads
.cnt
== cr
->threads
.blocking_cnt
+ 1) {
1829 // change ractor status: running -> blocking
1830 rb_vm_t
*vm
= GET_VM();
1831 ASSERT_vm_unlocking();
1835 rb_vm_ractor_blocking_cnt_inc(vm
, cr
, file
, line
);
1842 rb_ractor_living_threads_remove(rb_ractor_t
*cr
, rb_thread_t
*th
)
1844 VM_ASSERT(cr
== GET_RACTOR());
1845 RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr
->threads
.cnt
);
1846 ractor_check_blocking(cr
, cr
->threads
.cnt
- 1, __FILE__
, __LINE__
);
1848 if (cr
->threads
.cnt
== 1) {
1849 vm_remove_ractor(th
->vm
, cr
);
1854 list_del(&th
->lt_node
);
1862 rb_ractor_blocking_threads_inc(rb_ractor_t
*cr
, const char *file
, int line
)
1864 RUBY_DEBUG_LOG2(file
, line
, "cr->threads.blocking_cnt:%d++", cr
->threads
.blocking_cnt
);
1866 VM_ASSERT(cr
->threads
.cnt
> 0);
1867 VM_ASSERT(cr
== GET_RACTOR());
1869 ractor_check_blocking(cr
, cr
->threads
.cnt
, __FILE__
, __LINE__
);
1870 cr
->threads
.blocking_cnt
++;
1874 rb_ractor_blocking_threads_dec(rb_ractor_t
*cr
, const char *file
, int line
)
1876 RUBY_DEBUG_LOG2(file
, line
,
1877 "r->threads.blocking_cnt:%d--, r->threads.cnt:%u",
1878 cr
->threads
.blocking_cnt
, cr
->threads
.cnt
);
1880 VM_ASSERT(cr
== GET_RACTOR());
1882 if (cr
->threads
.cnt
== cr
->threads
.blocking_cnt
) {
1883 rb_vm_t
*vm
= GET_VM();
1887 rb_vm_ractor_blocking_cnt_dec(vm
, cr
, __FILE__
, __LINE__
);
1892 cr
->threads
.blocking_cnt
--;
1896 rb_ractor_vm_barrier_interrupt_running_thread(rb_ractor_t
*r
)
1898 VM_ASSERT(r
!= GET_RACTOR());
1899 ASSERT_ractor_unlocking(r
);
1900 ASSERT_vm_locking();
1904 if (ractor_status_p(r
, ractor_running
)) {
1905 rb_execution_context_t
*ec
= r
->threads
.running_ec
;
1907 RUBY_VM_SET_VM_BARRIER_INTERRUPT(ec
);
1915 rb_ractor_terminate_interrupt_main_thread(rb_ractor_t
*r
)
1917 VM_ASSERT(r
!= GET_RACTOR());
1918 ASSERT_ractor_unlocking(r
);
1919 ASSERT_vm_locking();
1921 rb_thread_t
*main_th
= r
->threads
.main
;
1923 if (main_th
->status
!= THREAD_KILLED
) {
1924 RUBY_VM_SET_TERMINATE_INTERRUPT(main_th
->ec
);
1925 rb_threadptr_interrupt(main_th
);
1928 RUBY_DEBUG_LOG("killed (%p)", (void *)main_th
);
1933 void rb_thread_terminate_all(rb_thread_t
*th
); // thread.c
1936 ractor_terminal_interrupt_all(rb_vm_t
*vm
)
1938 if (vm
->ractor
.cnt
> 1) {
1939 // send terminate notification to all ractors
1941 list_for_each(&vm
->ractor
.set
, r
, vmlr_node
) {
1942 if (r
!= vm
->ractor
.main_ractor
) {
1943 rb_ractor_terminate_interrupt_main_thread(r
);
1950 rb_ractor_terminate_all(void)
1952 rb_vm_t
*vm
= GET_VM();
1953 rb_ractor_t
*cr
= vm
->ractor
.main_ractor
;
1955 VM_ASSERT(cr
== GET_RACTOR()); // only main-ractor's main-thread should kick it.
1957 if (vm
->ractor
.cnt
> 1) {
1959 ractor_terminal_interrupt_all(vm
); // kill all ractors
1962 rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait
1966 while (vm
->ractor
.cnt
> 1) {
1967 RUBY_DEBUG_LOG("terminate_waiting:%d", vm
->ractor
.sync
.terminate_waiting
);
1968 vm
->ractor
.sync
.terminate_waiting
= true;
1971 rb_vm_ractor_blocking_cnt_inc(vm
, cr
, __FILE__
, __LINE__
);
1972 rb_vm_cond_timedwait(vm
, &vm
->ractor
.sync
.terminate_cond
, 1000 /* ms */);
1973 rb_vm_ractor_blocking_cnt_dec(vm
, cr
, __FILE__
, __LINE__
);
1975 ractor_terminal_interrupt_all(vm
);
1981 rb_execution_context_t
*
1982 rb_vm_main_ractor_ec(rb_vm_t
*vm
)
1984 return vm
->ractor
.main_ractor
->threads
.running_ec
;
1988 ractor_moved_missing(int argc
, VALUE
*argv
, VALUE self
)
1990 rb_raise(rb_eRactorMovedError
, "can not send any methods to a moved object");
1994 * Document-class: Ractor::ClosedError
1996 * Raised when an attempt is made to send a message to a closed port,
1997 * or to retrieve a message from a closed and empty port.
1998 * Ports may be closed explicitly with Ractor#close_outgoing/close_incoming
1999 * and are closed implicitly when a Ractor terminates.
2001 * r = Ractor.new { sleep(500) }
2003 * r.take # Ractor::ClosedError
2005 * ClosedError is a descendant of StopIteration, so the closing of the ractor will break
2006 * the loops without propagating the error:
2010 * msg = receive # raises ClosedError and loop traps it
2011 * puts "Received: #{msg}"
2013 * puts "loop exited"
2016 * 3.times{|i| r << i}
2019 * puts "Continue successfully"
2027 * Continue successfully
2031 * Document-class: Ractor::RemoteError
2033  * Raised on an attempt to Ractor#take if there was an uncaught exception in the Ractor.
2034 * Its +cause+ will contain the original exception, and +ractor+ is the original ractor
2037 * r = Ractor.new { raise "Something weird happened" }
2042 * p e # => #<Ractor::RemoteError: thrown by remote Ractor.>
2043 * p e.ractor == r # => true
2044 * p e.cause # => #<RuntimeError: Something weird happened>
2050 * Document-class: Ractor::MovedError
2052 * Raised on an attempt to access an object which was moved in Ractor#send or Ractor.yield.
2054 * r = Ractor.new { sleep }
2057 * r.send(ary, move: true)
2059 * # Ractor::MovedError (can not send any methods to a moved object)
2064 * Document-class: Ractor::MovedObject
2066 * A special object which replaces any value that was moved to another ractor in Ractor#send
2067 * or Ractor.yield. Any attempt to access the object results in Ractor::MovedError.
2069 * r = Ractor.new { receive }
2072 * r.send(ary, move: true)
2073 * p Ractor::MovedObject === ary
2076 * # Ractor::MovedError (can not send any methods to a moved object)
2079 // Main docs are in ractor.rb, but without this clause there are weird artifacts
2080 // in their rendering.
2082 * Document-class: Ractor
2089 rb_cRactor
= rb_define_class("Ractor", rb_cObject
);
2090 rb_undef_alloc_func(rb_cRactor
);
2092 rb_eRactorError
= rb_define_class_under(rb_cRactor
, "Error", rb_eRuntimeError
);
2093 rb_eRactorIsolationError
= rb_define_class_under(rb_cRactor
, "IsolationError", rb_eRactorError
);
2094 rb_eRactorRemoteError
= rb_define_class_under(rb_cRactor
, "RemoteError", rb_eRactorError
);
2095 rb_eRactorMovedError
= rb_define_class_under(rb_cRactor
, "MovedError", rb_eRactorError
);
2096 rb_eRactorClosedError
= rb_define_class_under(rb_cRactor
, "ClosedError", rb_eStopIteration
);
2097 rb_eRactorUnsafeError
= rb_define_class_under(rb_cRactor
, "UnsafeError", rb_eRactorError
);
2099 rb_cRactorMovedObject
= rb_define_class_under(rb_cRactor
, "MovedObject", rb_cBasicObject
);
2100 rb_undef_alloc_func(rb_cRactorMovedObject
);
2101 rb_define_method(rb_cRactorMovedObject
, "method_missing", ractor_moved_missing
, -1);
2103 // override methods defined in BasicObject
2104 rb_define_method(rb_cRactorMovedObject
, "__send__", ractor_moved_missing
, -1);
2105 rb_define_method(rb_cRactorMovedObject
, "!", ractor_moved_missing
, -1);
2106 rb_define_method(rb_cRactorMovedObject
, "==", ractor_moved_missing
, -1);
2107 rb_define_method(rb_cRactorMovedObject
, "!=", ractor_moved_missing
, -1);
2108 rb_define_method(rb_cRactorMovedObject
, "__id__", ractor_moved_missing
, -1);
2109 rb_define_method(rb_cRactorMovedObject
, "equal?", ractor_moved_missing
, -1);
2110 rb_define_method(rb_cRactorMovedObject
, "instance_eval", ractor_moved_missing
, -1);
2111 rb_define_method(rb_cRactorMovedObject
, "instance_exec", ractor_moved_missing
, -1);
2115 rb_ractor_dump(void)
2117 rb_vm_t
*vm
= GET_VM();
2120 list_for_each(&vm
->ractor
.set
, r
, vmlr_node
) {
2121 if (r
!= vm
->ractor
.main_ractor
) {
2122 fprintf(stderr
, "r:%u (%s)\n", r
->pub
.id
, ractor_status_str(r
->status_
));
2128 rb_ractor_stdin(void)
2130 if (rb_ractor_main_p()) {
2134 rb_ractor_t
*cr
= GET_RACTOR();
2140 rb_ractor_stdout(void)
2142 if (rb_ractor_main_p()) {
2146 rb_ractor_t
*cr
= GET_RACTOR();
2147 return cr
->r_stdout
;
2152 rb_ractor_stderr(void)
2154 if (rb_ractor_main_p()) {
2158 rb_ractor_t
*cr
= GET_RACTOR();
2159 return cr
->r_stderr
;
2164 rb_ractor_stdin_set(VALUE in
)
2166 if (rb_ractor_main_p()) {
2170 rb_ractor_t
*cr
= GET_RACTOR();
2171 RB_OBJ_WRITE(cr
->pub
.self
, &cr
->r_stdin
, in
);
2176 rb_ractor_stdout_set(VALUE out
)
2178 if (rb_ractor_main_p()) {
2182 rb_ractor_t
*cr
= GET_RACTOR();
2183 RB_OBJ_WRITE(cr
->pub
.self
, &cr
->r_stdout
, out
);
2188 rb_ractor_stderr_set(VALUE err
)
2190 if (rb_ractor_main_p()) {
2194 rb_ractor_t
*cr
= GET_RACTOR();
2195 RB_OBJ_WRITE(cr
->pub
.self
, &cr
->r_stderr
, err
);
2200 rb_ractor_hooks(rb_ractor_t
*cr
)
2202 return &cr
->pub
.hooks
;
2205 /// traverse function
2211 enum obj_traverse_iterator_result
{
2217 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_enter_func
)(VALUE obj
);
2218 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_leave_func
)(VALUE obj
);
2219 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_final_func
)(VALUE obj
);
2221 static enum obj_traverse_iterator_result
null_leave(VALUE obj
);
2223 struct obj_traverse_data
{
2224 rb_obj_traverse_enter_func enter_func
;
2225 rb_obj_traverse_leave_func leave_func
;
2232 struct obj_traverse_callback_data
{
2234 struct obj_traverse_data
*data
;
2237 static int obj_traverse_i(VALUE obj
, struct obj_traverse_data
*data
);
2240 obj_hash_traverse_i(VALUE key
, VALUE val
, VALUE ptr
)
2242 struct obj_traverse_callback_data
*d
= (struct obj_traverse_callback_data
*)ptr
;
2244 if (obj_traverse_i(key
, d
->data
)) {
2249 if (obj_traverse_i(val
, d
->data
)) {
2258 obj_traverse_reachable_i(VALUE obj
, void *ptr
)
2260 struct obj_traverse_callback_data
*d
= (struct obj_traverse_callback_data
*)ptr
;
2262 if (obj_traverse_i(obj
, d
->data
)) {
2267 static struct st_table
*
2268 obj_traverse_rec(struct obj_traverse_data
*data
)
2270 if (UNLIKELY(!data
->rec
)) {
2271 data
->rec_hash
= rb_ident_hash_new();
2272 data
->rec
= rb_hash_st_table(data
->rec_hash
);
2278 obj_traverse_i(VALUE obj
, struct obj_traverse_data
*data
)
2280 if (RB_SPECIAL_CONST_P(obj
)) return 0;
2282 switch (data
->enter_func(obj
)) {
2283 case traverse_cont
: break;
2284 case traverse_skip
: return 0; // skip children
2285 case traverse_stop
: return 1; // stop search
2288 if (UNLIKELY(st_insert(obj_traverse_rec(data
), obj
, 1))) {
2289 // already traversed
2293 if (UNLIKELY(FL_TEST_RAW(obj
, FL_EXIVAR
))) {
2294 struct gen_ivtbl
*ivtbl
;
2295 rb_ivar_generic_ivtbl_lookup(obj
, &ivtbl
);
2296 for (uint32_t i
= 0; i
< ivtbl
->numiv
; i
++) {
2297 VALUE val
= ivtbl
->ivptr
[i
];
2298 if (val
!= Qundef
&& obj_traverse_i(val
, data
)) return 1;
2302 switch (BUILTIN_TYPE(obj
)) {
2315 uint32_t len
= ROBJECT_NUMIV(obj
);
2316 VALUE
*ptr
= ROBJECT_IVPTR(obj
);
2318 for (uint32_t i
=0; i
<len
; i
++) {
2320 if (val
!= Qundef
&& obj_traverse_i(val
, data
)) return 1;
2327 for (int i
= 0; i
< RARRAY_LENINT(obj
); i
++) {
2328 VALUE e
= rb_ary_entry(obj
, i
);
2329 if (obj_traverse_i(e
, data
)) return 1;
2336 if (obj_traverse_i(RHASH_IFNONE(obj
), data
)) return 1;
2338 struct obj_traverse_callback_data d
= {
2342 rb_hash_foreach(obj
, obj_hash_traverse_i
, (VALUE
)&d
);
2343 if (d
.stop
) return 1;
2349 long len
= RSTRUCT_LEN(obj
);
2350 const VALUE
*ptr
= RSTRUCT_CONST_PTR(obj
);
2352 for (long i
=0; i
<len
; i
++) {
2353 if (obj_traverse_i(ptr
[i
], data
)) return 1;
2359 if (obj_traverse_i(RRATIONAL(obj
)->num
, data
)) return 1;
2360 if (obj_traverse_i(RRATIONAL(obj
)->den
, data
)) return 1;
2363 if (obj_traverse_i(RCOMPLEX(obj
)->real
, data
)) return 1;
2364 if (obj_traverse_i(RCOMPLEX(obj
)->imag
, data
)) return 1;
2370 struct obj_traverse_callback_data d
= {
2374 RB_VM_LOCK_ENTER_NO_BARRIER();
2376 rb_objspace_reachable_objects_from(obj
, obj_traverse_reachable_i
, &d
);
2378 RB_VM_LOCK_LEAVE_NO_BARRIER();
2379 if (d
.stop
) return 1;
2389 rb_bug("unreachable");
2392 if (data
->leave_func(obj
) == traverse_stop
) {
2400 struct rb_obj_traverse_final_data
{
2401 rb_obj_traverse_final_func final_func
;
2406 obj_traverse_final_i(st_data_t key
, st_data_t val
, st_data_t arg
)
2408 struct rb_obj_traverse_final_data
*data
= (void *)arg
;
2409 if (data
->final_func(key
)) {
2419 rb_obj_traverse(VALUE obj
,
2420 rb_obj_traverse_enter_func enter_func
,
2421 rb_obj_traverse_leave_func leave_func
,
2422 rb_obj_traverse_final_func final_func
)
2424 struct obj_traverse_data data
= {
2425 .enter_func
= enter_func
,
2426 .leave_func
= leave_func
,
2430 if (obj_traverse_i(obj
, &data
)) return 1;
2431 if (final_func
&& data
.rec
) {
2432 struct rb_obj_traverse_final_data f
= {final_func
, 0};
2433 st_foreach(data
.rec
, obj_traverse_final_i
, (st_data_t
)&f
);
2440 frozen_shareable_p(VALUE obj
, bool *made_shareable
)
2442 if (!RB_TYPE_P(obj
, T_DATA
)) {
2445 else if (RTYPEDDATA_P(obj
)) {
2446 const rb_data_type_t
*type
= RTYPEDDATA_TYPE(obj
);
2447 if (type
->flags
& RUBY_TYPED_FROZEN_SHAREABLE
) {
2450 else if (made_shareable
&& rb_obj_is_proc(obj
)) {
2451 // special path to make shareable Proc.
2452 rb_proc_ractor_make_shareable(obj
);
2453 *made_shareable
= true;
2454 VM_ASSERT(RB_OBJ_SHAREABLE_P(obj
));
2462 static enum obj_traverse_iterator_result
2463 make_shareable_check_shareable(VALUE obj
)
2465 VM_ASSERT(!SPECIAL_CONST_P(obj
));
2466 bool made_shareable
= false;
2468 if (rb_ractor_shareable_p(obj
)) {
2469 return traverse_skip
;
2471 else if (!frozen_shareable_p(obj
, &made_shareable
)) {
2472 if (made_shareable
) {
2473 return traverse_skip
;
2476 rb_raise(rb_eRactorError
, "can not make shareable object for %"PRIsVALUE
, obj
);
2480 if (!RB_OBJ_FROZEN_RAW(obj
)) {
2481 rb_funcall(obj
, idFreeze
, 0);
2483 if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj
))) {
2484 rb_raise(rb_eRactorError
, "#freeze does not freeze object correctly");
2487 if (RB_OBJ_SHAREABLE_P(obj
)) {
2488 return traverse_skip
;
2492 return traverse_cont
;
2495 static enum obj_traverse_iterator_result
2496 mark_shareable(VALUE obj
)
2498 FL_SET_RAW(obj
, RUBY_FL_SHAREABLE
);
2499 return traverse_cont
;
2503 rb_ractor_make_shareable(VALUE obj
)
2505 rb_obj_traverse(obj
,
2506 make_shareable_check_shareable
,
2507 null_leave
, mark_shareable
);
2512 rb_ractor_make_shareable_copy(VALUE obj
)
2514 VALUE copy
= ractor_copy(obj
);
2515 rb_obj_traverse(copy
,
2516 make_shareable_check_shareable
,
2517 null_leave
, mark_shareable
);
2522 rb_ractor_ensure_shareable(VALUE obj
, VALUE name
)
2524 if (!rb_ractor_shareable_p(obj
)) {
2525 VALUE message
= rb_sprintf("cannot assign unshareable object to %"PRIsVALUE
,
2527 rb_exc_raise(rb_exc_new_str(rb_eRactorIsolationError
, message
));
2532 static enum obj_traverse_iterator_result
2533 shareable_p_enter(VALUE obj
)
2535 if (RB_OBJ_SHAREABLE_P(obj
)) {
2536 return traverse_skip
;
2538 else if (RB_TYPE_P(obj
, T_CLASS
) ||
2539 RB_TYPE_P(obj
, T_MODULE
) ||
2540 RB_TYPE_P(obj
, T_ICLASS
)) {
2542 mark_shareable(obj
);
2543 return traverse_skip
;
2545 else if (RB_OBJ_FROZEN_RAW(obj
) &&
2546 frozen_shareable_p(obj
, NULL
)) {
2547 return traverse_cont
;
2550 return traverse_stop
; // fail
2553 MJIT_FUNC_EXPORTED
bool
2554 rb_ractor_shareable_p_continue(VALUE obj
)
2556 if (rb_obj_traverse(obj
,
2557 shareable_p_enter
, null_leave
,
#if RACTOR_CHECK_MODE > 0
// Debug-build traversal hook: re-stamp each unshareable object with the
// current ractor's id (shareable objects belong to no ractor).
static enum obj_traverse_iterator_result
reset_belonging_enter(VALUE obj)
{
    if (rb_ractor_shareable_p(obj)) {
        return traverse_skip;
    }
    else {
        rb_ractor_setup_belonging(obj);
        return traverse_cont;
    }
}
#endif
2580 static enum obj_traverse_iterator_result
2581 null_leave(VALUE obj
)
2583 return traverse_cont
;
2587 ractor_reset_belonging(VALUE obj
)
2589 #if RACTOR_CHECK_MODE > 0
2590 rb_obj_traverse(obj
, reset_belonging_enter
, null_leave
, NULL
);
2596 /// traverse and replace function
2602 struct obj_traverse_replace_data
;
2603 static int obj_traverse_replace_i(VALUE obj
, struct obj_traverse_replace_data
*data
);
2604 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_enter_func
)(VALUE obj
, struct obj_traverse_replace_data
*data
);
2605 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_leave_func
)(VALUE obj
, struct obj_traverse_replace_data
*data
);
2607 struct obj_traverse_replace_data
{
2608 rb_obj_traverse_replace_enter_func enter_func
;
2609 rb_obj_traverse_replace_leave_func leave_func
;
2618 struct obj_traverse_replace_callback_data
{
2621 struct obj_traverse_replace_data
*data
;
2625 obj_hash_traverse_replace_foreach_i(st_data_t key
, st_data_t value
, st_data_t argp
, int error
)
2631 obj_hash_traverse_replace_i(st_data_t
*key
, st_data_t
*val
, st_data_t ptr
, int exists
)
2633 struct obj_traverse_replace_callback_data
*d
= (struct obj_traverse_replace_callback_data
*)ptr
;
2634 struct obj_traverse_replace_data
*data
= d
->data
;
2636 if (obj_traverse_replace_i(*key
, data
)) {
2640 else if (*key
!= data
->replacement
) {
2641 VALUE v
= *key
= data
->replacement
;
2642 RB_OBJ_WRITTEN(d
->src
, Qundef
, v
);
2645 if (obj_traverse_replace_i(*val
, data
)) {
2649 else if (*val
!= data
->replacement
) {
2650 VALUE v
= *val
= data
->replacement
;
2651 RB_OBJ_WRITTEN(d
->src
, Qundef
, v
);
2657 static struct st_table
*
2658 obj_traverse_replace_rec(struct obj_traverse_replace_data
*data
)
2660 if (UNLIKELY(!data
->rec
)) {
2661 data
->rec_hash
= rb_ident_hash_new();
2662 data
->rec
= rb_hash_st_table(data
->rec_hash
);
2667 #if USE_TRANSIENT_HEAP
2668 void rb_ary_transient_heap_evacuate(VALUE ary
, int promote
);
2669 void rb_obj_transient_heap_evacuate(VALUE obj
, int promote
);
2670 void rb_hash_transient_heap_evacuate(VALUE hash
, int promote
);
2671 void rb_struct_transient_heap_evacuate(VALUE st
, int promote
);
2675 obj_refer_only_shareables_p_i(VALUE obj
, void *ptr
)
2677 int *pcnt
= (int *)ptr
;
2679 if (!rb_ractor_shareable_p(obj
)) {
2685 obj_refer_only_shareables_p(VALUE obj
)
2688 RB_VM_LOCK_ENTER_NO_BARRIER();
2690 rb_objspace_reachable_objects_from(obj
, obj_refer_only_shareables_p_i
, &cnt
);
2692 RB_VM_LOCK_LEAVE_NO_BARRIER();
2697 obj_traverse_replace_i(VALUE obj
, struct obj_traverse_replace_data
*data
)
2701 if (RB_SPECIAL_CONST_P(obj
)) {
2702 data
->replacement
= obj
;
2706 switch (data
->enter_func(obj
, data
)) {
2707 case traverse_cont
: break;
2708 case traverse_skip
: return 0; // skip children
2709 case traverse_stop
: return 1; // stop search
2712 replacement
= data
->replacement
;
2714 if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data
), (st_data_t
)obj
, (st_data_t
*)&replacement
))) {
2715 data
->replacement
= replacement
;
2719 st_insert(obj_traverse_replace_rec(data
), (st_data_t
)obj
, (st_data_t
)replacement
);
2726 #define CHECK_AND_REPLACE(v) do { \
2728 if (obj_traverse_replace_i(_val, data)) { return 1; } \
2729 else if (data->replacement != _val) { RB_OBJ_WRITE(obj, &v, data->replacement); } \
2732 if (UNLIKELY(FL_TEST_RAW(obj
, FL_EXIVAR
))) {
2733 struct gen_ivtbl
*ivtbl
;
2734 rb_ivar_generic_ivtbl_lookup(obj
, &ivtbl
);
2735 for (uint32_t i
= 0; i
< ivtbl
->numiv
; i
++) {
2736 if (ivtbl
->ivptr
[i
] != Qundef
) {
2737 CHECK_AND_REPLACE(ivtbl
->ivptr
[i
]);
2742 switch (BUILTIN_TYPE(obj
)) {
2752 rb_str_make_independent(obj
);
2757 #if USE_TRANSIENT_HEAP
2758 if (data
->move
) rb_obj_transient_heap_evacuate(obj
, TRUE
);
2761 uint32_t len
= ROBJECT_NUMIV(obj
);
2762 VALUE
*ptr
= ROBJECT_IVPTR(obj
);
2764 for (uint32_t i
=0; i
<len
; i
++) {
2765 if (ptr
[i
] != Qundef
) {
2766 CHECK_AND_REPLACE(ptr
[i
]);
2774 rb_ary_cancel_sharing(obj
);
2775 #if USE_TRANSIENT_HEAP
2776 if (data
->move
) rb_ary_transient_heap_evacuate(obj
, TRUE
);
2779 for (int i
= 0; i
< RARRAY_LENINT(obj
); i
++) {
2780 VALUE e
= rb_ary_entry(obj
, i
);
2782 if (obj_traverse_replace_i(e
, data
)) {
2785 else if (e
!= data
->replacement
) {
2786 RARRAY_ASET(obj
, i
, data
->replacement
);
2795 #if USE_TRANSIENT_HEAP
2796 if (data
->move
) rb_hash_transient_heap_evacuate(obj
, TRUE
);
2798 struct obj_traverse_replace_callback_data d
= {
2803 rb_hash_stlike_foreach_with_replace(obj
,
2804 obj_hash_traverse_replace_foreach_i
,
2805 obj_hash_traverse_replace_i
,
2807 if (d
.stop
) return 1;
2808 // TODO: rehash here?
2810 VALUE ifnone
= RHASH_IFNONE(obj
);
2811 if (obj_traverse_replace_i(ifnone
, data
)) {
2814 else if (ifnone
!= data
->replacement
) {
2815 RHASH_SET_IFNONE(obj
, data
->replacement
);
2822 #if USE_TRANSIENT_HEAP
2823 if (data
->move
) rb_struct_transient_heap_evacuate(obj
, TRUE
);
2825 long len
= RSTRUCT_LEN(obj
);
2826 const VALUE
*ptr
= RSTRUCT_CONST_PTR(obj
);
2828 for (long i
=0; i
<len
; i
++) {
2829 CHECK_AND_REPLACE(ptr
[i
]);
2835 CHECK_AND_REPLACE(RRATIONAL(obj
)->num
);
2836 CHECK_AND_REPLACE(RRATIONAL(obj
)->den
);
2839 CHECK_AND_REPLACE(RCOMPLEX(obj
)->real
);
2840 CHECK_AND_REPLACE(RCOMPLEX(obj
)->imag
);
2844 if (!data
->move
&& obj_refer_only_shareables_p(obj
)) {
2848 rb_raise(rb_eRactorError
, "can not %s %"PRIsVALUE
" object.",
2849 data
->move
? "move" : "copy", rb_class_of(obj
));
2853 // not supported yet
2862 rb_bug("unreachable");
2865 data
->replacement
= replacement
;
2867 if (data
->leave_func(obj
, data
) == traverse_stop
) {
2878 rb_obj_traverse_replace(VALUE obj
,
2879 rb_obj_traverse_replace_enter_func enter_func
,
2880 rb_obj_traverse_replace_leave_func leave_func
,
2883 struct obj_traverse_replace_data data
= {
2884 .enter_func
= enter_func
,
2885 .leave_func
= leave_func
,
2887 .replacement
= Qundef
,
2891 if (obj_traverse_replace_i(obj
, &data
)) {
2895 return data
.replacement
;
2907 static const VALUE fl_users
= FL_USER1
| FL_USER2
| FL_USER3
|
2908 FL_USER4
| FL_USER5
| FL_USER6
| FL_USER7
|
2909 FL_USER8
| FL_USER9
| FL_USER10
| FL_USER11
|
2910 FL_USER12
| FL_USER13
| FL_USER14
| FL_USER15
|
2911 FL_USER16
| FL_USER17
| FL_USER18
| FL_USER19
;
2914 ractor_moved_bang(VALUE obj
)
2916 // invalidate src object
2917 struct RVALUE
*rv
= (void *)obj
;
2919 rv
->klass
= rb_cRactorMovedObject
;
2923 rv
->flags
= rv
->flags
& ~fl_users
;
2925 // TODO: record moved location
2928 static enum obj_traverse_iterator_result
2929 move_enter(VALUE obj
, struct obj_traverse_replace_data
*data
)
2931 if (rb_ractor_shareable_p(obj
)) {
2932 data
->replacement
= obj
;
2933 return traverse_skip
;
2936 data
->replacement
= rb_obj_alloc(RBASIC_CLASS(obj
));
2937 return traverse_cont
;
2941 void rb_replace_generic_ivar(VALUE clone
, VALUE obj
); // variable.c
2943 static enum obj_traverse_iterator_result
2944 move_leave(VALUE obj
, struct obj_traverse_replace_data
*data
)
2946 VALUE v
= data
->replacement
;
2947 struct RVALUE
*dst
= (struct RVALUE
*)v
;
2948 struct RVALUE
*src
= (struct RVALUE
*)obj
;
2950 dst
->flags
= (dst
->flags
& ~fl_users
) | (src
->flags
& fl_users
);
2956 if (UNLIKELY(FL_TEST_RAW(obj
, FL_EXIVAR
))) {
2957 rb_replace_generic_ivar(v
, obj
);
2960 // TODO: generic_ivar
2962 ractor_moved_bang(obj
);
2963 return traverse_cont
;
2967 ractor_move(VALUE obj
)
2969 VALUE val
= rb_obj_traverse_replace(obj
, move_enter
, move_leave
, true);
2970 if (val
!= Qundef
) {
2974 rb_raise(rb_eRactorError
, "can not move the object");
2978 static enum obj_traverse_iterator_result
2979 copy_enter(VALUE obj
, struct obj_traverse_replace_data
*data
)
2981 if (rb_ractor_shareable_p(obj
)) {
2982 data
->replacement
= obj
;
2983 return traverse_skip
;
2986 data
->replacement
= rb_obj_clone(obj
);
2987 return traverse_cont
;
2991 static enum obj_traverse_iterator_result
2992 copy_leave(VALUE obj
, struct obj_traverse_replace_data
*data
)
2994 return traverse_cont
;
2998 ractor_copy(VALUE obj
)
3000 VALUE val
= rb_obj_traverse_replace(obj
, copy_enter
, copy_leave
, false);
3001 if (val
!= Qundef
) {
3005 rb_raise(rb_eRactorError
, "can not copy the object");
3009 // Ractor local storage
3011 struct rb_ractor_local_key_struct
{
3012 const struct rb_ractor_local_storage_type
*type
;
3016 static struct freed_ractor_local_keys_struct
{
3019 rb_ractor_local_key_t
*keys
;
3020 } freed_ractor_local_keys
;
3023 ractor_local_storage_mark_i(st_data_t key
, st_data_t val
, st_data_t dmy
)
3025 struct rb_ractor_local_key_struct
*k
= (struct rb_ractor_local_key_struct
*)key
;
3026 if (k
->type
->mark
) (*k
->type
->mark
)((void *)val
);
3030 static enum rb_id_table_iterator_result
3031 idkey_local_storage_mark_i(ID id
, VALUE val
, void *dmy
)
3034 return ID_TABLE_CONTINUE
;
3038 ractor_local_storage_mark(rb_ractor_t
*r
)
3040 if (r
->local_storage
) {
3041 st_foreach(r
->local_storage
, ractor_local_storage_mark_i
, 0);
3043 for (int i
=0; i
<freed_ractor_local_keys
.cnt
; i
++) {
3044 rb_ractor_local_key_t key
= freed_ractor_local_keys
.keys
[i
];
3046 if (st_delete(r
->local_storage
, (st_data_t
*)&key
, &val
) &&
3048 (*key
->type
->free
)((void *)val
);
3053 if (r
->idkey_local_storage
) {
3054 rb_id_table_foreach(r
->idkey_local_storage
, idkey_local_storage_mark_i
, NULL
);
3059 ractor_local_storage_free_i(st_data_t key
, st_data_t val
, st_data_t dmy
)
3061 struct rb_ractor_local_key_struct
*k
= (struct rb_ractor_local_key_struct
*)key
;
3062 if (k
->type
->free
) (*k
->type
->free
)((void *)val
);
3067 ractor_local_storage_free(rb_ractor_t
*r
)
3069 if (r
->local_storage
) {
3070 st_foreach(r
->local_storage
, ractor_local_storage_free_i
, 0);
3071 st_free_table(r
->local_storage
);
3074 if (r
->idkey_local_storage
) {
3075 rb_id_table_free(r
->idkey_local_storage
);
3080 rb_ractor_local_storage_value_mark(void *ptr
)
3082 rb_gc_mark((VALUE
)ptr
);
3085 static const struct rb_ractor_local_storage_type ractor_local_storage_type_null
= {
3090 const struct rb_ractor_local_storage_type rb_ractor_local_storage_type_free
= {
3095 static const struct rb_ractor_local_storage_type ractor_local_storage_type_value
= {
3096 rb_ractor_local_storage_value_mark
,
3100 rb_ractor_local_key_t
3101 rb_ractor_local_storage_ptr_newkey(const struct rb_ractor_local_storage_type
*type
)
3103 rb_ractor_local_key_t key
= ALLOC(struct rb_ractor_local_key_struct
);
3104 key
->type
= type
? type
: &ractor_local_storage_type_null
;
3105 key
->main_cache
= (void *)Qundef
;
3109 rb_ractor_local_key_t
3110 rb_ractor_local_storage_value_newkey(void)
3112 return rb_ractor_local_storage_ptr_newkey(&ractor_local_storage_type_value
);
// Queue KEY for destruction.  The key object must stay alive until the
// next GC mark phase completes (rb_ractor_finish_marking() frees the
// queued keys), because other ractors' tables may still hold it.
// NOTE(review): the extracted text omits the lines surrounding the queue
// update; upstream wraps this in the VM lock since freed_ractor_local_keys
// is VM-global shared state — confirm against upstream ractor.c.
void
rb_ractor_local_storage_delkey(rb_ractor_local_key_t key)
{
    RB_VM_LOCK_ENTER();
    {
        // Grow the pending-free queue geometrically (4, 8, 16, ...).
        if (freed_ractor_local_keys.cnt == freed_ractor_local_keys.capa) {
            freed_ractor_local_keys.capa = freed_ractor_local_keys.capa ? freed_ractor_local_keys.capa * 2 : 4;
            REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, freed_ractor_local_keys.capa);
        }
        freed_ractor_local_keys.keys[freed_ractor_local_keys.cnt++] = key;
    }
    RB_VM_LOCK_LEAVE();
}
3130 ractor_local_ref(rb_ractor_local_key_t key
, void **pret
)
3132 if (rb_ractor_main_p()) {
3133 if ((VALUE
)key
->main_cache
!= Qundef
) {
3134 *pret
= key
->main_cache
;
3142 rb_ractor_t
*cr
= GET_RACTOR();
3144 if (cr
->local_storage
&& st_lookup(cr
->local_storage
, (st_data_t
)key
, (st_data_t
*)pret
)) {
3154 ractor_local_set(rb_ractor_local_key_t key
, void *ptr
)
3156 rb_ractor_t
*cr
= GET_RACTOR();
3158 if (cr
->local_storage
== NULL
) {
3159 cr
->local_storage
= st_init_numtable();
3162 st_insert(cr
->local_storage
, (st_data_t
)key
, (st_data_t
)ptr
);
3164 if (rb_ractor_main_p()) {
3165 key
->main_cache
= ptr
;
3170 rb_ractor_local_storage_value(rb_ractor_local_key_t key
)
3173 if (ractor_local_ref(key
, (void **)&val
)) {
3182 rb_ractor_local_storage_value_lookup(rb_ractor_local_key_t key
, VALUE
*val
)
3184 if (ractor_local_ref(key
, (void **)val
)) {
// Store the Ruby VALUE val under KEY for the current ractor.
void
rb_ractor_local_storage_value_set(rb_ractor_local_key_t key, VALUE val)
{
    ractor_local_set(key, (void *)val);
}
3199 rb_ractor_local_storage_ptr(rb_ractor_local_key_t key
)
3202 if (ractor_local_ref(key
, &ret
)) {
// Store a raw pointer under KEY for the current ractor.  Ownership
// semantics depend on the key's storage type (its free() hook).
void
rb_ractor_local_storage_ptr_set(rb_ractor_local_key_t key, void *ptr)
{
    ractor_local_set(key, ptr);
}
#define DEFAULT_KEYS_CAPA 0x10

// Called when GC marking finishes: actually free the keys queued by
// rb_ractor_local_storage_delkey(), then shrink the queue back to its
// default capacity so a burst of deletions does not pin memory forever.
void
rb_ractor_finish_marking(void)
{
    for (int i=0; i<freed_ractor_local_keys.cnt; i++) {
        ruby_xfree(freed_ractor_local_keys.keys[i]);
    }
    freed_ractor_local_keys.cnt = 0;

    if (freed_ractor_local_keys.capa > DEFAULT_KEYS_CAPA) {
        freed_ractor_local_keys.capa = DEFAULT_KEYS_CAPA;
        REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, DEFAULT_KEYS_CAPA);
    }
}
3232 ractor_local_value(rb_execution_context_t
*ec
, VALUE self
, VALUE sym
)
3234 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
3235 ID id
= rb_check_id(&sym
);
3236 struct rb_id_table
*tbl
= cr
->idkey_local_storage
;
3239 if (id
&& tbl
&& rb_id_table_lookup(tbl
, id
, &val
)) {
3248 ractor_local_value_set(rb_execution_context_t
*ec
, VALUE self
, VALUE sym
, VALUE val
)
3250 rb_ractor_t
*cr
= rb_ec_ractor_ptr(ec
);
3251 ID id
= SYM2ID(rb_to_symbol(sym
));
3252 struct rb_id_table
*tbl
= cr
->idkey_local_storage
;
3255 tbl
= cr
->idkey_local_storage
= rb_id_table_create(2);
3257 rb_id_table_insert(tbl
, id
, val
);
3261 #include "ractor.rbinc"