[ruby.git] / ractor.c
1 // Ractor implementation
3 #include "ruby/ruby.h"
4 #include "ruby/thread.h"
5 #include "ruby/ractor.h"
6 #include "ruby/thread_native.h"
7 #include "vm_core.h"
8 #include "eval_intern.h"
9 #include "vm_sync.h"
10 #include "ractor_core.h"
11 #include "internal/complex.h"
12 #include "internal/error.h"
13 #include "internal/gc.h"
14 #include "internal/hash.h"
15 #include "internal/rational.h"
16 #include "internal/struct.h"
17 #include "internal/thread.h"
18 #include "variable.h"
19 #include "yjit.h"
20 #include "rjit.h"
22 VALUE rb_cRactor;
23 static VALUE rb_cRactorSelector;
25 VALUE rb_eRactorUnsafeError;
26 VALUE rb_eRactorIsolationError;
27 static VALUE rb_eRactorError;
28 static VALUE rb_eRactorRemoteError;
29 static VALUE rb_eRactorMovedError;
30 static VALUE rb_eRactorClosedError;
31 static VALUE rb_cRactorMovedObject;
33 static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line);
35 // Ractor locking
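// Each ractor guards its synchronization state with a per-ractor native mutex
// (r->sync.lock). The RACTOR_LOCK/RACTOR_UNLOCK macros record the call site for
// debug logging, and under RACTOR_CHECK_MODE the current owner is tracked in
// r->sync.locked_by so that recursive or missing locking is caught by the
// ASSERT_ractor_(un)locking helpers. The *_SELF variants additionally assert
// that the caller is operating on its own (current) ractor.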
37 static void
38 ASSERT_ractor_unlocking(rb_ractor_t *r)
40 #if RACTOR_CHECK_MODE > 0
41 // GET_EC is NULL in an RJIT worker
42 if (rb_current_execution_context(false) != NULL && r->sync.locked_by == rb_ractor_self(GET_RACTOR())) {
43 rb_bug("recursive ractor locking");
45 #endif
48 static void
49 ASSERT_ractor_locking(rb_ractor_t *r)
51 #if RACTOR_CHECK_MODE > 0
52 // GET_EC is NULL in an RJIT worker
53 if (rb_current_execution_context(false) != NULL && r->sync.locked_by != rb_ractor_self(GET_RACTOR())) {
54 rp(r->sync.locked_by);
55 rb_bug("ractor lock is not acquired.");
57 #endif
60 static void
61 ractor_lock(rb_ractor_t *r, const char *file, int line)
63 RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
65 ASSERT_ractor_unlocking(r);
66 rb_native_mutex_lock(&r->sync.lock);
68 #if RACTOR_CHECK_MODE > 0
69 if (rb_current_execution_context(false) != NULL) { // GET_EC is NULL in an RJIT worker
70 rb_ractor_t *cr = rb_current_ractor_raw(false);
71 r->sync.locked_by = cr ? rb_ractor_self(cr) : Qundef;
73 #endif
75 RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
78 static void
79 ractor_lock_self(rb_ractor_t *cr, const char *file, int line)
81 VM_ASSERT(cr == GET_RACTOR());
82 #if RACTOR_CHECK_MODE > 0
83 VM_ASSERT(cr->sync.locked_by != cr->pub.self);
84 #endif
85 ractor_lock(cr, file, line);
88 static void
89 ractor_unlock(rb_ractor_t *r, const char *file, int line)
91 ASSERT_ractor_locking(r);
92 #if RACTOR_CHECK_MODE > 0
93 r->sync.locked_by = Qnil;
94 #endif
95 rb_native_mutex_unlock(&r->sync.lock);
97 RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
100 static void
101 ractor_unlock_self(rb_ractor_t *cr, const char *file, int line)
103 VM_ASSERT(cr == GET_RACTOR());
104 #if RACTOR_CHECK_MODE > 0
105 VM_ASSERT(cr->sync.locked_by == cr->pub.self);
106 #endif
107 ractor_unlock(cr, file, line);
110 #define RACTOR_LOCK(r) ractor_lock(r, __FILE__, __LINE__)
111 #define RACTOR_UNLOCK(r) ractor_unlock(r, __FILE__, __LINE__)
112 #define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__)
113 #define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__)
115 void
116 rb_ractor_lock_self(rb_ractor_t *r)
118 RACTOR_LOCK_SELF(r);
121 void
122 rb_ractor_unlock_self(rb_ractor_t *r)
124 RACTOR_UNLOCK_SELF(r);
127 // Ractor status
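// A ractor moves through a small state machine:
//   created -> blocking <-> running -> terminated
// ractor_status_set() logs and asserts every transition; once out of the
// created state, only the ractor itself may change its status, and only while
// the VM lock is held.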
129 static const char *
130 ractor_status_str(enum ractor_status status)
132 switch (status) {
133 case ractor_created: return "created";
134 case ractor_running: return "running";
135 case ractor_blocking: return "blocking";
136 case ractor_terminated: return "terminated";
138 rb_bug("unreachable");
141 static void
142 ractor_status_set(rb_ractor_t *r, enum ractor_status status)
144 RUBY_DEBUG_LOG("r:%u [%s]->[%s]", r->pub.id, ractor_status_str(r->status_), ractor_status_str(status));
146 // check 1
147 if (r->status_ != ractor_created) {
148 VM_ASSERT(r == GET_RACTOR()); // only self-modification is allowed.
149 ASSERT_vm_locking();
152 // check 2: transition check; assume this will vanish on non-debug builds.
153 switch (r->status_) {
154 case ractor_created:
155 VM_ASSERT(status == ractor_blocking);
156 break;
157 case ractor_running:
158 VM_ASSERT(status == ractor_blocking||
159 status == ractor_terminated);
160 break;
161 case ractor_blocking:
162 VM_ASSERT(status == ractor_running);
163 break;
164 case ractor_terminated:
165 rb_bug("unreachable");
166 break;
169 r->status_ = status;
172 static bool
173 ractor_status_p(rb_ractor_t *r, enum ractor_status status)
175 return rb_ractor_status_p(r, status);
178 // Ractor data/mark/free
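// GC hooks for the T_DATA wrapper: ractor_mark marks queued baskets, the
// receiving mutex, loc/name, per-ractor stdio replacements, hooks, thread
// objects and ractor-local storage; ractor_free releases the native lock, the
// queues and local storage; ractor_memsize reports an approximate footprint.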
180 static struct rb_ractor_basket *ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i);
181 static void ractor_local_storage_mark(rb_ractor_t *r);
182 static void ractor_local_storage_free(rb_ractor_t *r);
184 static void
185 ractor_queue_mark(struct rb_ractor_queue *rq)
187 for (int i=0; i<rq->cnt; i++) {
188 struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i);
189 rb_gc_mark(b->sender);
191 switch (b->type.e) {
192 case basket_type_yielding:
193 case basket_type_take_basket:
194 case basket_type_deleted:
195 case basket_type_reserved:
196 // ignore
197 break;
198 default:
199 rb_gc_mark(b->p.send.v);
204 static void
205 ractor_mark(void *ptr)
207 rb_ractor_t *r = (rb_ractor_t *)ptr;
209 ractor_queue_mark(&r->sync.recv_queue);
210 ractor_queue_mark(&r->sync.takers_queue);
212 rb_gc_mark(r->receiving_mutex);
214 rb_gc_mark(r->loc);
215 rb_gc_mark(r->name);
216 rb_gc_mark(r->r_stdin);
217 rb_gc_mark(r->r_stdout);
218 rb_gc_mark(r->r_stderr);
219 rb_hook_list_mark(&r->pub.hooks);
221 if (r->threads.cnt > 0) {
222 rb_thread_t *th = 0;
223 ccan_list_for_each(&r->threads.set, th, lt_node) {
224 VM_ASSERT(th != NULL);
225 rb_gc_mark(th->self);
229 ractor_local_storage_mark(r);
232 static void
233 ractor_queue_free(struct rb_ractor_queue *rq)
235 free(rq->baskets);
238 static void
239 ractor_free(void *ptr)
241 rb_ractor_t *r = (rb_ractor_t *)ptr;
242 RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r));
243 rb_native_mutex_destroy(&r->sync.lock);
244 #ifdef RUBY_THREAD_WIN32_H
245 rb_native_cond_destroy(&r->sync.cond);
246 #endif
247 ractor_queue_free(&r->sync.recv_queue);
248 ractor_queue_free(&r->sync.takers_queue);
249 ractor_local_storage_free(r);
250 rb_hook_list_free(&r->pub.hooks);
251 ruby_xfree(r);
254 static size_t
255 ractor_queue_memsize(const struct rb_ractor_queue *rq)
257 return sizeof(struct rb_ractor_basket) * rq->size;
260 static size_t
261 ractor_memsize(const void *ptr)
263 rb_ractor_t *r = (rb_ractor_t *)ptr;
265 // TODO: more correct?
266 return sizeof(rb_ractor_t) +
267 ractor_queue_memsize(&r->sync.recv_queue) +
268 ractor_queue_memsize(&r->sync.takers_queue);
271 static const rb_data_type_t ractor_data_type = {
272 "ractor",
274 ractor_mark,
275 ractor_free,
276 ractor_memsize,
277 NULL, // update
279 0, 0, RUBY_TYPED_FREE_IMMEDIATELY /* | RUBY_TYPED_WB_PROTECTED */
282 bool
283 rb_ractor_p(VALUE gv)
285 if (rb_typeddata_is_kind_of(gv, &ractor_data_type)) {
286 return true;
288 else {
289 return false;
293 static inline rb_ractor_t *
294 RACTOR_PTR(VALUE self)
296 VM_ASSERT(rb_ractor_p(self));
297 rb_ractor_t *r = DATA_PTR(self);
298 return r;
301 static rb_atomic_t ractor_last_id;
303 #if RACTOR_CHECK_MODE > 0
304 uint32_t
305 rb_ractor_current_id(void)
307 if (GET_THREAD()->ractor == NULL) {
308 return 1; // main ractor
310 else {
311 return rb_ractor_id(GET_RACTOR());
314 #endif
316 // Ractor queue
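// The message queue is a growable ring buffer of baskets: `start`, `cnt` and
// `size` describe the live window, and entries marked deleted/reserved are
// skipped rather than shifted. `serial` increases whenever the head advances,
// which lets receive_if notice that indices from a previous scan are stale.
// Capacity is doubled in ractor_queue_enq when the buffer fills up.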
318 static void
319 ractor_queue_setup(struct rb_ractor_queue *rq)
321 rq->size = 2;
322 rq->cnt = 0;
323 rq->start = 0;
324 rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size);
327 static struct rb_ractor_basket *
328 ractor_queue_head(rb_ractor_t *r, struct rb_ractor_queue *rq)
330 if (r != NULL) ASSERT_ractor_locking(r);
331 return &rq->baskets[rq->start];
334 static struct rb_ractor_basket *
335 ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i)
337 if (r != NULL) ASSERT_ractor_locking(r);
338 return &rq->baskets[(rq->start + i) % rq->size];
341 static void
342 ractor_queue_advance(rb_ractor_t *r, struct rb_ractor_queue *rq)
344 ASSERT_ractor_locking(r);
346 if (rq->reserved_cnt == 0) {
347 rq->cnt--;
348 rq->start = (rq->start + 1) % rq->size;
349 rq->serial++;
351 else {
352 ractor_queue_at(r, rq, 0)->type.e = basket_type_deleted;
356 static bool
357 ractor_queue_skip_p(rb_ractor_t *r, struct rb_ractor_queue *rq, int i)
359 struct rb_ractor_basket *b = ractor_queue_at(r, rq, i);
360 return basket_type_p(b, basket_type_deleted) ||
361 basket_type_p(b, basket_type_reserved);
364 static void
365 ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq)
367 ASSERT_ractor_locking(r);
369 while (rq->cnt > 0 && basket_type_p(ractor_queue_at(r, rq, 0), basket_type_deleted)) {
370 ractor_queue_advance(r, rq);
374 static bool
375 ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
377 ASSERT_ractor_locking(r);
379 if (rq->cnt == 0) {
380 return true;
383 ractor_queue_compact(r, rq);
385 for (int i=0; i<rq->cnt; i++) {
386 if (!ractor_queue_skip_p(r, rq, i)) {
387 return false;
391 return true;
394 static bool
395 ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
397 ASSERT_ractor_locking(r);
399 for (int i=0; i<rq->cnt; i++) {
400 if (!ractor_queue_skip_p(r, rq, i)) {
401 struct rb_ractor_basket *b = ractor_queue_at(r, rq, i);
402 *basket = *b;
404 // remove from queue
405 b->type.e = basket_type_deleted;
406 ractor_queue_compact(r, rq);
407 return true;
411 return false;
414 static void
415 ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
417 ASSERT_ractor_locking(r);
419 if (rq->size <= rq->cnt) {
420 rq->baskets = realloc(rq->baskets, sizeof(struct rb_ractor_basket) * rq->size * 2);
421 for (int i=rq->size - rq->start; i<rq->cnt; i++) {
422 rq->baskets[i + rq->start] = rq->baskets[i + rq->start - rq->size];
424 rq->size *= 2;
426 rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket;
427 // fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt);
430 static void
431 ractor_queue_delete(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
433 basket->type.e = basket_type_deleted;
436 // Ractor basket
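// A basket is one queued message: the value, its sender, and how it was passed
// (ref for shareable objects, copy, move, or a ractor's final "will"). When a
// basket is accepted, copy/move/will baskets are normalized back to refs via
// ractor_reset_belonging, and baskets flagged as exceptions are re-raised on
// the receiving side as Ractor::RemoteError.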
438 static VALUE ractor_reset_belonging(VALUE obj); // in this file
440 static VALUE
441 ractor_basket_value(struct rb_ractor_basket *b)
443 switch (b->type.e) {
444 case basket_type_ref:
445 break;
446 case basket_type_copy:
447 case basket_type_move:
448 case basket_type_will:
449 b->type.e = basket_type_ref;
450 b->p.send.v = ractor_reset_belonging(b->p.send.v);
451 break;
452 default:
453 rb_bug("unreachable");
456 return b->p.send.v;
459 static VALUE
460 ractor_basket_accept(struct rb_ractor_basket *b)
462 VALUE v = ractor_basket_value(b);
464 if (b->p.send.exception) {
465 VALUE cause = v;
466 VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
467 rb_ivar_set(err, rb_intern("@ractor"), b->sender);
468 rb_ec_setup_exception(NULL, err, cause);
469 rb_exc_raise(err);
472 return v;
475 // Ractor synchronizations
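// Sleeping and waking are coordinated through r->sync.wait: `status` is a
// bitmask of what the ractor is waiting for (receiving/taking/yielding) and
// `wakeup_status` records why it was woken. ractor_wakeup only succeeds while
// the target is still sleeping on a matching status; ractor_sleep(_with_cleanup)
// loops until some wakeup reason is set, re-checking interrupts each time.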
477 #if USE_RUBY_DEBUG_LOG
478 static const char *
479 wait_status_str(enum rb_ractor_wait_status wait_status)
481 switch ((int)wait_status) {
482 case wait_none: return "none";
483 case wait_receiving: return "receiving";
484 case wait_taking: return "taking";
485 case wait_yielding: return "yielding";
486 case wait_receiving|wait_taking: return "receiving|taking";
487 case wait_receiving|wait_yielding: return "receiving|yielding";
488 case wait_taking|wait_yielding: return "taking|yielding";
489 case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding";
491 rb_bug("unreachable");
494 static const char *
495 wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status)
497 switch (wakeup_status) {
498 case wakeup_none: return "none";
499 case wakeup_by_send: return "by_send";
500 case wakeup_by_yield: return "by_yield";
501 case wakeup_by_take: return "by_take";
502 case wakeup_by_close: return "by_close";
503 case wakeup_by_interrupt: return "by_interrupt";
504 case wakeup_by_retry: return "by_retry";
506 rb_bug("unreachable");
509 static const char *
510 basket_type_name(enum rb_ractor_basket_type type)
512 switch (type) {
513 case basket_type_none: return "none";
514 case basket_type_ref: return "ref";
515 case basket_type_copy: return "copy";
516 case basket_type_move: return "move";
517 case basket_type_will: return "will";
518 case basket_type_deleted: return "deleted";
519 case basket_type_reserved: return "reserved";
520 case basket_type_take_basket: return "take_basket";
521 case basket_type_yielding: return "yielding";
523 VM_ASSERT(0);
524 return NULL;
526 #endif // USE_RUBY_DEBUG_LOG
528 static bool
529 ractor_sleeping_by(const rb_ractor_t *r, enum rb_ractor_wait_status wait_status)
531 return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none;
534 #ifdef RUBY_THREAD_PTHREAD_H
535 // thread_*.c
536 void rb_ractor_sched_wakeup(rb_ractor_t *r);
537 #else
539 static void
540 rb_ractor_sched_wakeup(rb_ractor_t *r)
542 rb_native_cond_broadcast(&r->sync.cond);
544 #endif
547 static bool
548 ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status)
550 ASSERT_ractor_locking(r);
552 RUBY_DEBUG_LOG("r:%u wait_by:%s -> wait:%s wakeup:%s",
553 rb_ractor_id(r),
554 wait_status_str(r->sync.wait.status),
555 wait_status_str(wait_status),
556 wakeup_status_str(wakeup_status));
558 if (ractor_sleeping_by(r, wait_status)) {
559 r->sync.wait.wakeup_status = wakeup_status;
560 rb_ractor_sched_wakeup(r);
561 return true;
563 else {
564 return false;
568 static void
569 ractor_sleep_interrupt(void *ptr)
571 rb_ractor_t *r = ptr;
573 RACTOR_LOCK(r);
575 ractor_wakeup(r, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt);
577 RACTOR_UNLOCK(r);
580 typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p);
582 static void
583 ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, ractor_sleep_cleanup_function cf_func, void *cf_data)
585 if (cr->sync.wait.status != wait_none) {
586 enum rb_ractor_wait_status prev_wait_status = cr->sync.wait.status;
587 cr->sync.wait.status = wait_none;
588 cr->sync.wait.wakeup_status = wakeup_by_interrupt;
590 RACTOR_UNLOCK(cr);
592 if (cf_func) {
593 enum ruby_tag_type state;
594 EC_PUSH_TAG(ec);
595 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
596 rb_thread_check_ints();
598 EC_POP_TAG();
600 if (state) {
601 (*cf_func)(cr, cf_data);
602 EC_JUMP_TAG(ec, state);
605 else {
606 rb_thread_check_ints();
610 // reachable?
611 RACTOR_LOCK(cr);
612 cr->sync.wait.status = prev_wait_status;
616 #ifdef RUBY_THREAD_PTHREAD_H
617 void rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf);
618 #else
620 // win32
621 static void
622 ractor_cond_wait(rb_ractor_t *r)
624 #if RACTOR_CHECK_MODE > 0
625 VALUE locked_by = r->sync.locked_by;
626 r->sync.locked_by = Qnil;
627 #endif
628 rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
630 #if RACTOR_CHECK_MODE > 0
631 r->sync.locked_by = locked_by;
632 #endif
635 static void *
636 ractor_sleep_wo_gvl(void *ptr)
638 rb_ractor_t *cr = ptr;
639 RACTOR_LOCK_SELF(cr);
641 VM_ASSERT(cr->sync.wait.status != wait_none);
642 if (cr->sync.wait.wakeup_status == wakeup_none) {
643 ractor_cond_wait(cr);
645 cr->sync.wait.status = wait_none;
647 RACTOR_UNLOCK_SELF(cr);
648 return NULL;
651 static void
652 rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf)
654 RACTOR_UNLOCK(cr);
656 rb_nogvl(ractor_sleep_wo_gvl, cr,
657 ubf, cr,
658 RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL);
660 RACTOR_LOCK(cr);
662 #endif
664 static enum rb_ractor_wakeup_status
665 ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status,
666 ractor_sleep_cleanup_function cf_func, void *cf_data)
668 enum rb_ractor_wakeup_status wakeup_status;
669 VM_ASSERT(GET_RACTOR() == cr);
671 // TODO: multi-threads
672 VM_ASSERT(cr->sync.wait.status == wait_none);
673 VM_ASSERT(wait_status != wait_none);
674 cr->sync.wait.status = wait_status;
675 cr->sync.wait.wakeup_status = wakeup_none;
677 // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr,
678 // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
680 RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status));
682 while (cr->sync.wait.wakeup_status == wakeup_none) {
683 rb_ractor_sched_sleep(ec, cr, ractor_sleep_interrupt);
684 ractor_check_ints(ec, cr, cf_func, cf_data);
687 cr->sync.wait.status = wait_none;
689 // TODO: multi-thread
690 wakeup_status = cr->sync.wait.wakeup_status;
691 cr->sync.wait.wakeup_status = wakeup_none;
693 RUBY_DEBUG_LOG("wakeup %s", wakeup_status_str(wakeup_status));
695 return wakeup_status;
698 static enum rb_ractor_wakeup_status
699 ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status)
701 return ractor_sleep_with_cleanup(ec, cr, wait_status, 0, NULL);
704 // Ractor.receive
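// Ractor.receive dequeues from the ractor's own recv_queue, sleeping with
// wait_receiving until a sender wakes it up. receive/receive_if may not be
// called recursively; receive_if additionally marks the candidate basket as
// reserved while the filter block runs so a concurrent receive cannot steal it.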
706 static void
707 ractor_recursive_receive_if(rb_ractor_t *r)
709 if (r->receiving_mutex && rb_mutex_owned_p(r->receiving_mutex)) {
710 rb_raise(rb_eRactorError, "can not call receive/receive_if recursively");
714 static VALUE
715 ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
717 struct rb_ractor_basket basket;
718 ractor_recursive_receive_if(cr);
719 bool received = false;
721 RACTOR_LOCK_SELF(cr);
723 RUBY_DEBUG_LOG("rq->cnt:%d", rq->cnt);
724 received = ractor_queue_deq(cr, rq, &basket);
726 RACTOR_UNLOCK_SELF(cr);
728 if (!received) {
729 if (cr->sync.incoming_port_closed) {
730 rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
732 return Qundef;
734 else {
735 return ractor_basket_accept(&basket);
739 static void
740 ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
742 VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
743 ractor_recursive_receive_if(cr);
745 RACTOR_LOCK(cr);
747 while (ractor_queue_empty_p(cr, rq)) {
748 ractor_sleep(ec, cr, wait_receiving);
751 RACTOR_UNLOCK(cr);
754 static VALUE
755 ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
757 VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
758 VALUE v;
759 struct rb_ractor_queue *rq = &cr->sync.recv_queue;
761 while (UNDEF_P(v = ractor_try_receive(ec, cr, rq))) {
762 ractor_wait_receive(ec, cr, rq);
765 return v;
768 #if 0
769 static void
770 rq_dump(struct rb_ractor_queue *rq)
772 bool bug = false;
773 for (int i=0; i<rq->cnt; i++) {
774 struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i);
775 fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type.e),
776 (void *)b, RSTRING_PTR(RARRAY_AREF(b->p.send.v, 1)));
777 if (basket_type_p(b, basket_type_reserved)) bug = true;
779 if (bug) rb_bug("!!");
781 #endif
783 struct receive_block_data {
784 rb_ractor_t *cr;
785 struct rb_ractor_queue *rq;
786 VALUE v;
787 int index;
788 bool success;
791 static void
792 ractor_receive_if_lock(rb_ractor_t *cr)
794 VALUE m = cr->receiving_mutex;
795 if (m == Qfalse) {
796 m = cr->receiving_mutex = rb_mutex_new();
798 rb_mutex_lock(m);
801 static VALUE
802 receive_if_body(VALUE ptr)
804 struct receive_block_data *data = (struct receive_block_data *)ptr;
806 ractor_receive_if_lock(data->cr);
807 VALUE block_result = rb_yield(data->v);
808 rb_ractor_t *cr = data->cr;
810 RACTOR_LOCK_SELF(cr);
812 struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index);
813 VM_ASSERT(basket_type_p(b, basket_type_reserved));
814 data->rq->reserved_cnt--;
816 if (RTEST(block_result)) {
817 ractor_queue_delete(cr, data->rq, b);
818 ractor_queue_compact(cr, data->rq);
820 else {
821 b->type.e = basket_type_ref;
824 RACTOR_UNLOCK_SELF(cr);
826 data->success = true;
828 if (RTEST(block_result)) {
829 return data->v;
831 else {
832 return Qundef;
836 static VALUE
837 receive_if_ensure(VALUE v)
839 struct receive_block_data *data = (struct receive_block_data *)v;
840 rb_ractor_t *cr = data->cr;
842 if (!data->success) {
843 RACTOR_LOCK_SELF(cr);
845 struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index);
846 VM_ASSERT(basket_type_p(b, basket_type_reserved));
847 b->type.e = basket_type_deleted;
848 data->rq->reserved_cnt--;
850 RACTOR_UNLOCK_SELF(cr);
853 rb_mutex_unlock(cr->receiving_mutex);
854 return Qnil;
857 static VALUE
858 ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
860 if (!RTEST(b)) rb_raise(rb_eArgError, "no block given");
862 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
863 unsigned int serial = (unsigned int)-1;
864 int index = 0;
865 struct rb_ractor_queue *rq = &cr->sync.recv_queue;
867 while (1) {
868 VALUE v = Qundef;
870 ractor_wait_receive(ec, cr, rq);
872 RACTOR_LOCK_SELF(cr);
874 if (serial != rq->serial) {
875 serial = rq->serial;
876 index = 0;
879 // check newer version
880 for (int i=index; i<rq->cnt; i++) {
881 if (!ractor_queue_skip_p(cr, rq, i)) {
882 struct rb_ractor_basket *b = ractor_queue_at(cr, rq, i);
883 v = ractor_basket_value(b);
884 b->type.e = basket_type_reserved;
885 rq->reserved_cnt++;
886 index = i;
887 break;
891 RACTOR_UNLOCK_SELF(cr);
893 if (!UNDEF_P(v)) {
894 struct receive_block_data data = {
895 .cr = cr,
896 .rq = rq,
897 .v = v,
898 .index = index,
899 .success = false,
902 VALUE result = rb_ensure(receive_if_body, (VALUE)&data,
903 receive_if_ensure, (VALUE)&data);
905 if (!UNDEF_P(result)) return result;
906 index++;
909 RUBY_VM_CHECK_INTS(ec);
913 static void
914 ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b)
916 bool closed = false;
918 RACTOR_LOCK(r);
920 if (r->sync.incoming_port_closed) {
921 closed = true;
923 else {
924 ractor_queue_enq(r, &r->sync.recv_queue, b);
925 ractor_wakeup(r, wait_receiving, wakeup_by_send);
928 RACTOR_UNLOCK(r);
930 if (closed) {
931 rb_raise(rb_eRactorClosedError, "The incoming-port is already closed");
935 // Ractor#send
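// Ractor#send prepares the payload (shareable objects are passed by reference,
// otherwise they are copied, or moved when requested), then enqueues the basket
// on the destination's recv_queue and wakes any receiver. Sending to a ractor
// whose incoming port is closed raises Ractor::ClosedError.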
937 static VALUE ractor_move(VALUE obj); // in this file
938 static VALUE ractor_copy(VALUE obj); // in this file
940 static void
941 ractor_basket_prepare_contents(VALUE obj, VALUE move, volatile VALUE *pobj, enum rb_ractor_basket_type *ptype)
943 VALUE v;
944 enum rb_ractor_basket_type type;
946 if (rb_ractor_shareable_p(obj)) {
947 type = basket_type_ref;
948 v = obj;
950 else if (!RTEST(move)) {
951 v = ractor_copy(obj);
952 type = basket_type_copy;
954 else {
955 type = basket_type_move;
956 v = ractor_move(obj);
959 *pobj = v;
960 *ptype = type;
963 static void
964 ractor_basket_fill_(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc)
966 VM_ASSERT(cr == GET_RACTOR());
968 basket->sender = cr->pub.self;
969 basket->p.send.exception = exc;
970 basket->p.send.v = obj;
973 static void
974 ractor_basket_fill(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc)
976 VALUE v;
977 enum rb_ractor_basket_type type;
978 ractor_basket_prepare_contents(obj, move, &v, &type);
979 ractor_basket_fill_(cr, basket, v, exc);
980 basket->type.e = type;
983 static void
984 ractor_basket_fill_will(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc)
986 ractor_basket_fill_(cr, basket, obj, exc);
987 basket->type.e = basket_type_will;
990 static VALUE
991 ractor_send(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move)
993 struct rb_ractor_basket basket;
994 // TODO: Ractor local GC
995 ractor_basket_fill(rb_ec_ractor_ptr(ec), &basket, obj, move, false);
996 ractor_send_basket(ec, r, &basket);
997 return r->pub.self;
1000 // Ractor#take
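// Ractor#take registers a "take basket" in the target's takers_queue and then
// sleeps with wait_taking. The yielding side (or the target's will, left at
// exit) fills that basket and flips its type atomically, so the taker can
// detect completion without holding the target's lock the whole time.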
1002 static bool
1003 ractor_take_has_will(rb_ractor_t *r)
1005 ASSERT_ractor_locking(r);
1007 return basket_type_p(&r->sync.will_basket, basket_type_will);
1010 static bool
1011 ractor_take_will(rb_ractor_t *r, struct rb_ractor_basket *b)
1013 ASSERT_ractor_locking(r);
1015 if (ractor_take_has_will(r)) {
1016 *b = r->sync.will_basket;
1017 r->sync.will_basket.type.e = basket_type_none;
1018 return true;
1020 else {
1021 VM_ASSERT(basket_type_p(&r->sync.will_basket, basket_type_none));
1022 return false;
1026 static bool
1027 ractor_take_will_lock(rb_ractor_t *r, struct rb_ractor_basket *b)
1029 ASSERT_ractor_unlocking(r);
1030 bool taken;
1032 RACTOR_LOCK(r);
1034 taken = ractor_take_will(r, b);
1036 RACTOR_UNLOCK(r);
1038 return taken;
1041 static bool
1042 ractor_register_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket,
1043 bool is_take, struct rb_ractor_selector_take_config *config, bool ignore_error)
1045 struct rb_ractor_basket b = {
1046 .type.e = basket_type_take_basket,
1047 .sender = cr->pub.self,
1048 .p = {
1049 .take = {
1050 .basket = take_basket,
1051 .config = config,
1055 bool closed = false;
1057 RACTOR_LOCK(r);
1059 if (is_take && ractor_take_will(r, take_basket)) {
1060 RUBY_DEBUG_LOG("take over a will of r:%d", rb_ractor_id(r));
1062 else if (!is_take && ractor_take_has_will(r)) {
1063 RUBY_DEBUG_LOG("has_will");
1064 VM_ASSERT(config != NULL);
1065 config->closed = true;
1067 else if (r->sync.outgoing_port_closed) {
1068 closed = true;
1070 else {
1071 RUBY_DEBUG_LOG("register in r:%d", rb_ractor_id(r));
1072 ractor_queue_enq(r, &r->sync.takers_queue, &b);
1074 if (basket_none_p(take_basket)) {
1075 ractor_wakeup(r, wait_yielding, wakeup_by_take);
1079 RACTOR_UNLOCK(r);
1081 if (closed) {
1082 if (!ignore_error) rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
1083 return false;
1085 else {
1086 return true;
1090 static bool
1091 ractor_deregister_take(rb_ractor_t *r, struct rb_ractor_basket *take_basket)
1093 struct rb_ractor_queue *ts = &r->sync.takers_queue;
1094 bool deleted = false;
1096 RACTOR_LOCK(r);
1098 if (r->sync.outgoing_port_closed) {
1099 // ok
1101 else {
1102 for (int i=0; i<ts->cnt; i++) {
1103 struct rb_ractor_basket *b = ractor_queue_at(r, ts, i);
1104 if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == take_basket) {
1105 ractor_queue_delete(r, ts, b);
1106 deleted = true;
1109 if (deleted) {
1110 ractor_queue_compact(r, ts);
1114 RACTOR_UNLOCK(r);
1116 return deleted;
1119 static VALUE
1120 ractor_try_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket)
1122 bool taken;
1124 RACTOR_LOCK_SELF(cr);
1126 if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
1127 taken = false;
1129 else {
1130 taken = true;
1133 RACTOR_UNLOCK_SELF(cr);
1135 if (taken) {
1136 RUBY_DEBUG_LOG("taken");
1137 if (basket_type_p(take_basket, basket_type_deleted)) {
1138 VM_ASSERT(r->sync.outgoing_port_closed);
1139 rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
1141 return ractor_basket_accept(take_basket);
1143 else {
1144 RUBY_DEBUG_LOG("not taken");
1145 return Qundef;
1150 #if VM_CHECK_MODE > 0
1151 static bool
1152 ractor_check_specific_take_basket_lock(rb_ractor_t *r, struct rb_ractor_basket *tb)
1154 bool ret = false;
1155 struct rb_ractor_queue *ts = &r->sync.takers_queue;
1157 RACTOR_LOCK(r);
1159 for (int i=0; i<ts->cnt; i++) {
1160 struct rb_ractor_basket *b = ractor_queue_at(r, ts, i);
1161 if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == tb) {
1162 ret = true;
1163 break;
1167 RACTOR_UNLOCK(r);
1169 return ret;
1171 #endif
1173 static void
1174 ractor_take_cleanup(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *tb)
1176 retry:
1177 if (basket_none_p(tb)) { // not yielded yet
1178 if (!ractor_deregister_take(r, tb)) {
1179 // not in r's takers queue
1180 rb_thread_sleep(0);
1181 goto retry;
1184 else {
1185 VM_ASSERT(!ractor_check_specific_take_basket_lock(r, tb));
1189 struct take_wait_take_cleanup_data {
1190 rb_ractor_t *r;
1191 struct rb_ractor_basket *tb;
1194 static void
1195 ractor_wait_take_cleanup(rb_ractor_t *cr, void *ptr)
1197 struct take_wait_take_cleanup_data *data = (struct take_wait_take_cleanup_data *)ptr;
1198 ractor_take_cleanup(cr, data->r, data->tb);
1201 static void
1202 ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket)
1204 struct take_wait_take_cleanup_data data = {
1205 .r = r,
1206 .tb = take_basket,
1209 RACTOR_LOCK_SELF(cr);
1211 if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
1212 ractor_sleep_with_cleanup(ec, cr, wait_taking, ractor_wait_take_cleanup, &data);
1215 RACTOR_UNLOCK_SELF(cr);
1218 static VALUE
1219 ractor_take(rb_execution_context_t *ec, rb_ractor_t *r)
1221 RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(r));
1222 VALUE v;
1223 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
1225 struct rb_ractor_basket take_basket = {
1226 .type.e = basket_type_none,
1227 .sender = 0,
1230 ractor_register_take(cr, r, &take_basket, true, NULL, false);
1232 while (UNDEF_P(v = ractor_try_take(cr, r, &take_basket))) {
1233 ractor_wait_take(ec, cr, r, &take_basket);
1236 VM_ASSERT(!basket_none_p(&take_basket));
1237 VM_ASSERT(!ractor_check_specific_take_basket_lock(r, &take_basket));
1239 return v;
1242 // Ractor.yield
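// Ractor.yield scans the caller's takers_queue for a pending take basket,
// claims it by CASing its type from none to yielding, prepares the value, then
// fills the basket and wakes the taker. If no taker is registered, the caller
// sleeps with wait_yielding until one arrives.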
1244 static bool
1245 ractor_check_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs)
1247 ASSERT_ractor_locking(cr);
1249 for (int i=0; i<rs->cnt; i++) {
1250 struct rb_ractor_basket *b = ractor_queue_at(cr, rs, i);
1251 if (basket_type_p(b, basket_type_take_basket) &&
1252 basket_none_p(b->p.take.basket)) {
1253 return true;
1257 return false;
1260 static bool
1261 ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ractor_basket *b)
1263 ASSERT_ractor_unlocking(cr);
1264 struct rb_ractor_basket *first_tb = NULL;
1265 bool found = false;
1267 RACTOR_LOCK_SELF(cr);
1269 while (ractor_queue_deq(cr, rs, b)) {
1270 if (basket_type_p(b, basket_type_take_basket)) {
1271 struct rb_ractor_basket *tb = b->p.take.basket;
1273 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
1274 found = true;
1275 break;
1277 else {
1278 ractor_queue_enq(cr, rs, b);
1279 if (first_tb == NULL) first_tb = tb;
1280 struct rb_ractor_basket *head = ractor_queue_head(cr, rs);
1281 VM_ASSERT(head != NULL);
1282 if (basket_type_p(head, basket_type_take_basket) && head->p.take.basket == first_tb) {
1283 break; // loop detected
1287 else {
1288 VM_ASSERT(basket_none_p(b));
1292 if (found && b->p.take.config && !b->p.take.config->oneshot) {
1293 ractor_queue_enq(cr, rs, b);
1296 RACTOR_UNLOCK_SELF(cr);
1298 return found;
1301 static bool
1302 ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts, volatile VALUE obj, VALUE move, bool exc, bool is_will)
1304 ASSERT_ractor_unlocking(cr);
1306 struct rb_ractor_basket b;
1308 if (ractor_deq_take_basket(cr, ts, &b)) {
1309 VM_ASSERT(basket_type_p(&b, basket_type_take_basket));
1310 VM_ASSERT(basket_type_p(b.p.take.basket, basket_type_yielding));
1312 rb_ractor_t *tr = RACTOR_PTR(b.sender);
1313 struct rb_ractor_basket *tb = b.p.take.basket;
1314 enum rb_ractor_basket_type type;
1316 RUBY_DEBUG_LOG("basket from r:%u", rb_ractor_id(tr));
1318 if (is_will) {
1319 type = basket_type_will;
1321 else {
1322 enum ruby_tag_type state;
1324 // begin
1325 EC_PUSH_TAG(ec);
1326 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
1327 // TODO: Ractor local GC
1328 ractor_basket_prepare_contents(obj, move, &obj, &type);
1330 EC_POP_TAG();
1331 // rescue
1332 if (state) {
1333 RACTOR_LOCK_SELF(cr);
1335 b.p.take.basket->type.e = basket_type_none;
1336 ractor_queue_enq(cr, ts, &b);
1338 RACTOR_UNLOCK_SELF(cr);
1339 EC_JUMP_TAG(ec, state);
1343 RACTOR_LOCK(tr);
1345 VM_ASSERT(basket_type_p(tb, basket_type_yielding));
1346 // fill atomic
1347 RUBY_DEBUG_LOG("fill %sbasket from r:%u", is_will ? "will " : "", rb_ractor_id(tr));
1348 ractor_basket_fill_(cr, tb, obj, exc);
1349 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, type) != basket_type_yielding) {
1350 rb_bug("unreachable");
1352 ractor_wakeup(tr, wait_taking, wakeup_by_yield);
1354 RACTOR_UNLOCK(tr);
1356 return true;
1358 else {
1359 RUBY_DEBUG_LOG("no take basket");
1360 return false;
1364 static void
1365 ractor_wait_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts)
1367 RACTOR_LOCK_SELF(cr);
1369 while (!ractor_check_take_basket(cr, ts)) {
1370 ractor_sleep(ec, cr, wait_yielding);
1373 RACTOR_UNLOCK_SELF(cr);
1376 static VALUE
1377 ractor_yield(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE obj, VALUE move)
1379 struct rb_ractor_queue *ts = &cr->sync.takers_queue;
1381 while (!ractor_try_yield(ec, cr, ts, obj, move, false, false)) {
1382 ractor_wait_yield(ec, cr, ts);
1385 return Qnil;
1388 // Ractor::Selector
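// A selector holds one shared take_basket plus a table mapping each watched
// ractor to its take configuration. Waiting registers that basket with every
// ractor, so whichever one yields (or is closed, or leaves a will) first wins
// the CAS on the basket type; the losers are cleaned up afterwards.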
1390 struct rb_ractor_selector {
1391 rb_ractor_t *r;
1392 struct rb_ractor_basket take_basket;
1393 st_table *take_ractors; // rb_ractor_t * => (struct rb_ractor_selector_take_config *)
1396 static int
1397 ractor_selector_mark_ractors_i(st_data_t key, st_data_t value, st_data_t data)
1399 const rb_ractor_t *r = (rb_ractor_t *)key;
1400 rb_gc_mark(r->pub.self);
1401 return ST_CONTINUE;
1404 static void
1405 ractor_selector_mark(void *ptr)
1407 struct rb_ractor_selector *s = ptr;
1409 if (s->take_ractors) {
1410 st_foreach(s->take_ractors, ractor_selector_mark_ractors_i, 0);
1413 switch (s->take_basket.type.e) {
1414 case basket_type_ref:
1415 case basket_type_copy:
1416 case basket_type_move:
1417 case basket_type_will:
1418 rb_gc_mark(s->take_basket.sender);
1419 rb_gc_mark(s->take_basket.p.send.v);
1420 break;
1421 default:
1422 break;
1426 static int
1427 ractor_selector_release_i(st_data_t key, st_data_t val, st_data_t data)
1429 struct rb_ractor_selector *s = (struct rb_ractor_selector *)data;
1430 struct rb_ractor_selector_take_config *config = (struct rb_ractor_selector_take_config *)val;
1432 if (!config->closed) {
1433 ractor_deregister_take((rb_ractor_t *)key, &s->take_basket);
1435 free(config);
1436 return ST_CONTINUE;
1439 static void
1440 ractor_selector_free(void *ptr)
1442 struct rb_ractor_selector *s = ptr;
1443 st_foreach(s->take_ractors, ractor_selector_release_i, (st_data_t)s);
1444 st_free_table(s->take_ractors);
1445 ruby_xfree(ptr);
1448 static size_t
1449 ractor_selector_memsize(const void *ptr)
1451 const struct rb_ractor_selector *s = ptr;
1452 return sizeof(struct rb_ractor_selector) +
1453 st_memsize(s->take_ractors) +
1454 s->take_ractors->num_entries * sizeof(struct rb_ractor_selector_take_config);
1457 static const rb_data_type_t ractor_selector_data_type = {
1458 "ractor/selector",
1460 ractor_selector_mark,
1461 ractor_selector_free,
1462 ractor_selector_memsize,
1463 NULL, // update
1465 0, 0, RUBY_TYPED_FREE_IMMEDIATELY,
1468 static struct rb_ractor_selector *
1469 RACTOR_SELECTOR_PTR(VALUE selv)
1471 VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type));
1473 return (struct rb_ractor_selector *)DATA_PTR(selv);
1476 // Ractor::Selector.new
1478 static VALUE
1479 ractor_selector_create(VALUE klass)
1481 struct rb_ractor_selector *s;
1482 VALUE selv = TypedData_Make_Struct(klass, struct rb_ractor_selector, &ractor_selector_data_type, s);
1483 s->take_basket.type.e = basket_type_reserved;
1484 s->take_ractors = st_init_numtable(); // ractor (ptr) -> take_config
1485 return selv;
1488 // Ractor::Selector#add(r)
1490 static VALUE
1491 ractor_selector_add(VALUE selv, VALUE rv)
1493 if (!rb_ractor_p(rv)) {
1494 rb_raise(rb_eArgError, "Not a ractor object");
1497 rb_ractor_t *r = RACTOR_PTR(rv);
1498 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1500 if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
1501 rb_raise(rb_eArgError, "already added");
1504 struct rb_ractor_selector_take_config *config = malloc(sizeof(struct rb_ractor_selector_take_config));
1505 VM_ASSERT(config != NULL);
1506 config->closed = false;
1507 config->oneshot = false;
1509 if (ractor_register_take(GET_RACTOR(), r, &s->take_basket, false, config, true)) {
1510 st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config);
1513 return rv;
1516 // Ractor::Selector#remove(r)
1518 static VALUE
1519 ractor_selector_remove(VALUE selv, VALUE rv)
1521 if (!rb_ractor_p(rv)) {
1522 rb_raise(rb_eArgError, "Not a ractor object");
1525 rb_ractor_t *r = RACTOR_PTR(rv);
1526 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1528 RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r));
1530 if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
1531 rb_raise(rb_eArgError, "not added yet");
1534 ractor_deregister_take(r, &s->take_basket);
1535 struct rb_ractor_selector_take_config *config;
1536 st_delete(s->take_ractors, (st_data_t *)&r, (st_data_t *)&config);
1537 free(config);
1539 return rv;
1542 // Ractor::Selector#clear
1544 struct ractor_selector_clear_data {
1545 VALUE selv;
1546 rb_execution_context_t *ec;
1549 static int
1550 ractor_selector_clear_i(st_data_t key, st_data_t val, st_data_t data)
1552 VALUE selv = (VALUE)data;
1553 rb_ractor_t *r = (rb_ractor_t *)key;
1554 ractor_selector_remove(selv, r->pub.self);
1555 return ST_CONTINUE;
1558 static VALUE
1559 ractor_selector_clear(VALUE selv)
1561 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1563 st_foreach(s->take_ractors, ractor_selector_clear_i, (st_data_t)selv);
1564 st_clear(s->take_ractors);
1565 return selv;
1568 static VALUE
1569 ractor_selector_empty_p(VALUE selv)
1571 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1572 return s->take_ractors->num_entries == 0 ? Qtrue : Qfalse;
1575 static int
1576 ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t dat)
1578 rb_ractor_t *r = (rb_ractor_t *)key;
1579 struct rb_ractor_basket *tb = (struct rb_ractor_basket *)dat;
1580 int ret;
1582 if (!basket_none_p(tb)) {
1583 RUBY_DEBUG_LOG("already taken:%s", basket_type_name(tb->type.e));
1584 return ST_STOP;
1587 RACTOR_LOCK(r);
1589 if (basket_type_p(&r->sync.will_basket, basket_type_will)) {
1590 RUBY_DEBUG_LOG("r:%u has will", rb_ractor_id(r));
1592 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_will) == basket_type_none) {
1593 ractor_take_will(r, tb);
1594 ret = ST_STOP;
1596 else {
1597 RUBY_DEBUG_LOG("has will, but already taken (%s)", basket_type_name(tb->type.e));
1598 ret = ST_CONTINUE;
1601 else if (r->sync.outgoing_port_closed) {
1602 RUBY_DEBUG_LOG("r:%u is closed", rb_ractor_id(r));
1604 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_deleted) == basket_type_none) {
1605 tb->sender = r->pub.self;
1606 ret = ST_STOP;
1608 else {
1609 RUBY_DEBUG_LOG("closed, but already taken (%s)", basket_type_name(tb->type.e));
1610 ret = ST_CONTINUE;
1613 else {
1614 RUBY_DEBUG_LOG("wakeup r:%u", rb_ractor_id(r));
1615 ractor_wakeup(r, wait_yielding, wakeup_by_take);
1616 ret = ST_CONTINUE;
1619 RACTOR_UNLOCK(r);
1621 return ret;
1624 // Ractor::Selector#wait
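// The wait loop first tries a non-blocking receive/yield, then arms the shared
// take basket, kicks every registered ractor, and sleeps until the basket is
// filled, a message arrives, or a yield becomes possible. The result basket is
// reclaimed with a CAS back to reserved; a concurrent "yielding" state means a
// fill is in progress, so the loop waits and retries.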
1626 static void
1627 ractor_selector_wait_cleanup(rb_ractor_t *cr, void *ptr)
1629 struct rb_ractor_basket *tb = (struct rb_ractor_basket *)ptr;
1631 RACTOR_LOCK_SELF(cr);
1633 while (basket_type_p(tb, basket_type_yielding)) rb_thread_sleep(0);
1634 // if tb->type is not none, the take succeeded, but the interruption unfortunately ignores it.
1635 tb->type.e = basket_type_reserved;
1637 RACTOR_UNLOCK_SELF(cr);
1640 static VALUE
1641 ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move)
1643 rb_execution_context_t *ec = GET_EC();
1644 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1645 struct rb_ractor_basket *tb = &s->take_basket;
1646 struct rb_ractor_basket taken_basket;
1647 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
1648 bool do_receive = !!RTEST(do_receivev);
1649 bool do_yield = !!RTEST(do_yieldv);
1650 VALUE ret_v, ret_r;
1651 enum rb_ractor_wait_status wait_status;
1652 struct rb_ractor_queue *rq = &cr->sync.recv_queue;
1653 struct rb_ractor_queue *ts = &cr->sync.takers_queue;
1655 RUBY_DEBUG_LOG("start");
1657 retry:
1658 RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries);
1660 // setup wait_status
1661 wait_status = wait_none;
1662 if (s->take_ractors->num_entries > 0) wait_status |= wait_taking;
1663 if (do_receive) wait_status |= wait_receiving;
1664 if (do_yield) wait_status |= wait_yielding;
1666 RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status));
1668 if (wait_status == wait_none) {
1669 rb_raise(rb_eRactorError, "no taking ractors");
1672 // check recv_queue
1673 if (do_receive && !UNDEF_P(ret_v = ractor_try_receive(ec, cr, rq))) {
1674 ret_r = ID2SYM(rb_intern("receive"));
1675 goto success;
1678 // check takers
1679 if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) {
1680 ret_v = Qnil;
1681 ret_r = ID2SYM(rb_intern("yield"));
1682 goto success;
1685 // check take_basket
1686 VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved));
1687 s->take_basket.type.e = basket_type_none;
1688 // kick all take target ractors
1689 st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb);
1691 RACTOR_LOCK_SELF(cr);
1693 retry_waiting:
1694 while (1) {
1695 if (!basket_none_p(tb)) {
1696 RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e),
1697 tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0);
1698 break;
1700 if (do_receive && !ractor_queue_empty_p(cr, rq)) {
1701 RUBY_DEBUG_LOG("can receive (%d)", rq->cnt);
1702 break;
1704 if (do_yield && ractor_check_take_basket(cr, ts)) {
1705 RUBY_DEBUG_LOG("can yield");
1706 break;
1709 ractor_sleep_with_cleanup(ec, cr, wait_status, ractor_selector_wait_cleanup, tb);
1712 taken_basket = *tb;
1714 // ensure
1715 // tb->type.e = basket_type_reserved (done atomically in the code below)
1716 if (taken_basket.type.e == basket_type_yielding ||
1717 RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) {
1719 if (basket_type_p(tb, basket_type_yielding)) {
1720 RACTOR_UNLOCK_SELF(cr);
1722 rb_thread_sleep(0);
1724 RACTOR_LOCK_SELF(cr);
1726 goto retry_waiting;
1729 RACTOR_UNLOCK_SELF(cr);
1731 // check the taken result
1732 switch (taken_basket.type.e) {
1733 case basket_type_none:
1734 VM_ASSERT(do_receive || do_yield);
1735 goto retry;
1736 case basket_type_yielding:
1737 rb_bug("unreachable");
1738 case basket_type_deleted: {
1739 ractor_selector_remove(selv, taken_basket.sender);
1741 rb_ractor_t *r = RACTOR_PTR(taken_basket.sender);
1742 if (ractor_take_will_lock(r, &taken_basket)) {
1743 RUBY_DEBUG_LOG("has_will");
1745 else {
1746 RUBY_DEBUG_LOG("no will");
1747 // rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
1748 // remove and retry wait
1749 goto retry;
1751 break;
1753 case basket_type_will:
1754 // no more messages
1755 ractor_selector_remove(selv, taken_basket.sender);
1756 break;
1757 default:
1758 break;
1761 RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e));
1763 ret_v = ractor_basket_accept(&taken_basket);
1764 ret_r = taken_basket.sender;
1765 success:
1766 return rb_ary_new_from_args(2, ret_r, ret_v);
1769 static VALUE
1770 ractor_selector_wait(int argc, VALUE *argv, VALUE selector)
1772 VALUE options;
1773 ID keywords[3];
1774 VALUE values[3];
1776 keywords[0] = rb_intern("receive");
1777 keywords[1] = rb_intern("yield_value");
1778 keywords[2] = rb_intern("move");
1780 rb_scan_args(argc, argv, "0:", &options);
1781 rb_get_kwargs(options, keywords, 0, numberof(values), values);
1782 return ractor_selector__wait(selector,
1783 values[0] == Qundef ? Qfalse : RTEST(values[0]),
1784 values[1] != Qundef, values[1], values[2]);
1787 static VALUE
1788 ractor_selector_new(int argc, VALUE *ractors, VALUE klass)
1790 VALUE selector = ractor_selector_create(klass);
1792 for (int i=0; i<argc; i++) {
1793 ractor_selector_add(selector, ractors[i]);
1796 return selector;
1799 static VALUE
1800 ractor_select_internal(rb_execution_context_t *ec, VALUE self, VALUE ractors, VALUE do_receive, VALUE do_yield, VALUE yield_value, VALUE move)
1802 VALUE selector = ractor_selector_new(RARRAY_LENINT(ractors), (VALUE *)RARRAY_CONST_PTR(ractors), rb_cRactorSelector);
1803 VALUE result;
1804 int state;
1806 EC_PUSH_TAG(ec);
1807 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
1808 result = ractor_selector__wait(selector, do_receive, do_yield, yield_value, move);
1810 else {
1811 // ensure
1812 ractor_selector_clear(selector);
1814 // jump
1815 EC_JUMP_TAG(ec, state);
1817 EC_POP_TAG();
1819 RB_GC_GUARD(ractors);
1820 return result;
1823 // Ractor#close_incoming
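// Closing the incoming port marks it closed and wakes a receiver blocked on an
// empty queue so it raises Ractor::ClosedError; the previous closed state is
// returned.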
1825 static VALUE
1826 ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
1828 VALUE prev;
1830 RACTOR_LOCK(r);
1832 if (!r->sync.incoming_port_closed) {
1833 prev = Qfalse;
1834 r->sync.incoming_port_closed = true;
1835 if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) {
1836 VM_ASSERT(ractor_queue_empty_p(r, &r->sync.recv_queue));
1837 RUBY_DEBUG_LOG("cancel receiving");
1840 else {
1841 prev = Qtrue;
1844 RACTOR_UNLOCK(r);
1845 return prev;
1848 // Ractor#close_outgoing
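// Closing the outgoing port marks it closed, converts every registered take
// basket to "deleted" (so pending takers raise Ractor::ClosedError), wakes
// those takers, and finally wakes any thread of this ractor blocked in yield.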
1850 static VALUE
1851 ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
1853 VALUE prev;
1855 RACTOR_LOCK(r);
1857 struct rb_ractor_queue *ts = &r->sync.takers_queue;
1858 rb_ractor_t *tr;
1859 struct rb_ractor_basket b;
1861 if (!r->sync.outgoing_port_closed) {
1862 prev = Qfalse;
1863 r->sync.outgoing_port_closed = true;
1865 else {
1866 VM_ASSERT(ractor_queue_empty_p(r, ts));
1867 prev = Qtrue;
1870 // wakeup all taking ractors
1871 while (ractor_queue_deq(r, ts, &b)) {
1872 if (basket_type_p(&b, basket_type_take_basket)) {
1873 tr = RACTOR_PTR(b.sender);
1874 struct rb_ractor_basket *tb = b.p.take.basket;
1876 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
1877 b.p.take.basket->sender = r->pub.self;
1878 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, basket_type_deleted) != basket_type_yielding) {
1879 rb_bug("unreachable");
1881 RUBY_DEBUG_LOG("set delete for r:%u", rb_ractor_id(RACTOR_PTR(b.sender)));
1884 if (b.p.take.config) {
1885 b.p.take.config->closed = true;
1888 // TODO: deadlock-able?
1889 RACTOR_LOCK(tr);
1891 ractor_wakeup(tr, wait_taking, wakeup_by_close);
1893 RACTOR_UNLOCK(tr);
1897 // wake up this ractor if it is blocked in Ractor.yield
1898 ractor_wakeup(r, wait_yielding, wakeup_by_close);
1900 VM_ASSERT(ractor_queue_empty_p(r, ts));
1902 RACTOR_UNLOCK(r);
1903 return prev;
1906 // creation/termination
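// Ractor ids come from an atomic counter. Inserting the second ractor cancels
// single-ractor mode (running a GC first), and every insertion/removal keeps
// vm->ractor.cnt and the blocking count consistent under the VM lock.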
1908 static uint32_t
1909 ractor_next_id(void)
1911 uint32_t id;
1913 id = (uint32_t)(RUBY_ATOMIC_FETCH_ADD(ractor_last_id, 1) + 1);
1915 return id;
1918 static void
1919 vm_insert_ractor0(rb_vm_t *vm, rb_ractor_t *r, bool single_ractor_mode)
1921 RUBY_DEBUG_LOG("r:%u ractor.cnt:%u++", r->pub.id, vm->ractor.cnt);
1922 VM_ASSERT(single_ractor_mode || RB_VM_LOCKED_P());
1924 ccan_list_add_tail(&vm->ractor.set, &r->vmlr_node);
1925 vm->ractor.cnt++;
1928 static void
1929 cancel_single_ractor_mode(void)
1931 // enable multi-ractor mode
1932 RUBY_DEBUG_LOG("enable multi-ractor mode");
1934 VALUE was_disabled = rb_gc_enable();
1936 rb_gc_start();
1938 if (was_disabled) {
1939 rb_gc_disable();
1942 ruby_single_main_ractor = NULL;
1945 static void
1946 vm_insert_ractor(rb_vm_t *vm, rb_ractor_t *r)
1948 VM_ASSERT(ractor_status_p(r, ractor_created));
1950 if (rb_multi_ractor_p()) {
1951 RB_VM_LOCK();
1953 vm_insert_ractor0(vm, r, false);
1954 vm_ractor_blocking_cnt_inc(vm, r, __FILE__, __LINE__);
1956 RB_VM_UNLOCK();
1958 else {
1959 if (vm->ractor.cnt == 0) {
1960 // main ractor
1961 vm_insert_ractor0(vm, r, true);
1962 ractor_status_set(r, ractor_blocking);
1963 ractor_status_set(r, ractor_running);
1965 else {
1966 cancel_single_ractor_mode();
1967 vm_insert_ractor0(vm, r, true);
1968 vm_ractor_blocking_cnt_inc(vm, r, __FILE__, __LINE__);
1973 static void
1974 vm_remove_ractor(rb_vm_t *vm, rb_ractor_t *cr)
1976 VM_ASSERT(ractor_status_p(cr, ractor_running));
1977 VM_ASSERT(vm->ractor.cnt > 1);
1978 VM_ASSERT(cr->threads.cnt == 1);
1980 RB_VM_LOCK();
1982 RUBY_DEBUG_LOG("ractor.cnt:%u-- terminate_waiting:%d",
1983 vm->ractor.cnt, vm->ractor.sync.terminate_waiting);
1985 VM_ASSERT(vm->ractor.cnt > 0);
1986 ccan_list_del(&cr->vmlr_node);
1988 if (vm->ractor.cnt <= 2 && vm->ractor.sync.terminate_waiting) {
1989 rb_native_cond_signal(&vm->ractor.sync.terminate_cond);
1991 vm->ractor.cnt--;
1993 /* Clear the cached freelist to prevent a memory leak. */
1994 rb_gc_ractor_newobj_cache_clear(&cr->newobj_cache);
1996 ractor_status_set(cr, ractor_terminated);
1998 RB_VM_UNLOCK();
2001 static VALUE
2002 ractor_alloc(VALUE klass)
2004 rb_ractor_t *r;
2005 VALUE rv = TypedData_Make_Struct(klass, rb_ractor_t, &ractor_data_type, r);
2006 FL_SET_RAW(rv, RUBY_FL_SHAREABLE);
2007 r->pub.self = rv;
2008 VM_ASSERT(ractor_status_p(r, ractor_created));
2009 return rv;
2012 rb_ractor_t *
2013 rb_ractor_main_alloc(void)
2015 rb_ractor_t *r = ruby_mimcalloc(1, sizeof(rb_ractor_t));
2016 if (r == NULL) {
2017 fprintf(stderr, "[FATAL] failed to allocate memory for main ractor\n");
2018 exit(EXIT_FAILURE);
2020 r->pub.id = ++ractor_last_id;
2021 r->loc = Qnil;
2022 r->name = Qnil;
2023 r->pub.self = Qnil;
2024 ruby_single_main_ractor = r;
2026 return r;
2029 #if defined(HAVE_WORKING_FORK)
2030 void
2031 rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th)
2033 // initialize as a main ractor
2034 vm->ractor.cnt = 0;
2035 vm->ractor.blocking_cnt = 0;
2036 ruby_single_main_ractor = th->ractor;
2037 th->ractor->status_ = ractor_created;
2039 rb_ractor_living_threads_init(th->ractor);
2040 rb_ractor_living_threads_insert(th->ractor, th);
2042 VM_ASSERT(vm->ractor.blocking_cnt == 0);
2043 VM_ASSERT(vm->ractor.cnt == 1);
2045 #endif
2047 void rb_thread_sched_init(struct rb_thread_sched *, bool atfork);
2049 void
2050 rb_ractor_living_threads_init(rb_ractor_t *r)
2052 ccan_list_head_init(&r->threads.set);
2053 r->threads.cnt = 0;
2054 r->threads.blocking_cnt = 0;
2057 static void
2058 ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
2060 ractor_queue_setup(&r->sync.recv_queue);
2061 ractor_queue_setup(&r->sync.takers_queue);
2062 rb_native_mutex_initialize(&r->sync.lock);
2063 rb_native_cond_initialize(&r->barrier_wait_cond);
2065 #ifdef RUBY_THREAD_WIN32_H
2066 rb_native_cond_initialize(&r->sync.cond);
2067 rb_native_cond_initialize(&r->barrier_wait_cond);
2068 #endif
2070 // thread management
2071 rb_thread_sched_init(&r->threads.sched, false);
2072 rb_ractor_living_threads_init(r);
2074 // naming
2075 if (!NIL_P(name)) {
2076 rb_encoding *enc;
2077 StringValueCStr(name);
2078 enc = rb_enc_get(name);
2079 if (!rb_enc_asciicompat(enc)) {
2080 rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)",
2081 rb_enc_name(enc));
2083 name = rb_str_new_frozen(name);
2085 r->name = name;
2086 r->loc = loc;
2089 void
2090 rb_ractor_main_setup(rb_vm_t *vm, rb_ractor_t *r, rb_thread_t *th)
2092 r->pub.self = TypedData_Wrap_Struct(rb_cRactor, &ractor_data_type, r);
2093 FL_SET_RAW(r->pub.self, RUBY_FL_SHAREABLE);
2094 ractor_init(r, Qnil, Qnil);
2095 r->threads.main = th;
2096 rb_ractor_living_threads_insert(r, th);
2099 static VALUE
2100 ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VALUE args, VALUE block)
2102 VALUE rv = ractor_alloc(self);
2103 rb_ractor_t *r = RACTOR_PTR(rv);
2104 ractor_init(r, name, loc);
2106 // can block here
2107 r->pub.id = ractor_next_id();
2108 RUBY_DEBUG_LOG("r:%u", r->pub.id);
2110 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
2111 r->verbose = cr->verbose;
2112 r->debug = cr->debug;
2114 rb_yjit_before_ractor_spawn();
2115 rb_rjit_before_ractor_spawn();
2116 rb_thread_create_ractor(r, args, block);
2118 RB_GC_GUARD(rv);
2119 return rv;
2122 static void
2123 ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc)
2125 if (cr->sync.outgoing_port_closed) {
2126 return;
2129 ASSERT_ractor_unlocking(cr);
2131 struct rb_ractor_queue *ts = &cr->sync.takers_queue;
2133 retry:
2134 if (ractor_try_yield(ec, cr, ts, v, Qfalse, exc, true)) {
2135 // OK.
2137 else {
2138 bool retry = false;
2139 RACTOR_LOCK(cr);
2141 if (!ractor_check_take_basket(cr, ts)) {
2142 VM_ASSERT(cr->sync.wait.status == wait_none);
2143 RUBY_DEBUG_LOG("leave a will");
2144 ractor_basket_fill_will(cr, &cr->sync.will_basket, v, exc);
2146 else {
2147 RUBY_DEBUG_LOG("rare timing!");
2148 retry = true; // another ractor is waiting for the yield.
2151 RACTOR_UNLOCK(cr);
2153 if (retry) goto retry;
2157 void
2158 rb_ractor_atexit(rb_execution_context_t *ec, VALUE result)
2160 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
2161 ractor_yield_atexit(ec, cr, result, false);
2164 void
2165 rb_ractor_atexit_exception(rb_execution_context_t *ec)
2167 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
2168 ractor_yield_atexit(ec, cr, ec->errinfo, true);
2171 void
2172 rb_ractor_teardown(rb_execution_context_t *ec)
2174 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
2175 ractor_close_incoming(ec, cr);
2176 ractor_close_outgoing(ec, cr);
2178 // sync with rb_ractor_terminate_interrupt_main_thread()
2179 RB_VM_LOCK_ENTER();
2181 VM_ASSERT(cr->threads.main != NULL);
2182 cr->threads.main = NULL;
2184 RB_VM_LOCK_LEAVE();
2187 void
2188 rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr)
2190 for (int i=0; i<len; i++) {
2191 ptr[i] = ractor_receive(ec, r);
2195 void
2196 rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *r, VALUE args)
2198 int len = RARRAY_LENINT(args);
2199 for (int i=0; i<len; i++) {
2200 ractor_send(ec, r, RARRAY_AREF(args, i), false);
2204 bool
2205 rb_ractor_main_p_(void)
2207 VM_ASSERT(rb_multi_ractor_p());
2208 rb_execution_context_t *ec = GET_EC();
2209 return rb_ec_ractor_ptr(ec) == rb_ec_vm_ptr(ec)->ractor.main_ractor;
2212 bool
2213 rb_obj_is_main_ractor(VALUE gv)
2215 if (!rb_ractor_p(gv)) return false;
2216 rb_ractor_t *r = DATA_PTR(gv);
2217 return r == GET_VM()->ractor.main_ractor;
2221 rb_ractor_living_thread_num(const rb_ractor_t *r)
2223 return r->threads.cnt;
2226 // only for current ractor
2227 VALUE
2228 rb_ractor_thread_list(void)
2230 rb_ractor_t *r = GET_RACTOR();
2231 rb_thread_t *th = 0;
2232 VALUE ary = rb_ary_new();
2234 ccan_list_for_each(&r->threads.set, th, lt_node) {
2235 switch (th->status) {
2236 case THREAD_RUNNABLE:
2237 case THREAD_STOPPED:
2238 case THREAD_STOPPED_FOREVER:
2239 rb_ary_push(ary, th->self);
2240 default:
2241 break;
2245 return ary;
2248 void
2249 rb_ractor_living_threads_insert(rb_ractor_t *r, rb_thread_t *th)
2251 VM_ASSERT(th != NULL);
2253 RACTOR_LOCK(r);
2255 RUBY_DEBUG_LOG("r(%d)->threads.cnt:%d++", r->pub.id, r->threads.cnt);
2256 ccan_list_add_tail(&r->threads.set, &th->lt_node);
2257 r->threads.cnt++;
2259 RACTOR_UNLOCK(r);
2261 // first thread for a ractor
2262 if (r->threads.cnt == 1) {
2263 VM_ASSERT(ractor_status_p(r, ractor_created));
2264 vm_insert_ractor(th->vm, r);
2268 static void
2269 vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line)
2271 ractor_status_set(r, ractor_blocking);
2273 RUBY_DEBUG_LOG2(file, line, "vm->ractor.blocking_cnt:%d++", vm->ractor.blocking_cnt);
2274 vm->ractor.blocking_cnt++;
2275 VM_ASSERT(vm->ractor.blocking_cnt <= vm->ractor.cnt);
2278 void
2279 rb_vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
2281 ASSERT_vm_locking();
2282 VM_ASSERT(GET_RACTOR() == cr);
2283 vm_ractor_blocking_cnt_inc(vm, cr, file, line);
2286 void
2287 rb_vm_ractor_blocking_cnt_dec(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
2289 ASSERT_vm_locking();
2290 VM_ASSERT(GET_RACTOR() == cr);
2292 RUBY_DEBUG_LOG2(file, line, "vm->ractor.blocking_cnt:%d--", vm->ractor.blocking_cnt);
2293 VM_ASSERT(vm->ractor.blocking_cnt > 0);
2294 vm->ractor.blocking_cnt--;
2296 ractor_status_set(cr, ractor_running);
2299 static void
2300 ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const char *file, int line)
2302 VM_ASSERT(cr == GET_RACTOR());
2304 RUBY_DEBUG_LOG2(file, line,
2305 "cr->threads.cnt:%u cr->threads.blocking_cnt:%u vm->ractor.cnt:%u vm->ractor.blocking_cnt:%u",
2306 cr->threads.cnt, cr->threads.blocking_cnt,
2307 GET_VM()->ractor.cnt, GET_VM()->ractor.blocking_cnt);
2309 VM_ASSERT(cr->threads.cnt >= cr->threads.blocking_cnt + 1);
2311 if (remained_thread_cnt > 0 &&
2312 // the ractor will become blocking
2313 cr->threads.cnt == cr->threads.blocking_cnt + 1) {
2314 // change ractor status: running -> blocking
2315 rb_vm_t *vm = GET_VM();
2316 ASSERT_vm_unlocking();
2318 RB_VM_LOCK();
2320 rb_vm_ractor_blocking_cnt_inc(vm, cr, file, line);
2322 RB_VM_UNLOCK();
2326 void rb_threadptr_remove(rb_thread_t *th);
2328 void
2329 rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th)
2331 VM_ASSERT(cr == GET_RACTOR());
2332 RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr->threads.cnt);
2333 ractor_check_blocking(cr, cr->threads.cnt - 1, __FILE__, __LINE__);
2335 rb_threadptr_remove(th);
2337 if (cr->threads.cnt == 1) {
2338 vm_remove_ractor(th->vm, cr);
2340 else {
2341 RACTOR_LOCK(cr);
2343 ccan_list_del(&th->lt_node);
2344 cr->threads.cnt--;
2346 RACTOR_UNLOCK(cr);
2350 void
2351 rb_ractor_blocking_threads_inc(rb_ractor_t *cr, const char *file, int line)
2353 RUBY_DEBUG_LOG2(file, line, "cr->threads.blocking_cnt:%d++", cr->threads.blocking_cnt);
2355 VM_ASSERT(cr->threads.cnt > 0);
2356 VM_ASSERT(cr == GET_RACTOR());
2358 ractor_check_blocking(cr, cr->threads.cnt, __FILE__, __LINE__);
2359 cr->threads.blocking_cnt++;
2362 void
2363 rb_ractor_blocking_threads_dec(rb_ractor_t *cr, const char *file, int line)
2365 RUBY_DEBUG_LOG2(file, line,
2366 "r->threads.blocking_cnt:%d--, r->threads.cnt:%u",
2367 cr->threads.blocking_cnt, cr->threads.cnt);
2369 VM_ASSERT(cr == GET_RACTOR());
2371 if (cr->threads.cnt == cr->threads.blocking_cnt) {
2372 rb_vm_t *vm = GET_VM();
2374 RB_VM_LOCK_ENTER();
2376 rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
2378 RB_VM_LOCK_LEAVE();
2381 cr->threads.blocking_cnt--;
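// Editorial note: rb_ractor_blocking_threads_inc()/_dec() bracket sections where a
// thread blocks without running Ruby code (their callers live in thread.c and
// friends). A hedged usage sketch:
//
//   rb_ractor_t *cr = GET_RACTOR();
//   rb_ractor_blocking_threads_inc(cr, __FILE__, __LINE__);
//   /* ... perform a blocking operation that does not touch Ruby objects ... */
//   rb_ractor_blocking_threads_dec(cr, __FILE__, __LINE__);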
2384 void
2385 rb_ractor_vm_barrier_interrupt_running_thread(rb_ractor_t *r)
2387 VM_ASSERT(r != GET_RACTOR());
2388 ASSERT_ractor_unlocking(r);
2389 ASSERT_vm_locking();
2391 RACTOR_LOCK(r);
2393 if (ractor_status_p(r, ractor_running)) {
2394 rb_execution_context_t *ec = r->threads.running_ec;
2395 if (ec) {
2396 RUBY_VM_SET_VM_BARRIER_INTERRUPT(ec);
2400 RACTOR_UNLOCK(r);
2403 void
2404 rb_ractor_terminate_interrupt_main_thread(rb_ractor_t *r)
2406 VM_ASSERT(r != GET_RACTOR());
2407 ASSERT_ractor_unlocking(r);
2408 ASSERT_vm_locking();
2410 rb_thread_t *main_th = r->threads.main;
2411 if (main_th) {
2412 if (main_th->status != THREAD_KILLED) {
2413 RUBY_VM_SET_TERMINATE_INTERRUPT(main_th->ec);
2414 rb_threadptr_interrupt(main_th);
2416 else {
2417 RUBY_DEBUG_LOG("killed (%p)", (void *)main_th);
2422 void rb_thread_terminate_all(rb_thread_t *th); // thread.c
2424 static void
2425 ractor_terminal_interrupt_all(rb_vm_t *vm)
2427 if (vm->ractor.cnt > 1) {
2428 // send terminate notification to all ractors
2429 rb_ractor_t *r = 0;
2430 ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
2431 if (r != vm->ractor.main_ractor) {
2432 RUBY_DEBUG_LOG("r:%d", rb_ractor_id(r));
2433 rb_ractor_terminate_interrupt_main_thread(r);
2439 void rb_add_running_thread(rb_thread_t *th);
2440 void rb_del_running_thread(rb_thread_t *th);
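// Editorial summary of the shutdown sequence implemented below: the main ractor's
// main thread first interrupts every other ractor (ractor_terminal_interrupt_all),
// then terminates the remaining threads of the main ractor itself
// (rb_thread_terminate_all), and finally waits on terminate_cond in 1-second
// slices, re-sending the interrupts each round, until vm->ractor.cnt drops to 1.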
2442 void
2443 rb_ractor_terminate_all(void)
2445 rb_vm_t *vm = GET_VM();
2446 rb_ractor_t *cr = vm->ractor.main_ractor;
2448 RUBY_DEBUG_LOG("ractor.cnt:%d", (int)vm->ractor.cnt);
2450 VM_ASSERT(cr == GET_RACTOR()); // only main-ractor's main-thread should kick it.
2452 if (vm->ractor.cnt > 1) {
2453 RB_VM_LOCK();
2455 ractor_terminal_interrupt_all(vm); // kill all ractors
2457 RB_VM_UNLOCK();
2459 rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait
2461 RB_VM_LOCK();
2463 while (vm->ractor.cnt > 1) {
2464 RUBY_DEBUG_LOG("terminate_waiting:%d", vm->ractor.sync.terminate_waiting);
2465 vm->ractor.sync.terminate_waiting = true;
2467 // wait for 1sec
2468 rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__);
2469 rb_del_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
2470 rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */);
2471 rb_add_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
2472 rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
2474 ractor_terminal_interrupt_all(vm);
2477 RB_VM_UNLOCK();
2480 rb_execution_context_t *
2481 rb_vm_main_ractor_ec(rb_vm_t *vm)
2483 /* This code needs to carefully work around two bugs:
2484 * - Bug #20016: When M:N threading is enabled, running_ec is NULL if no thread is
2485 * actually currently running (as opposed to without M:N threading, when
2486 * running_ec will still point to the _last_ thread which ran)
2487 * - Bug #20197: If the main thread is sleeping, setting its postponed job
2488 * interrupt flag is pointless; it won't look at the flag until it stops sleeping
2489 * for some reason. It would be better to set the flag on the running ec, which
2490 * will presumably look at it soon.
2492 * Solution: use running_ec if it's set, otherwise fall back to the main thread ec.
2493 * This is still susceptible to some rare race conditions (what if the last thread
2494 * to run just entered a long-running sleep?), but seems like the best balance of
2495 * robustness and complexity.
2497 rb_execution_context_t *running_ec = vm->ractor.main_ractor->threads.running_ec;
2498 if (running_ec) { return running_ec; }
2499 return vm->ractor.main_thread->ec;
2502 static VALUE
2503 ractor_moved_missing(int argc, VALUE *argv, VALUE self)
2505 rb_raise(rb_eRactorMovedError, "can not send any methods to a moved object");
2508 #ifndef USE_RACTOR_SELECTOR
2509 #define USE_RACTOR_SELECTOR 0
2510 #endif
2512 RUBY_SYMBOL_EXPORT_BEGIN
2513 void rb_init_ractor_selector(void);
2514 RUBY_SYMBOL_EXPORT_END
2516 void
2517 rb_init_ractor_selector(void)
2519 rb_cRactorSelector = rb_define_class_under(rb_cRactor, "Selector", rb_cObject);
2520 rb_undef_alloc_func(rb_cRactorSelector);
2522 rb_define_singleton_method(rb_cRactorSelector, "new", ractor_selector_new , -1);
2523 rb_define_method(rb_cRactorSelector, "add", ractor_selector_add, 1);
2524 rb_define_method(rb_cRactorSelector, "remove", ractor_selector_remove, 1);
2525 rb_define_method(rb_cRactorSelector, "clear", ractor_selector_clear, 0);
2526 rb_define_method(rb_cRactorSelector, "empty?", ractor_selector_empty_p, 0);
2527 rb_define_method(rb_cRactorSelector, "wait", ractor_selector_wait, -1);
2528 rb_define_method(rb_cRactorSelector, "_wait", ractor_selector__wait, 4);
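// Editorial note: USE_RACTOR_SELECTOR defaults to 0, so Ractor::Selector is not
// registered by Init_Ractor() below; the exported rb_init_ractor_selector() lets it
// be set up explicitly, e.g. by a build configured with USE_RACTOR_SELECTOR=1.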
2532 * Document-class: Ractor::ClosedError
2534 * Raised when an attempt is made to send a message to a closed port,
2535 * or to retrieve a message from a closed and empty port.
2536 * Ports may be closed explicitly with Ractor#close_outgoing/close_incoming
2537 * and are closed implicitly when a Ractor terminates.
2539 * r = Ractor.new { sleep(500) }
2540 * r.close_outgoing
2541 * r.take # Ractor::ClosedError
2543  * ClosedError is a descendant of StopIteration, so closing the ractor breaks out of
2544  * receiving loops without propagating the error:
2546 * r = Ractor.new do
2547 * loop do
2548 * msg = receive # raises ClosedError and loop traps it
2549 * puts "Received: #{msg}"
2550 * end
2551 * puts "loop exited"
2552 * end
2554 * 3.times{|i| r << i}
2555 * r.close_incoming
2556 * r.take
2557 * puts "Continue successfully"
2559 * This will print:
2561 * Received: 0
2562 * Received: 1
2563 * Received: 2
2564 * loop exited
2565 * Continue successfully
2569 * Document-class: Ractor::RemoteError
2571  * Raised on an attempt to Ractor#take if there was an uncaught exception in the Ractor.
2572  * Its +cause+ contains the original exception, and +ractor+ is the ractor
2573  * it was raised in.
2575 * r = Ractor.new { raise "Something weird happened" }
2577 * begin
2578 * r.take
2579 * rescue => e
2580 * p e # => #<Ractor::RemoteError: thrown by remote Ractor.>
2581 * p e.ractor == r # => true
2582 * p e.cause # => #<RuntimeError: Something weird happened>
2583 * end
2588 * Document-class: Ractor::MovedError
2590 * Raised on an attempt to access an object which was moved in Ractor#send or Ractor.yield.
2592 * r = Ractor.new { sleep }
2594 * ary = [1, 2, 3]
2595 * r.send(ary, move: true)
2596 * ary.inspect
2597 * # Ractor::MovedError (can not send any methods to a moved object)
2602 * Document-class: Ractor::MovedObject
2604 * A special object which replaces any value that was moved to another ractor in Ractor#send
2605 * or Ractor.yield. Any attempt to access the object results in Ractor::MovedError.
2607 * r = Ractor.new { receive }
2609 * ary = [1, 2, 3]
2610 * r.send(ary, move: true)
2611 * p Ractor::MovedObject === ary
2612 * # => true
2613 * ary.inspect
2614 * # Ractor::MovedError (can not send any methods to a moved object)
2617 // Main docs are in ractor.rb, but without this clause there are weird artifacts
2618 // in their rendering.
2620 * Document-class: Ractor
2624 void
2625 Init_Ractor(void)
2627 rb_cRactor = rb_define_class("Ractor", rb_cObject);
2628 rb_undef_alloc_func(rb_cRactor);
2630 rb_eRactorError = rb_define_class_under(rb_cRactor, "Error", rb_eRuntimeError);
2631 rb_eRactorIsolationError = rb_define_class_under(rb_cRactor, "IsolationError", rb_eRactorError);
2632 rb_eRactorRemoteError = rb_define_class_under(rb_cRactor, "RemoteError", rb_eRactorError);
2633 rb_eRactorMovedError = rb_define_class_under(rb_cRactor, "MovedError", rb_eRactorError);
2634 rb_eRactorClosedError = rb_define_class_under(rb_cRactor, "ClosedError", rb_eStopIteration);
2635 rb_eRactorUnsafeError = rb_define_class_under(rb_cRactor, "UnsafeError", rb_eRactorError);
2637 rb_cRactorMovedObject = rb_define_class_under(rb_cRactor, "MovedObject", rb_cBasicObject);
2638 rb_undef_alloc_func(rb_cRactorMovedObject);
2639 rb_define_method(rb_cRactorMovedObject, "method_missing", ractor_moved_missing, -1);
2641 // override methods defined in BasicObject
2642 rb_define_method(rb_cRactorMovedObject, "__send__", ractor_moved_missing, -1);
2643 rb_define_method(rb_cRactorMovedObject, "!", ractor_moved_missing, -1);
2644 rb_define_method(rb_cRactorMovedObject, "==", ractor_moved_missing, -1);
2645 rb_define_method(rb_cRactorMovedObject, "!=", ractor_moved_missing, -1);
2646 rb_define_method(rb_cRactorMovedObject, "__id__", ractor_moved_missing, -1);
2647 rb_define_method(rb_cRactorMovedObject, "equal?", ractor_moved_missing, -1);
2648 rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1);
2649 rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1);
2651 #if USE_RACTOR_SELECTOR
2652 rb_init_ractor_selector();
2653 #endif
2656 void
2657 rb_ractor_dump(void)
2659 rb_vm_t *vm = GET_VM();
2660 rb_ractor_t *r = 0;
2662 ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
2663 if (r != vm->ractor.main_ractor) {
2664 fprintf(stderr, "r:%u (%s)\n", r->pub.id, ractor_status_str(r->status_));
2669 VALUE
2670 rb_ractor_stdin(void)
2672 if (rb_ractor_main_p()) {
2673 return rb_stdin;
2675 else {
2676 rb_ractor_t *cr = GET_RACTOR();
2677 return cr->r_stdin;
2681 VALUE
2682 rb_ractor_stdout(void)
2684 if (rb_ractor_main_p()) {
2685 return rb_stdout;
2687 else {
2688 rb_ractor_t *cr = GET_RACTOR();
2689 return cr->r_stdout;
2693 VALUE
2694 rb_ractor_stderr(void)
2696 if (rb_ractor_main_p()) {
2697 return rb_stderr;
2699 else {
2700 rb_ractor_t *cr = GET_RACTOR();
2701 return cr->r_stderr;
2705 void
2706 rb_ractor_stdin_set(VALUE in)
2708 if (rb_ractor_main_p()) {
2709 rb_stdin = in;
2711 else {
2712 rb_ractor_t *cr = GET_RACTOR();
2713 RB_OBJ_WRITE(cr->pub.self, &cr->r_stdin, in);
2717 void
2718 rb_ractor_stdout_set(VALUE out)
2720 if (rb_ractor_main_p()) {
2721 rb_stdout = out;
2723 else {
2724 rb_ractor_t *cr = GET_RACTOR();
2725 RB_OBJ_WRITE(cr->pub.self, &cr->r_stdout, out);
2729 void
2730 rb_ractor_stderr_set(VALUE err)
2732 if (rb_ractor_main_p()) {
2733 rb_stderr = err;
2735 else {
2736 rb_ractor_t *cr = GET_RACTOR();
2737 RB_OBJ_WRITE(cr->pub.self, &cr->r_stderr, err);
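// Editorial note: the main ractor keeps using the process-wide rb_stdin/rb_stdout/
// rb_stderr, while every other ractor reads and writes its own r_stdin/r_stdout/
// r_stderr slots (with write barriers), so redirecting the standard streams inside
// a non-main ractor does not leak into other ractors.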
2741 rb_hook_list_t *
2742 rb_ractor_hooks(rb_ractor_t *cr)
2744 return &cr->pub.hooks;
2747 /// traverse function
2749 // enter/leave callbacks return an obj_traverse_iterator_result:
2750 //   0: continue (traverse_cont), 1: skip children (traverse_skip),
2751 //   2: stop search (traverse_stop)
2753 enum obj_traverse_iterator_result {
2754 traverse_cont,
2755 traverse_skip,
2756 traverse_stop,
2759 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_enter_func)(VALUE obj);
2760 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_leave_func)(VALUE obj);
2761 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_final_func)(VALUE obj);
2763 static enum obj_traverse_iterator_result null_leave(VALUE obj);
2765 struct obj_traverse_data {
2766 rb_obj_traverse_enter_func enter_func;
2767 rb_obj_traverse_leave_func leave_func;
2769 st_table *rec;
2770 VALUE rec_hash;
2774 struct obj_traverse_callback_data {
2775 bool stop;
2776 struct obj_traverse_data *data;
2779 static int obj_traverse_i(VALUE obj, struct obj_traverse_data *data);
2781 static int
2782 obj_hash_traverse_i(VALUE key, VALUE val, VALUE ptr)
2784 struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
2786 if (obj_traverse_i(key, d->data)) {
2787 d->stop = true;
2788 return ST_STOP;
2791 if (obj_traverse_i(val, d->data)) {
2792 d->stop = true;
2793 return ST_STOP;
2796 return ST_CONTINUE;
2799 static void
2800 obj_traverse_reachable_i(VALUE obj, void *ptr)
2802 struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
2804 if (obj_traverse_i(obj, d->data)) {
2805 d->stop = true;
2809 static struct st_table *
2810 obj_traverse_rec(struct obj_traverse_data *data)
2812 if (UNLIKELY(!data->rec)) {
2813 data->rec_hash = rb_ident_hash_new();
2814 data->rec = RHASH_ST_TABLE(data->rec_hash);
2816 return data->rec;
2819 static int
2820 obj_traverse_ivar_foreach_i(ID key, VALUE val, st_data_t ptr)
2822 struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
2824 if (obj_traverse_i(val, d->data)) {
2825 d->stop = true;
2826 return ST_STOP;
2829 return ST_CONTINUE;
2832 static int
2833 obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
2835 if (RB_SPECIAL_CONST_P(obj)) return 0;
2837 switch (data->enter_func(obj)) {
2838 case traverse_cont: break;
2839 case traverse_skip: return 0; // skip children
2840 case traverse_stop: return 1; // stop search
2843 if (UNLIKELY(st_insert(obj_traverse_rec(data), obj, 1))) {
2844 // already traversed
2845 return 0;
2848 struct obj_traverse_callback_data d = {
2849 .stop = false,
2850 .data = data,
2852 rb_ivar_foreach(obj, obj_traverse_ivar_foreach_i, (st_data_t)&d);
2853 if (d.stop) return 1;
2855 switch (BUILTIN_TYPE(obj)) {
2856 // no child node
2857 case T_STRING:
2858 case T_FLOAT:
2859 case T_BIGNUM:
2860 case T_REGEXP:
2861 case T_FILE:
2862 case T_SYMBOL:
2863 case T_MATCH:
2864 break;
2866 case T_OBJECT:
2867 /* Instance variables already traversed. */
2868 break;
2870 case T_ARRAY:
2872 for (int i = 0; i < RARRAY_LENINT(obj); i++) {
2873 VALUE e = rb_ary_entry(obj, i);
2874 if (obj_traverse_i(e, data)) return 1;
2877 break;
2879 case T_HASH:
2881 if (obj_traverse_i(RHASH_IFNONE(obj), data)) return 1;
2883 struct obj_traverse_callback_data d = {
2884 .stop = false,
2885 .data = data,
2887 rb_hash_foreach(obj, obj_hash_traverse_i, (VALUE)&d);
2888 if (d.stop) return 1;
2890 break;
2892 case T_STRUCT:
2894 long len = RSTRUCT_LEN(obj);
2895 const VALUE *ptr = RSTRUCT_CONST_PTR(obj);
2897 for (long i=0; i<len; i++) {
2898 if (obj_traverse_i(ptr[i], data)) return 1;
2901 break;
2903 case T_RATIONAL:
2904 if (obj_traverse_i(RRATIONAL(obj)->num, data)) return 1;
2905 if (obj_traverse_i(RRATIONAL(obj)->den, data)) return 1;
2906 break;
2907 case T_COMPLEX:
2908 if (obj_traverse_i(RCOMPLEX(obj)->real, data)) return 1;
2909 if (obj_traverse_i(RCOMPLEX(obj)->imag, data)) return 1;
2910 break;
2912 case T_DATA:
2913 case T_IMEMO:
2915 struct obj_traverse_callback_data d = {
2916 .stop = false,
2917 .data = data,
2919 RB_VM_LOCK_ENTER_NO_BARRIER();
2921 rb_objspace_reachable_objects_from(obj, obj_traverse_reachable_i, &d);
2923 RB_VM_LOCK_LEAVE_NO_BARRIER();
2924 if (d.stop) return 1;
2926 break;
2928 // unreachable
2929 case T_CLASS:
2930 case T_MODULE:
2931 case T_ICLASS:
2932 default:
2933 rp(obj);
2934 rb_bug("unreachable");
2937 if (data->leave_func(obj) == traverse_stop) {
2938 return 1;
2940 else {
2941 return 0;
2945 struct rb_obj_traverse_final_data {
2946 rb_obj_traverse_final_func final_func;
2947 int stopped;
2950 static int
2951 obj_traverse_final_i(st_data_t key, st_data_t val, st_data_t arg)
2953 struct rb_obj_traverse_final_data *data = (void *)arg;
2954 if (data->final_func(key)) {
2955 data->stopped = 1;
2956 return ST_STOP;
2958 return ST_CONTINUE;
2961 // 0: traverse all
2962 // 1: stopped
2963 static int
2964 rb_obj_traverse(VALUE obj,
2965 rb_obj_traverse_enter_func enter_func,
2966 rb_obj_traverse_leave_func leave_func,
2967 rb_obj_traverse_final_func final_func)
2969 struct obj_traverse_data data = {
2970 .enter_func = enter_func,
2971 .leave_func = leave_func,
2972 .rec = NULL,
2975 if (obj_traverse_i(obj, &data)) return 1;
2976 if (final_func && data.rec) {
2977 struct rb_obj_traverse_final_data f = {final_func, 0};
2978 st_foreach(data.rec, obj_traverse_final_i, (st_data_t)&f);
2979 return f.stopped;
2981 return 0;
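// Editorial note: rb_obj_traverse() records every visited object in an identity
// table (data.rec), which both prevents infinite recursion on cyclic object graphs
// and lets final_func run exactly once per visited object after the walk finishes;
// rb_ractor_make_shareable() below uses that final pass to set RUBY_FL_SHAREABLE.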
2984 static int
2985 frozen_shareable_p(VALUE obj, bool *made_shareable)
2987 if (CHILLED_STRING_P(obj)) {
2988 return false;
2990 else if (!RB_TYPE_P(obj, T_DATA)) {
2991 return true;
2993 else if (RTYPEDDATA_P(obj)) {
2994 const rb_data_type_t *type = RTYPEDDATA_TYPE(obj);
2995 if (type->flags & RUBY_TYPED_FROZEN_SHAREABLE) {
2996 return true;
2998 else if (made_shareable && rb_obj_is_proc(obj)) {
2999 // special path to make a Proc shareable.
3000 rb_proc_ractor_make_shareable(obj);
3001 *made_shareable = true;
3002 VM_ASSERT(RB_OBJ_SHAREABLE_P(obj));
3003 return false;
3007 return false;
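// Editorial note: for T_DATA, frozen_shareable_p() only accepts TypedData whose
// type declares RUBY_TYPED_FROZEN_SHAREABLE. A hedged sketch of how an extension's
// type could opt in (`my_thing_type` is a hypothetical name):
//
//   static const rb_data_type_t my_thing_type = {
//       .wrap_struct_name = "MyExt::Thing",
//       .function = { .dmark = NULL, .dfree = RUBY_TYPED_DEFAULT_FREE, .dsize = NULL },
//       .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_FROZEN_SHAREABLE,
//   };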
3010 static enum obj_traverse_iterator_result
3011 make_shareable_check_shareable(VALUE obj)
3013 VM_ASSERT(!SPECIAL_CONST_P(obj));
3014 bool made_shareable = false;
3016 if (rb_ractor_shareable_p(obj)) {
3017 return traverse_skip;
3019 else if (CHILLED_STRING_P(obj)) {
3020 rb_funcall(obj, idFreeze, 0);
3022 if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) {
3023 rb_raise(rb_eRactorError, "#freeze does not freeze object correctly");
3026 if (RB_OBJ_SHAREABLE_P(obj)) {
3027 return traverse_skip;
3030 else if (!frozen_shareable_p(obj, &made_shareable)) {
3031 if (made_shareable) {
3032 return traverse_skip;
3034 else {
3035 rb_raise(rb_eRactorError, "can not make shareable object for %"PRIsVALUE, obj);
3039 if (!RB_OBJ_FROZEN_RAW(obj)) {
3040 rb_funcall(obj, idFreeze, 0);
3042 if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) {
3043 rb_raise(rb_eRactorError, "#freeze does not freeze object correctly");
3046 if (RB_OBJ_SHAREABLE_P(obj)) {
3047 return traverse_skip;
3051 return traverse_cont;
3054 static enum obj_traverse_iterator_result
3055 mark_shareable(VALUE obj)
3057 FL_SET_RAW(obj, RUBY_FL_SHAREABLE);
3058 return traverse_cont;
3061 VALUE
3062 rb_ractor_make_shareable(VALUE obj)
3064 rb_obj_traverse(obj,
3065 make_shareable_check_shareable,
3066 null_leave, mark_shareable);
3067 return obj;
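// Editorial usage sketch (hedged, not part of the original source): deep-freeze a
// structure and mark everything reachable as shareable before handing it to
// another ractor:
//
//   VALUE ary = rb_ary_new_from_args(2, rb_str_new_cstr("a"), rb_str_new_cstr("b"));
//   rb_ractor_make_shareable(ary);
//   /* ary, both strings and anything else reachable are now frozen and flagged
//      shareable, so rb_ractor_shareable_p(ary) returns true. */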
3070 VALUE
3071 rb_ractor_make_shareable_copy(VALUE obj)
3073 VALUE copy = ractor_copy(obj);
3074 return rb_ractor_make_shareable(copy);
3077 VALUE
3078 rb_ractor_ensure_shareable(VALUE obj, VALUE name)
3080 if (!rb_ractor_shareable_p(obj)) {
3081 VALUE message = rb_sprintf("cannot assign unshareable object to %"PRIsVALUE,
3082 name);
3083 rb_exc_raise(rb_exc_new_str(rb_eRactorIsolationError, message));
3085 return obj;
3088 void
3089 rb_ractor_ensure_main_ractor(const char *msg)
3091 if (!rb_ractor_main_p()) {
3092 rb_raise(rb_eRactorIsolationError, "%s", msg);
3096 static enum obj_traverse_iterator_result
3097 shareable_p_enter(VALUE obj)
3099 if (RB_OBJ_SHAREABLE_P(obj)) {
3100 return traverse_skip;
3102 else if (RB_TYPE_P(obj, T_CLASS) ||
3103 RB_TYPE_P(obj, T_MODULE) ||
3104 RB_TYPE_P(obj, T_ICLASS)) {
3105 // TODO: remove it
3106 mark_shareable(obj);
3107 return traverse_skip;
3109 else if (RB_OBJ_FROZEN_RAW(obj) &&
3110 frozen_shareable_p(obj, NULL)) {
3111 return traverse_cont;
3114 return traverse_stop; // fail
3117 bool
3118 rb_ractor_shareable_p_continue(VALUE obj)
3120 if (rb_obj_traverse(obj,
3121 shareable_p_enter, null_leave,
3122 mark_shareable)) {
3123 return false;
3125 else {
3126 return true;
3130 #if RACTOR_CHECK_MODE > 0
3131 static enum obj_traverse_iterator_result
3132 reset_belonging_enter(VALUE obj)
3134 if (rb_ractor_shareable_p(obj)) {
3135 return traverse_skip;
3137 else {
3138 rb_ractor_setup_belonging(obj);
3139 return traverse_cont;
3142 #endif
3144 static enum obj_traverse_iterator_result
3145 null_leave(VALUE obj)
3147 return traverse_cont;
3150 static VALUE
3151 ractor_reset_belonging(VALUE obj)
3153 #if RACTOR_CHECK_MODE > 0
3154 rb_obj_traverse(obj, reset_belonging_enter, null_leave, NULL);
3155 #endif
3156 return obj;
3160 /// traverse and replace function
3162 // enter/leave callbacks return an obj_traverse_iterator_result, as above:
3163 //   0: continue (traverse_cont), 1: skip children (traverse_skip),
3164 //   2: stop search (traverse_stop)
3166 struct obj_traverse_replace_data;
3167 static int obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data);
3168 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_enter_func)(VALUE obj, struct obj_traverse_replace_data *data);
3169 typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_leave_func)(VALUE obj, struct obj_traverse_replace_data *data);
3171 struct obj_traverse_replace_data {
3172 rb_obj_traverse_replace_enter_func enter_func;
3173 rb_obj_traverse_replace_leave_func leave_func;
3175 st_table *rec;
3176 VALUE rec_hash;
3178 VALUE replacement;
3179 bool move;
3182 struct obj_traverse_replace_callback_data {
3183 bool stop;
3184 VALUE src;
3185 struct obj_traverse_replace_data *data;
3188 static int
3189 obj_hash_traverse_replace_foreach_i(st_data_t key, st_data_t value, st_data_t argp, int error)
3191 return ST_REPLACE;
3194 static int
3195 obj_hash_traverse_replace_i(st_data_t *key, st_data_t *val, st_data_t ptr, int exists)
3197 struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr;
3198 struct obj_traverse_replace_data *data = d->data;
3200 if (obj_traverse_replace_i(*key, data)) {
3201 d->stop = true;
3202 return ST_STOP;
3204 else if (*key != data->replacement) {
3205 VALUE v = *key = data->replacement;
3206 RB_OBJ_WRITTEN(d->src, Qundef, v);
3209 if (obj_traverse_replace_i(*val, data)) {
3210 d->stop = true;
3211 return ST_STOP;
3213 else if (*val != data->replacement) {
3214 VALUE v = *val = data->replacement;
3215 RB_OBJ_WRITTEN(d->src, Qundef, v);
3218 return ST_CONTINUE;
3221 static int
3222 obj_iv_hash_traverse_replace_foreach_i(st_data_t _key, st_data_t _val, st_data_t _data, int _x)
3224 return ST_REPLACE;
3227 static int
3228 obj_iv_hash_traverse_replace_i(st_data_t * _key, st_data_t * val, st_data_t ptr, int exists)
3230 struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr;
3231 struct obj_traverse_replace_data *data = d->data;
3233 if (obj_traverse_replace_i(*(VALUE *)val, data)) {
3234 d->stop = true;
3235 return ST_STOP;
3237 else if (*(VALUE *)val != data->replacement) {
3238 VALUE v = *(VALUE *)val = data->replacement;
3239 RB_OBJ_WRITTEN(d->src, Qundef, v);
3242 return ST_CONTINUE;
3245 static struct st_table *
3246 obj_traverse_replace_rec(struct obj_traverse_replace_data *data)
3248 if (UNLIKELY(!data->rec)) {
3249 data->rec_hash = rb_ident_hash_new();
3250 data->rec = RHASH_ST_TABLE(data->rec_hash);
3252 return data->rec;
3255 static void
3256 obj_refer_only_shareables_p_i(VALUE obj, void *ptr)
3258 int *pcnt = (int *)ptr;
3260 if (!rb_ractor_shareable_p(obj)) {
3261 ++*pcnt;
3265 static int
3266 obj_refer_only_shareables_p(VALUE obj)
3268 int cnt = 0;
3269 RB_VM_LOCK_ENTER_NO_BARRIER();
3271 rb_objspace_reachable_objects_from(obj, obj_refer_only_shareables_p_i, &cnt);
3273 RB_VM_LOCK_LEAVE_NO_BARRIER();
3274 return cnt == 0;
3277 static int
3278 obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
3280 st_data_t replacement;
3282 if (RB_SPECIAL_CONST_P(obj)) {
3283 data->replacement = obj;
3284 return 0;
3287 switch (data->enter_func(obj, data)) {
3288 case traverse_cont: break;
3289 case traverse_skip: return 0; // skip children
3290 case traverse_stop: return 1; // stop search
3293 replacement = (st_data_t)data->replacement;
3295 if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data), (st_data_t)obj, &replacement))) {
3296 data->replacement = (VALUE)replacement;
3297 return 0;
3299 else {
3300 st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, replacement);
3303 if (!data->move) {
3304 obj = replacement;
3307 #define CHECK_AND_REPLACE(v) do { \
3308 VALUE _val = (v); \
3309 if (obj_traverse_replace_i(_val, data)) { return 1; } \
3310 else if (data->replacement != _val) { RB_OBJ_WRITE(obj, &v, data->replacement); } \
3311 } while (0)
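// Editorial note: CHECK_AND_REPLACE(v) recurses into `v` and, when the traversal
// produced a different replacement object, stores it back through RB_OBJ_WRITE so
// the GC write barrier fires for the containing object.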
3313 if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
3314 struct gen_ivtbl *ivtbl;
3315 rb_ivar_generic_ivtbl_lookup(obj, &ivtbl);
3317 if (UNLIKELY(rb_shape_obj_too_complex(obj))) {
3318 struct obj_traverse_replace_callback_data d = {
3319 .stop = false,
3320 .data = data,
3321 .src = obj,
3323 rb_st_foreach_with_replace(
3324 ivtbl->as.complex.table,
3325 obj_iv_hash_traverse_replace_foreach_i,
3326 obj_iv_hash_traverse_replace_i,
3327 (st_data_t)&d
3329 if (d.stop) return 1;
3331 else {
3332 for (uint32_t i = 0; i < ivtbl->as.shape.numiv; i++) {
3333 if (!UNDEF_P(ivtbl->as.shape.ivptr[i])) {
3334 CHECK_AND_REPLACE(ivtbl->as.shape.ivptr[i]);
3340 switch (BUILTIN_TYPE(obj)) {
3341 // no child node
3342 case T_FLOAT:
3343 case T_BIGNUM:
3344 case T_REGEXP:
3345 case T_FILE:
3346 case T_SYMBOL:
3347 case T_MATCH:
3348 break;
3349 case T_STRING:
3350 rb_str_make_independent(obj);
3351 break;
3353 case T_OBJECT:
3355 if (rb_shape_obj_too_complex(obj)) {
3356 struct obj_traverse_replace_callback_data d = {
3357 .stop = false,
3358 .data = data,
3359 .src = obj,
3361 rb_st_foreach_with_replace(
3362 ROBJECT_IV_HASH(obj),
3363 obj_iv_hash_traverse_replace_foreach_i,
3364 obj_iv_hash_traverse_replace_i,
3365 (st_data_t)&d
3367 if (d.stop) return 1;
3369 else {
3370 uint32_t len = ROBJECT_IV_COUNT(obj);
3371 VALUE *ptr = ROBJECT_IVPTR(obj);
3373 for (uint32_t i = 0; i < len; i++) {
3374 CHECK_AND_REPLACE(ptr[i]);
3378 break;
3380 case T_ARRAY:
3382 rb_ary_cancel_sharing(obj);
3384 for (int i = 0; i < RARRAY_LENINT(obj); i++) {
3385 VALUE e = rb_ary_entry(obj, i);
3387 if (obj_traverse_replace_i(e, data)) {
3388 return 1;
3390 else if (e != data->replacement) {
3391 RARRAY_ASET(obj, i, data->replacement);
3394 RB_GC_GUARD(obj);
3396 break;
3397 case T_HASH:
3399 struct obj_traverse_replace_callback_data d = {
3400 .stop = false,
3401 .data = data,
3402 .src = obj,
3404 rb_hash_stlike_foreach_with_replace(obj,
3405 obj_hash_traverse_replace_foreach_i,
3406 obj_hash_traverse_replace_i,
3407 (VALUE)&d);
3408 if (d.stop) return 1;
3409 // TODO: rehash here?
3411 VALUE ifnone = RHASH_IFNONE(obj);
3412 if (obj_traverse_replace_i(ifnone, data)) {
3413 return 1;
3415 else if (ifnone != data->replacement) {
3416 RHASH_SET_IFNONE(obj, data->replacement);
3419 break;
3421 case T_STRUCT:
3423 long len = RSTRUCT_LEN(obj);
3424 const VALUE *ptr = RSTRUCT_CONST_PTR(obj);
3426 for (long i=0; i<len; i++) {
3427 CHECK_AND_REPLACE(ptr[i]);
3430 break;
3432 case T_RATIONAL:
3433 CHECK_AND_REPLACE(RRATIONAL(obj)->num);
3434 CHECK_AND_REPLACE(RRATIONAL(obj)->den);
3435 break;
3436 case T_COMPLEX:
3437 CHECK_AND_REPLACE(RCOMPLEX(obj)->real);
3438 CHECK_AND_REPLACE(RCOMPLEX(obj)->imag);
3439 break;
3441 case T_DATA:
3442 if (!data->move && obj_refer_only_shareables_p(obj)) {
3443 break;
3445 else {
3446 rb_raise(rb_eRactorError, "can not %s %"PRIsVALUE" object.",
3447 data->move ? "move" : "copy", rb_class_of(obj));
3450 case T_IMEMO:
3451 // not supported yet
3452 return 1;
3454 // unreachable
3455 case T_CLASS:
3456 case T_MODULE:
3457 case T_ICLASS:
3458 default:
3459 rp(obj);
3460 rb_bug("unreachable");
3463 data->replacement = (VALUE)replacement;
3465 if (data->leave_func(obj, data) == traverse_stop) {
3466 return 1;
3468 else {
3469 return 0;
3473 // Returns data.replacement when all reachable objects were traversed,
3474 // or Qundef when the traversal was stopped.
3475 static VALUE
3476 rb_obj_traverse_replace(VALUE obj,
3477 rb_obj_traverse_replace_enter_func enter_func,
3478 rb_obj_traverse_replace_leave_func leave_func,
3479 bool move)
3481 struct obj_traverse_replace_data data = {
3482 .enter_func = enter_func,
3483 .leave_func = leave_func,
3484 .rec = NULL,
3485 .replacement = Qundef,
3486 .move = move,
3489 if (obj_traverse_replace_i(obj, &data)) {
3490 return Qundef;
3492 else {
3493 return data.replacement;
3497 struct RVALUE {
3498 VALUE flags;
3499 VALUE klass;
3500 VALUE v1;
3501 VALUE v2;
3502 VALUE v3;
3505 static const VALUE fl_users = FL_USER1 | FL_USER2 | FL_USER3 |
3506 FL_USER4 | FL_USER5 | FL_USER6 | FL_USER7 |
3507 FL_USER8 | FL_USER9 | FL_USER10 | FL_USER11 |
3508 FL_USER12 | FL_USER13 | FL_USER14 | FL_USER15 |
3509 FL_USER16 | FL_USER17 | FL_USER18 | FL_USER19;
3511 static void
3512 ractor_moved_bang(VALUE obj)
3514 // invalidate src object
3515 struct RVALUE *rv = (void *)obj;
3517 rv->klass = rb_cRactorMovedObject;
3518 rv->v1 = 0;
3519 rv->v2 = 0;
3520 rv->v3 = 0;
3521 rv->flags = rv->flags & ~fl_users;
3523 if (BUILTIN_TYPE(obj) == T_OBJECT) ROBJECT_SET_SHAPE_ID(obj, ROOT_SHAPE_ID);
3525 // TODO: record moved location
3528 static enum obj_traverse_iterator_result
3529 move_enter(VALUE obj, struct obj_traverse_replace_data *data)
3531 if (rb_ractor_shareable_p(obj)) {
3532 data->replacement = obj;
3533 return traverse_skip;
3535 else {
3536 VALUE moved = rb_obj_alloc(RBASIC_CLASS(obj));
3537 rb_shape_set_shape(moved, rb_shape_get_shape(obj));
3538 data->replacement = moved;
3539 return traverse_cont;
3543 void rb_replace_generic_ivar(VALUE clone, VALUE obj); // variable.c
3545 static enum obj_traverse_iterator_result
3546 move_leave(VALUE obj, struct obj_traverse_replace_data *data)
3548 VALUE v = data->replacement;
3549 struct RVALUE *dst = (struct RVALUE *)v;
3550 struct RVALUE *src = (struct RVALUE *)obj;
3552 dst->flags = (dst->flags & ~fl_users) | (src->flags & fl_users);
3554 dst->v1 = src->v1;
3555 dst->v2 = src->v2;
3556 dst->v3 = src->v3;
3558 if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
3559 rb_replace_generic_ivar(v, obj);
3562 // TODO: generic_ivar
3564 ractor_moved_bang(obj);
3565 return traverse_cont;
3568 static VALUE
3569 ractor_move(VALUE obj)
3571 VALUE val = rb_obj_traverse_replace(obj, move_enter, move_leave, true);
3572 if (!UNDEF_P(val)) {
3573 return val;
3575 else {
3576 rb_raise(rb_eRactorError, "can not move the object");
3580 static enum obj_traverse_iterator_result
3581 copy_enter(VALUE obj, struct obj_traverse_replace_data *data)
3583 if (rb_ractor_shareable_p(obj)) {
3584 data->replacement = obj;
3585 return traverse_skip;
3587 else {
3588 data->replacement = rb_obj_clone(obj);
3589 return traverse_cont;
3593 static enum obj_traverse_iterator_result
3594 copy_leave(VALUE obj, struct obj_traverse_replace_data *data)
3596 return traverse_cont;
3599 static VALUE
3600 ractor_copy(VALUE obj)
3602 VALUE val = rb_obj_traverse_replace(obj, copy_enter, copy_leave, false);
3603 if (!UNDEF_P(val)) {
3604 return val;
3606 else {
3607 rb_raise(rb_eRactorError, "can not copy the object");
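// Editorial note: ractor_copy() and ractor_move() both sit on top of
// rb_obj_traverse_replace(). Copy clones every unshareable object it meets
// (rb_obj_clone) and leaves the source graph intact; move transplants the contents
// into freshly allocated objects and then turns each source object into a
// Ractor::MovedObject via ractor_moved_bang(), so any later access on the sending
// side raises Ractor::MovedError.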
3611 // Ractor local storage
3613 struct rb_ractor_local_key_struct {
3614 const struct rb_ractor_local_storage_type *type;
3615 void *main_cache;
3618 static struct freed_ractor_local_keys_struct {
3619 int cnt;
3620 int capa;
3621 rb_ractor_local_key_t *keys;
3622 } freed_ractor_local_keys;
3624 static int
3625 ractor_local_storage_mark_i(st_data_t key, st_data_t val, st_data_t dmy)
3627 struct rb_ractor_local_key_struct *k = (struct rb_ractor_local_key_struct *)key;
3628 if (k->type->mark) (*k->type->mark)((void *)val);
3629 return ST_CONTINUE;
3632 static enum rb_id_table_iterator_result
3633 idkey_local_storage_mark_i(ID id, VALUE val, void *dmy)
3635 rb_gc_mark(val);
3636 return ID_TABLE_CONTINUE;
3639 static void
3640 ractor_local_storage_mark(rb_ractor_t *r)
3642 if (r->local_storage) {
3643 st_foreach(r->local_storage, ractor_local_storage_mark_i, 0);
3645 for (int i=0; i<freed_ractor_local_keys.cnt; i++) {
3646 rb_ractor_local_key_t key = freed_ractor_local_keys.keys[i];
3647 st_data_t val, k = (st_data_t)key;
3648 if (st_delete(r->local_storage, &k, &val) &&
3649 (key = (rb_ractor_local_key_t)k)->type->free) {
3650 (*key->type->free)((void *)val);
3655 if (r->idkey_local_storage) {
3656 rb_id_table_foreach(r->idkey_local_storage, idkey_local_storage_mark_i, NULL);
3660 static int
3661 ractor_local_storage_free_i(st_data_t key, st_data_t val, st_data_t dmy)
3663 struct rb_ractor_local_key_struct *k = (struct rb_ractor_local_key_struct *)key;
3664 if (k->type->free) (*k->type->free)((void *)val);
3665 return ST_CONTINUE;
3668 static void
3669 ractor_local_storage_free(rb_ractor_t *r)
3671 if (r->local_storage) {
3672 st_foreach(r->local_storage, ractor_local_storage_free_i, 0);
3673 st_free_table(r->local_storage);
3676 if (r->idkey_local_storage) {
3677 rb_id_table_free(r->idkey_local_storage);
3681 static void
3682 rb_ractor_local_storage_value_mark(void *ptr)
3684 rb_gc_mark((VALUE)ptr);
3687 static const struct rb_ractor_local_storage_type ractor_local_storage_type_null = {
3688 NULL,
3689 NULL,
3692 const struct rb_ractor_local_storage_type rb_ractor_local_storage_type_free = {
3693 NULL,
3694 ruby_xfree,
3697 static const struct rb_ractor_local_storage_type ractor_local_storage_type_value = {
3698 rb_ractor_local_storage_value_mark,
3699 NULL,
3702 rb_ractor_local_key_t
3703 rb_ractor_local_storage_ptr_newkey(const struct rb_ractor_local_storage_type *type)
3705 rb_ractor_local_key_t key = ALLOC(struct rb_ractor_local_key_struct);
3706 key->type = type ? type : &ractor_local_storage_type_null;
3707 key->main_cache = (void *)Qundef;
3708 return key;
3711 rb_ractor_local_key_t
3712 rb_ractor_local_storage_value_newkey(void)
3714 return rb_ractor_local_storage_ptr_newkey(&ractor_local_storage_type_value);
3717 void
3718 rb_ractor_local_storage_delkey(rb_ractor_local_key_t key)
3720 RB_VM_LOCK_ENTER();
3722 if (freed_ractor_local_keys.cnt == freed_ractor_local_keys.capa) {
3723 freed_ractor_local_keys.capa = freed_ractor_local_keys.capa ? freed_ractor_local_keys.capa * 2 : 4;
3724 REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, freed_ractor_local_keys.capa);
3726 freed_ractor_local_keys.keys[freed_ractor_local_keys.cnt++] = key;
3728 RB_VM_LOCK_LEAVE();
3731 static bool
3732 ractor_local_ref(rb_ractor_local_key_t key, void **pret)
3734 if (rb_ractor_main_p()) {
3735 if (!UNDEF_P((VALUE)key->main_cache)) {
3736 *pret = key->main_cache;
3737 return true;
3739 else {
3740 return false;
3743 else {
3744 rb_ractor_t *cr = GET_RACTOR();
3746 if (cr->local_storage && st_lookup(cr->local_storage, (st_data_t)key, (st_data_t *)pret)) {
3747 return true;
3749 else {
3750 return false;
3755 static void
3756 ractor_local_set(rb_ractor_local_key_t key, void *ptr)
3758 rb_ractor_t *cr = GET_RACTOR();
3760 if (cr->local_storage == NULL) {
3761 cr->local_storage = st_init_numtable();
3764 st_insert(cr->local_storage, (st_data_t)key, (st_data_t)ptr);
3766 if (rb_ractor_main_p()) {
3767 key->main_cache = ptr;
3771 VALUE
3772 rb_ractor_local_storage_value(rb_ractor_local_key_t key)
3774 void *val;
3775 if (ractor_local_ref(key, &val)) {
3776 return (VALUE)val;
3778 else {
3779 return Qnil;
3783 bool
3784 rb_ractor_local_storage_value_lookup(rb_ractor_local_key_t key, VALUE *val)
3786 if (ractor_local_ref(key, (void **)val)) {
3787 return true;
3789 else {
3790 return false;
3794 void
3795 rb_ractor_local_storage_value_set(rb_ractor_local_key_t key, VALUE val)
3797 ractor_local_set(key, (void *)val);
3800 void *
3801 rb_ractor_local_storage_ptr(rb_ractor_local_key_t key)
3803 void *ret;
3804 if (ractor_local_ref(key, &ret)) {
3805 return ret;
3807 else {
3808 return NULL;
3812 void
3813 rb_ractor_local_storage_ptr_set(rb_ractor_local_key_t key, void *ptr)
3815 ractor_local_set(key, ptr);
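// Editorial usage sketch (hedged; `my_key` and `struct my_state` are hypothetical):
// a C extension can keep per-ractor state behind these keys instead of a
// process-global static, which keeps the data ractor-local:
//
//   static rb_ractor_local_key_t my_key;
//   struct my_state { long counter; };
//
//   static struct my_state *
//   my_state_get(void)
//   {
//       struct my_state *s = rb_ractor_local_storage_ptr(my_key);
//       if (s == NULL) {
//           s = ZALLOC(struct my_state);
//           rb_ractor_local_storage_ptr_set(my_key, s);
//       }
//       return s;
//   }
//
//   /* in the extension's Init function: */
//   my_key = rb_ractor_local_storage_ptr_newkey(&rb_ractor_local_storage_type_free);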
3818 #define DEFAULT_KEYS_CAPA 0x10
3820 void
3821 rb_ractor_finish_marking(void)
3823 for (int i=0; i<freed_ractor_local_keys.cnt; i++) {
3824 ruby_xfree(freed_ractor_local_keys.keys[i]);
3826 freed_ractor_local_keys.cnt = 0;
3827 if (freed_ractor_local_keys.capa > DEFAULT_KEYS_CAPA) {
3828 freed_ractor_local_keys.capa = DEFAULT_KEYS_CAPA;
3829 REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, DEFAULT_KEYS_CAPA);
3833 static VALUE
3834 ractor_local_value(rb_execution_context_t *ec, VALUE self, VALUE sym)
3836 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
3837 ID id = rb_check_id(&sym);
3838 struct rb_id_table *tbl = cr->idkey_local_storage;
3839 VALUE val;
3841 if (id && tbl && rb_id_table_lookup(tbl, id, &val)) {
3842 return val;
3844 else {
3845 return Qnil;
3849 static VALUE
3850 ractor_local_value_set(rb_execution_context_t *ec, VALUE self, VALUE sym, VALUE val)
3852 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
3853 ID id = SYM2ID(rb_to_symbol(sym));
3854 struct rb_id_table *tbl = cr->idkey_local_storage;
3856 if (tbl == NULL) {
3857 tbl = cr->idkey_local_storage = rb_id_table_create(2);
3859 rb_id_table_insert(tbl, id, val);
3860 return val;
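// Editorial note: ractor_local_value() and ractor_local_value_set() are the C entry
// points for the symbol-keyed per-ractor storage exposed at the Ruby level
// (presumably Ractor#[] and Ractor#[]=, whose builtin definitions are pulled in by
// the ractor.rbinc include below).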
3863 #include "ractor.rbinc"