From 3eec2323551d842099bf1c43120fe917aee66eff Mon Sep 17 00:00:00 2001 From: MenTaLguY Date: Sun, 29 Jul 2007 11:25:23 -0400 Subject: [PATCH] stub Volatile using Atomic, and import list stuff from fastthread --- ext/concurrent/primitives/primitives.c | 235 +++++++++++++++++++++++---------- lib/concurrent/primitives.rb | 6 + 2 files changed, 174 insertions(+), 67 deletions(-) diff --git a/ext/concurrent/primitives/primitives.c b/ext/concurrent/primitives/primitives.c index 91b4192..1921ba9 100644 --- a/ext/concurrent/primitives/primitives.c +++ b/ext/concurrent/primitives/primitives.c @@ -14,104 +14,210 @@ static VALUE mPrimitives; static VALUE cChannel; static VALUE cSemaphore; static VALUE cLatch; -static VALUE cVolatile; static VALUE cAtomic; -typedef struct _Queue { +typedef struct Entry_ { + VALUE value; + struct Entry_ *next; +} Entry; + +typedef struct Queue_ { + Entry *entries; + Entry *last_entry; + Entry *entry_pool; + unsigned long size; } Queue; static void queue_init(Queue *queue) { + queue->entries = NULL; + queue->last_entry = NULL; + queue->entry_pool = NULL; + queue->size = 0; } static void queue_mark(Queue *queue) { + Entry *entry; + for (entry = queue->entries; entry; entry = entry->next) { + rb_gc_mark(entry->value); + } } -static void queue_finalize(Queue *queue) { +static void free_entries(Entry *first) { + Entry *next; + while (first) { + next = first->next; + xfree(first); + first = next; + } } -static void queue_put(Queue *queue, VALUE value) { +static void queue_finalize(Queue *queue) { + free_entries(queue->entries); + free_entries(queue->entry_pool); } -static VALUE queue_get(Queue *queue) { - return Qnil; -} +static void +queue_put(Queue *queue, VALUE value) +{ + Entry *entry; -static void queue_remove(Queue *queue, VALUE value) { + if (queue->entry_pool) { + entry = queue->entry_pool; + queue->entry_pool = entry->next; + } else { + entry = ALLOC(Entry); + } + + entry->value = value; + entry->next = NULL; + + if (queue->last_entry) { + queue->last_entry->next = entry; + } else { + queue->entries = entry; + } + queue->last_entry = entry; + + ++queue->size; } -static int queue_is_empty(Queue *queue) { - return 1; +static void recycle_entries(Queue *queue, Entry *first_entry, Entry *last_entry) +{ +#ifdef USE_MEM_POOLS + last_entry->next = queue->entry_pool; + queue->entry_pool = first_entry; +#else + last_entry->next = NULL; + free_entries(first_entry); +#endif } -typedef struct _WaitQueue { - Queue waiting; -} WaitQueue; +static VALUE queue_get(Queue *queue) { + Entry *entry; + VALUE value; -typedef int (*WaitCondition)(void *data); + entry = queue->entries; + if (!entry) return Qundef; -static VALUE set_critical(VALUE critical) { - rb_thread_critical = (int)critical; - return Qnil; -} + queue->entries = entry->next; + if (entry == queue->last_entry) { + queue->last_entry = NULL; + } + + --queue->size; -static void wait_queue_init(WaitQueue *queue) { - queue_init(&queue->waiting); + value = entry->value; + recycle_entries(queue, entry, entry); + + return value; } -static void wait_queue_mark(WaitQueue *queue) { - queue_mark(&queue->waiting); +static void queue_remove(Queue *queue, VALUE value) { + Entry **ref; + Entry *prev; + Entry *entry; + + for (ref = &queue->entries, prev = NULL, entry = queue->entries; + entry != NULL; + ref = &entry->next, prev = entry, entry = entry->next) + { + if (entry->value == value) { + *ref = entry->next; + queue->size--; + if (!entry->next) { + queue->last_entry = prev; + } + recycle_entries(queue, entry, entry); + break; + } + } } -static void wait_queue_finalize(WaitQueue *queue) { - queue_finalize(&queue->waiting); +static int queue_is_empty(Queue *queue) { + return !queue->entries; } static VALUE wake_thread(VALUE thread) { return rb_rescue2(rb_thread_wakeup, thread, - NULL, Qundef, - rb_eThreadError, 0); + NULL, Qundef, rb_eThreadError, 0); } -static VALUE wait_queue_wake_one(WaitQueue *queue) { - return Qnil; +static VALUE queue_wake_one(Queue *queue) { + int saved_critical; + VALUE waking; + + waking = Qnil; + saved_critical = rb_thread_critical; + rb_thread_critical = 1; + while (queue->entries && !RTEST(waking)) { + waking = wake_thread(queue_get(queue)); + } + rb_thread_critical = saved_critical; + + return waking; } -static VALUE wait_queue_wake_all(WaitQueue *queue) { +static VALUE queue_wake_all(Queue *queue) { + int saved_critical; + saved_critical = rb_thread_critical; + rb_thread_critical = 1; + while (queue->entries) { + queue_wake_one(queue); + } + rb_thread_critical = saved_critical; return Qnil; } -static VALUE on_wakeup(WaitQueue *queue) { - queue_remove(&queue->waiting, rb_thread_current()); +static VALUE queue_wait_inner(Queue *queue) { + queue_put(queue, rb_thread_current()); + rb_thread_stop(); return Qnil; } -static VALUE go_to_sleep(VALUE unused) { - rb_thread_stop(); +static VALUE queue_wait_cleanup(Queue *queue) { + /* cleanup in case of spurious wakeups */ + queue_remove(queue, rb_thread_current()); return Qnil; } -static void wait_queue_wait(WaitQueue *queue, VALUE timeout, - WaitCondition condition, void *data) +typedef int (*WaitCondition)(void *data); + +static void queue_wait(Queue *queue, VALUE timeout, + WaitCondition condition, void *data) { + int saved_critical; + saved_critical = rb_thread_critical; + rb_thread_critical = 1; while (condition(data)) { - queue_put(&queue->waiting, rb_thread_current()); - rb_ensure(go_to_sleep, Qnil, on_wakeup, (VALUE)queue); + rb_ensure(queue_wait_inner, (VALUE)queue, queue_wait_cleanup, (VALUE)queue); + rb_thread_critical = 1; } + rb_thread_critical = saved_critical; } -typedef struct _Channel { +static void assert_no_survivors(Queue *waiting, const char *label, void *addr) { + Entry *entry; + for (entry = waiting->entries; entry; entry = entry->next) { + if (RTEST(wake_thread(entry->value))) { + rb_bug("%s %p freed with live thread(s) waiting", label, addr); + } + } +} + +typedef struct Channel_ { Queue written; - WaitQueue readers; + Queue readers; } Channel; static void channel_mark(Channel *channel) { queue_mark(&channel->written); - wait_queue_mark(&channel->readers); + queue_mark(&channel->readers); } static void channel_free(Channel *channel) { + assert_no_survivors(&channel->readers, "channel", channel); queue_finalize(&channel->written); - wait_queue_finalize(&channel->readers); + queue_finalize(&channel->readers); xfree(channel); } @@ -119,7 +225,7 @@ static VALUE s_channel_alloc(VALUE klass) { Channel *channel; channel = ALLOC(Channel); queue_init(&channel->written); - wait_queue_init(&channel->readers); + queue_init(&channel->readers); return Data_Wrap_Struct(klass, channel_mark, channel_free, channel); } @@ -127,7 +233,7 @@ static VALUE m_channel_send(VALUE self, VALUE value) { Channel *channel; Data_Get_Struct(self, Channel, channel); queue_put(&channel->written, value); - wait_queue_wake_one(&channel->readers); + queue_wake_one(&channel->readers); return self; } @@ -136,22 +242,23 @@ static VALUE m_channel_receive(int argc, VALUE *argv, VALUE self) { VALUE timeout=Qnil; rb_scan_args(argc, argv, "01", &timeout); Data_Get_Struct(self, Channel, channel); - wait_queue_wait(&channel->readers, timeout, - (WaitCondition)queue_is_empty, &channel->written); + queue_wait(&channel->readers, timeout, + (WaitCondition)queue_is_empty, &channel->written); return queue_get(&channel->written); } -typedef struct _Semaphore { +typedef struct Semaphore_ { unsigned long count; - WaitQueue waiting; + Queue waiting; } Semaphore; static void semaphore_mark(Semaphore *semaphore) { - wait_queue_mark(&semaphore->waiting); + queue_mark(&semaphore->waiting); } static void semaphore_free(Semaphore *semaphore) { - wait_queue_finalize(&semaphore->waiting); + assert_no_survivors(&semaphore->waiting, "semaphore", semaphore); + queue_finalize(&semaphore->waiting); xfree(semaphore); } @@ -159,7 +266,7 @@ static VALUE s_semaphore_alloc(VALUE klass) { Semaphore *semaphore; semaphore = ALLOC(Semaphore); semaphore->count = 0; - wait_queue_init(&semaphore->waiting); + queue_init(&semaphore->waiting); return Data_Wrap_Struct(klass, semaphore_mark, semaphore_free, semaphore); } @@ -177,7 +284,7 @@ static VALUE m_semaphore_up(VALUE self) { Semaphore *semaphore; Data_Get_Struct(self, Semaphore, semaphore); semaphore->count++; - wait_queue_wake_one(&semaphore->waiting); + queue_wake_one(&semaphore->waiting); return self; } @@ -191,30 +298,31 @@ static VALUE m_semaphore_down(int argc, VALUE *argv, VALUE self) { timeout = Qnil; rb_scan_args(argc, argv, "01", &timeout); Data_Get_Struct(self, Semaphore, semaphore); - wait_queue_wait(&semaphore->waiting, timeout, + queue_wait(&semaphore->waiting, timeout, (WaitCondition)semaphore_count_is_zero, semaphore); semaphore->count--; return self; } -typedef struct _Latch { +typedef struct Latch_ { int is_set; - WaitQueue waiting; + Queue waiting; } Latch; static void latch_mark(Latch *latch) { - wait_queue_mark(&latch->waiting); + queue_mark(&latch->waiting); } static void latch_free(Latch *latch) { - wait_queue_mark(&latch->waiting); + assert_no_survivors(&latch->waiting, "latch", latch); + queue_mark(&latch->waiting); } static VALUE s_latch_alloc(VALUE klass) { Latch *latch; latch = ALLOC(Latch); latch->is_set = 0; - wait_queue_init(&latch->waiting); + queue_init(&latch->waiting); return Data_Wrap_Struct(klass, latch_mark, latch_free, latch); } @@ -222,7 +330,7 @@ static VALUE m_latch_set(VALUE self) { Latch *latch; Data_Get_Struct(self, Latch, latch); latch->is_set = 1; - wait_queue_wake_all(&latch->waiting); + queue_wake_all(&latch->waiting); return self; } @@ -236,12 +344,11 @@ static VALUE m_latch_wait(int argc, VALUE *argv, VALUE self) { timeout = Qnil; rb_scan_args(argc, argv, "01", &timeout); Data_Get_Struct(self, Latch, latch); - wait_queue_wait(&latch->waiting, timeout, - (WaitCondition)latch_is_unset, latch); + queue_wait(&latch->waiting, timeout, (WaitCondition)latch_is_unset, latch); return self; } -typedef struct _Atomic { +typedef struct Atomic_ { VALUE value; } Atomic; @@ -305,7 +412,6 @@ void Init_primitives() { cChannel = rb_define_class_under(mPrimitives, "Channel", rb_cObject); cSemaphore = rb_define_class_under(mPrimitives, "Semaphore", rb_cObject); cLatch = rb_define_class_under(mPrimitives, "Latch", rb_cObject); - cVolatile = rb_define_class_under(mPrimitives, "Volatile", rb_cObject); cAtomic = rb_define_class_under(mPrimitives, "Atomic", rb_cObject); rb_define_alloc_func(cChannel, s_channel_alloc); @@ -321,11 +427,6 @@ void Init_primitives() { rb_define_method(cLatch, "set", m_latch_set, 0); rb_define_method(cLatch, "wait", m_latch_wait, -1); - rb_define_alloc_func(cVolatile, s_atomic_alloc); - rb_define_private_method(cVolatile, "initialize", m_atomic_initialize, -1); - rb_define_method(cVolatile, "value", m_atomic_value, 0); - rb_define_method(cVolatile, "value=", m_atomic_value_set, 1); - rb_define_alloc_func(cAtomic, s_atomic_alloc); rb_define_private_method(cAtomic, "initialize", m_atomic_initialize, -1); rb_define_method(cAtomic, "value", m_atomic_value, 0); diff --git a/lib/concurrent/primitives.rb b/lib/concurrent/primitives.rb index 4574662..2632623 100644 --- a/lib/concurrent/primitives.rb +++ b/lib/concurrent/primitives.rb @@ -36,10 +36,16 @@ end class Latch end +unless defined? self::Volatile +Volatile = Atomic.dup class Volatile + undef swap + undef set_if_equal +end end class Atomic + alias exchange swap end end -- 2.11.4.GIT