From b11b075c313907150e6e39a639f4efb239306ece Mon Sep 17 00:00:00 2001 From: MenTaLguY Date: Wed, 14 Feb 2007 00:33:40 -0500 Subject: [PATCH] start from fresh tree --- Rakefile | 100 +++++++ examples/array-blocking-queue.rb | 123 +++++++++ examples/atomic-retry-with.rb | 39 +++ ext/concurrent/futures/extconf.rb | 2 + ext/concurrent/futures/futures.c | 153 +++++++++++ java/concurrent/FuturesService.java | 373 ++++++++++++++++++++++++++ lib/concurrent/actors.rb | 342 ++++++++++++++++++++++++ lib/concurrent/futures.rb | 147 +++++++++++ lib/concurrent/joins.rb | 191 ++++++++++++++ lib/concurrent/parallel.rb | 127 +++++++++ lib/concurrent/stm.rb | 507 ++++++++++++++++++++++++++++++++++++ lib/concurrent/synchronized.rb | 266 +++++++++++++++++++ test/test_actors.rb | 11 + test/test_all.rb | 6 + test/test_futures.rb | 51 ++++ test/test_joins.rb | 8 + test/test_parallel.rb | 8 + test/test_stm.rb | 305 ++++++++++++++++++++++ test/test_synchronized.rb | 10 + 19 files changed, 2769 insertions(+) create mode 100644 Rakefile create mode 100644 examples/array-blocking-queue.rb create mode 100644 examples/atomic-retry-with.rb create mode 100644 ext/concurrent/futures/extconf.rb create mode 100644 ext/concurrent/futures/futures.c create mode 100644 java/concurrent/FuturesService.java create mode 100644 lib/concurrent/actors.rb create mode 100644 lib/concurrent/futures.rb create mode 100644 lib/concurrent/joins.rb create mode 100644 lib/concurrent/parallel.rb create mode 100644 lib/concurrent/stm.rb create mode 100644 lib/concurrent/synchronized.rb create mode 100644 test/test_actors.rb create mode 100644 test/test_all.rb create mode 100644 test/test_futures.rb create mode 100644 test/test_joins.rb create mode 100644 test/test_parallel.rb create mode 100644 test/test_stm.rb create mode 100644 test/test_synchronized.rb diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..84f8365 --- /dev/null +++ b/Rakefile @@ -0,0 +1,100 @@ +require 'rake' +require 'rake/testtask' +require 'rake/gempackagetask' + +desc "Remove build products" +task :clean do + rm Dir.glob( "lib/**/*.{jar,#{ Config::CONFIG['DLEXT'] }}" ) + rm Dir.glob( "ext/**/Makefile" ) + rm Dir.glob( "ext/**/*.{o,#{ Config::CONFIG['DLEXT'] }}" ) + rm Dir.glob( "java/**/*.{class,jar}" ) +end + +def setup_extension( dir, lib_dir, extension ) + ext = File.join( "ext", dir ) + so_name = "#{ extension }.#{ Config::CONFIG['DLEXT'] }" + ext_so = File.join( ext, so_name ) + lib_so = File.join( "lib", lib_dir, so_name ) + ext_files = FileList[ File.join( ext, "*.{c,h}" ) ] + ext_makefile = File.join( ext, "Makefile" ) + extconf_rb = File.join( ext, "extconf.rb" ) + + file ext_makefile => [ extconf_rb ] do + Dir.chdir ext do + ruby "extconf.rb" + end + end + + file ext_so => ext_files + [ ext_makefile ] do + Dir.chdir ext do + case PLATFORM + when /win32/ + sh 'nmake' + else + sh 'make' + end + end + end + + file lib_so => [ ext_so ] do + cp ext_so, lib_so + end + + task :compile => [ lib_so ] +end + +case RUBY_PLATFORM +when /java/ + file 'lib/concurrent/futures.jar' => [ 'java/concurrent/FuturesService.java' ] do + Dir.chdir( 'java' ) do + sh 'javac', '-classpath', "#{ ENV['JRUBY_HOME'] }/lib/jruby.jar", 'concurrent/FuturesService.java' + sh 'jar', 'cf', 'concurrent/futures.jar', *Dir.glob( 'concurrent/**/*.class' ) + end + cp 'java/concurrent/futures.jar', 'lib/concurrent/futures.jar' + end + task :compile => [ "lib/concurrent/futures.jar" ] +else + setup_extension( 'concurrent/futures', 'concurrent', 'futures' ) +end + +desc "Compile extensions" +task :compile + +task :test => [ :compile ] +Rake::TestTask.new do |task| + task.libs << 'lib' + task.libs << 'test' + task.test_files = [ "test/test_all.rb" ] + task.verbose = true +end + +gemspec = Gem::Specification.new do |gemspec| + gemspec.name = "concurrent" + gemspec.version = "0.0.1" + gemspec.author = "MenTaLguY " + gemspec.summary = "Omnibus concurrency library for Ruby" + gemspec.test_file = 'test/test_all.rb' + gemspec.files = FileList[ 'Rakefile', 'test/*.rb', 'ext/**/*.{c,h,rb}', + 'java/**/*.java', + "lib/**/*.{rb,jar,#{ Config::CONFIG['DLEXT'] }}" ] + gemspec.require_paths = [ 'lib' ] + + case RUBY_PLATFORM + when /java/ + gemspec.platform = 'jruby' + when /win32/ + gemspec.platform = Gem::Platform::WIN32 + else + gemspec.platform = Gem::Platform::RUBY + gemspec.extensions = FileList[ 'ext/**/extconf.rb' ] + end +end + +task :package => [ :clean, :test ] +Rake::GemPackageTask.new( gemspec ) do |task| + task.gem_spec = gemspec + task.need_tar = true +end + +task :default => [ :clean, :test ] + diff --git a/examples/array-blocking-queue.rb b/examples/array-blocking-queue.rb new file mode 100644 index 0000000..cf685b1 --- /dev/null +++ b/examples/array-blocking-queue.rb @@ -0,0 +1,123 @@ +# Ruby implementation of ArrayBlockingQueue, per +# "Lock-Free Data Structures using STMs in Haskell" +# by Discolo, et al. +# +# http://research.microsoft.com/~simonpj/papers/stm/lock-free-flops06.pdf +# + +require 'concurrent/stm' + +STM = Concurrent::STM + +class ArrayBlockingQueue < STM::Struct :head, :tail, :used + def initialize( len ) + self.head = self.tail = self.used = 0 + @len = len + @a = Array.new( len ) { STM::Ref.new } + end + + # takeSTM + def poll + do_read( true, true ) + end + + # putSTM + def put( value ) + do_put( true, value ) + self + end + + # peekSTM + def peek( default_value=nil ) + do_read( false, false, default_value ) + end + + # offerSTM + def offer( value ) + do_put( false, value ) + end + + # pollSTM + def poll( default_value=nil ) + do_read( true, false, default_value ) + end + + # pollTimeoutSTM + def poll_timeout( timeout, default_value=nil ) + begin + STM.timeout( timeout ) do + do_read( true, true ) + end + rescue STM::TimeoutError + default_value + end + end + + # clearSTM + def clear + STM.atomic do + self.head = self.tail = self.used = 0 + @a.each { |tv| tv[] = nil } + end + self + end + + # containsSTM + def contains?( value ) + to_a.each { |e| return true if e == value } + false + end + + # remainingCapacitySTM + def remaining_capacity + @len - used + end + + # sizeSTM + def size ; @len ; end + + # toArraySTM + def to_a + STM.atomic do + array = [] + t = tail + (0...used).each { |i| array << @a[( i + t ) % @len][] } + array.reverse! + end + end + +private + # putTailElementSTM (?) + def do_put( block, value ) + STM.atomic do + u = used + if u == @len + STM.retry if block + return false + end + t = ( tail + 1 ) % @len + @a[t][] = value + self.tail = t + self.used = u + 1 + true + end + end + + # readHeadElementSTM + def do_read( remove, block, default_value=nil ) + STM.atomic do + u = used + if u.zero? + STM.retry if block + return default_value + end + h = head + value = @a[h][] + if remove + self.head = ( h + 1 ) % @len + self.used = u - 1 + end + value + end + end +end diff --git a/examples/atomic-retry-with.rb b/examples/atomic-retry-with.rb new file mode 100644 index 0000000..938c015 --- /dev/null +++ b/examples/atomic-retry-with.rb @@ -0,0 +1,39 @@ +# A Ruby port of +# http://svn.openfoundry.org/pugs/examples/concurrency/stm-retry-with.pl + +require 'concurrent/stm' + +STM = Concurrent::STM + +a = STM::Ref.new 0 +still_running = STM::Ref.new 2 + +t1 = Thread.new { + puts "Thread 1 started" + STM.atomic { + STM.retry until a[] > 5 + a[] = -1000 + still_running[] -= 1 + } + puts "Thread 1 finished: a is >5 and reset to -1000" +} + +t2 = Thread.new { + puts "Thread 2 started" + STM.atomic { + if STM.retries? { STM.retry until a[] > 100 } + STM.retry until a[] < -100 + end + still_running[] -= 1 + } + puts "Thread 2 finished: a is now < -100" +} + +value = a[] +while still_running[].nonzero? + puts STM.atomic { saved = a[] ; a[] += 1 ; saved } + sleep 1 +end + +t1.join +t2.join diff --git a/ext/concurrent/futures/extconf.rb b/ext/concurrent/futures/extconf.rb new file mode 100644 index 0000000..e0dff10 --- /dev/null +++ b/ext/concurrent/futures/extconf.rb @@ -0,0 +1,2 @@ +require 'mkmf' +create_makefile('concurrent/futures') diff --git a/ext/concurrent/futures/futures.c b/ext/concurrent/futures/futures.c new file mode 100644 index 0000000..e74f0dc --- /dev/null +++ b/ext/concurrent/futures/futures.c @@ -0,0 +1,153 @@ +/* + * concurrent/futures - futures and lazy evaluation for Ruby + * + * Copyright (C) 2007 MenTaLguY + * + * This file is made available under the same terms as Ruby. + */ + +#include "ruby.h" +#include "rubysig.h" +#include "intern.h" + +static VALUE mConcurrent; +static VALUE mFutures; +static VALUE eAsyncError; +static VALUE cThunk; + +static ID value_id; +static ID inspect_id; +static ID respond_to_p_id; + +typedef struct { + VALUE source; + VALUE value; +} Thunk; + +static void thunk_copy_atomic(Thunk *to, Thunk const *from) { + int saved_critical; + saved_critical = rb_thread_critical; + rb_thread_critical = 1; + *to = *from; + rb_thread_critical = saved_critical; +} + +static VALUE thunk_value(VALUE obj, int evaluate) { + VALUE original; + + original = obj; + + while ( CLASS_OF(obj) == cThunk ) { + Thunk *thunk; + Thunk copy; + + Data_Get_Struct(obj, Thunk, thunk); + thunk_copy_atomic(©, thunk); + + if (RTEST(copy.source)) { + if (evaluate) { + copy.value = rb_funcall(copy.source, value_id, 0); + copy.source = Qnil; + thunk_copy_atomic(thunk, ©); + } + + if ( obj != original ) { + Thunk *original_thunk; + Data_Get_Struct(original, Thunk, original_thunk); + thunk_copy_atomic(original_thunk, ©); + } + + if (!evaluate) { + break; + } + } + + obj = copy.value; + } + + return obj; +} + +static VALUE thunk_eval(VALUE thunk) { + return thunk_value(thunk, 1); +} + +static VALUE thunk_ground(VALUE thunk) { + return thunk_value(thunk, 0); +} + +void thunk_mark(Thunk const *thunk) { + rb_gc_mark(thunk->source); + rb_gc_mark(thunk->value); +} + +void thunk_free(Thunk *thunk) { + free(thunk); +} + +static VALUE rb_thunk_new(VALUE klass, VALUE source) { + Thunk *thunk; + + thunk = (Thunk *)malloc(sizeof(Thunk)); + + thunk->source = source; + thunk->value = Qnil; + + return Data_Wrap_Struct(cThunk, thunk_mark, thunk_free, thunk); +} + +static VALUE wrap_exception(VALUE unused, VALUE ex) { + rb_exc_raise(rb_funcall(eAsyncError, rb_intern("new"), 1, ex)); +} + +static VALUE rb_thunk_method_missing(int argc, VALUE *argv, VALUE self) { + ID name; + + name = SYM2ID(argv[0]); + self = thunk_ground(self); + + if ( CLASS_OF(self) == cThunk ) { + if ( name == inspect_id && argc == 1 ) { + Thunk *thunk; + Data_Get_Struct(self, Thunk, thunk); + /* FIXME: thunk->source might be nil by the time we get here */ + return rb_str_plus(rb_str_plus(rb_str_new2("#source, inspect_id, 0)), rb_str_new2(">")); + } else if ( name == respond_to_p_id && argc == 2 ) { + if ( ID2SYM(inspect_id) == argv[1] || + ID2SYM(respond_to_p_id) == argv[1] ) + { + return Qtrue; + } + } + } + + self = rb_rescue2(thunk_eval, self, wrap_exception, Qnil, rb_cObject, 0); + + return rb_funcall3(self, name, argc-1, argv+1); +} + +static VALUE rb_thunk_value(VALUE self, VALUE thunk_v) { + return thunk_eval(thunk_v); +} + +void Init_futures() { + value_id = rb_intern("value"); + inspect_id = rb_intern("inspect"); + respond_to_p_id = rb_intern("respond_to?"); + + mConcurrent = rb_define_module("Concurrent"); + mFutures = rb_define_module_under(mConcurrent, "Futures"); + + eAsyncError = rb_define_class_under(mFutures, "AsyncError", rb_eRuntimeError); + + cThunk = rb_class_boot(0); /* not Qnil */ + rb_singleton_class(cThunk); + rb_undef_alloc_func(cThunk); + rb_const_set(mFutures, rb_intern("Thunk"), cThunk); + + rb_define_singleton_method(cThunk, "new", rb_thunk_new, 1); + rb_define_singleton_method(cThunk, "value", rb_thunk_value, 1); + rb_define_private_method(cThunk, "method_missing", + rb_thunk_method_missing, -1); +} + diff --git a/java/concurrent/FuturesService.java b/java/concurrent/FuturesService.java new file mode 100644 index 0000000..59246e7 --- /dev/null +++ b/java/concurrent/FuturesService.java @@ -0,0 +1,373 @@ +/***** BEGIN LICENSE BLOCK**** + * Version: CPL 1.0/GPL 2.0/LGPL 2.1 + * + * The contents of this file are subject to the Common Public + * License Version 1.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.eclipse.org/legal/cpl-v10.html + * + * Software distributed under the License is distributed on an "AS + * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or + * implied. See the License for the specific language governing + * rights and limitations under the License. + * + * Copyright (C) 2007 MenTaLguY + * + * Alternatively, the contents of this file may be used under the terms of + * either of the GNU General Public License Version 2 or later (the "GPL"), + * or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), + * in which case the provisions of the GPL or the LGPL are applicable instead + * of those above. If you wish to allow use of your version of this file only + * under the terms of either the GPL or the LGPL, and not to allow others to + * use your version of this file under the terms of the CPL, indicate your + * decision by deleting the provisions above and replace them with the notice + * and other provisions required by the GPL or the LGPL. If you do not delete + * the provisions above, a recipient may use your version of this file under + * the terms of any one of the CPL, the GPL or the LGPL. + ***** END LICENSE BLOCK *****/ + +package concurrent; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import org.jruby.IRuby; +import org.jruby.RubyArray; +import org.jruby.RubyClass; +import org.jruby.RubyException; +import org.jruby.RubyFixnum; +import org.jruby.RubyFloat; +import org.jruby.RubyInteger; +import org.jruby.RubyModule; +import org.jruby.RubyString; +import org.jruby.ast.Node; +import org.jruby.exceptions.RaiseException; +import org.jruby.runtime.Block; +import org.jruby.runtime.CallbackFactory; +import org.jruby.runtime.callback.Callback; +import org.jruby.runtime.CallType; +import org.jruby.runtime.ObjectAllocator; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.load.BasicLibraryService; +import org.jruby.runtime.builtin.IRubyObject; + +public class FuturesService implements BasicLibraryService { + public boolean basicLoad(final IRuby runtime) throws IOException { + Thunk.setup(runtime); + return true; + } + + public static class Thunk implements IRubyObject { + private IRuby runtime; + private RubyClass klass; + private IRubyObject source; + private IRubyObject value; + + Thunk(IRuby runtime, RubyClass klass, IRubyObject source) { + this.runtime = runtime; + this.klass = klass; + this.source = source; + this.value = null; + } + + public static IRubyObject newInstance(IRubyObject recv, IRubyObject source) { + return new Thunk(recv.getRuntime(), (RubyClass)recv, source); + } + + public static void setup(final IRuby runtime) throws IOException { + RubyClass cThunk = runtime.getOrCreateModule("Concurrent").defineModuleUnder("Futures").defineClassUnder("Thunk", null, ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR); + CallbackFactory cb = runtime.callbackFactory(Thunk.class); + cThunk.getMetaClass().defineMethod("new", cb.getSingletonMethod("newInstance", IRubyObject.class)); + cThunk.defineMethod("value", cb.getSingletonMethod("value", IRubyObject.class)); + } + + public static IRubyObject thunkValue(IRubyObject obj, boolean evaluate) { + IRubyObject original=obj; + + while (obj instanceof Thunk) { + Thunk thunk=(Thunk)obj; + + synchronized (thunk) { + if ( thunk.source != null ) { + if (evaluate) { + thunk.value = thunk.source.callMethod(thunk.source.getRuntime().getCurrentContext(), "value"); + thunk.source = null; + } + + if ( obj != original ) { + Thunk original_thunk = (Thunk)original; + synchronized (original_thunk) { + original_thunk.value = thunk.value; + } + } + + if (!evaluate) { + break; + } + } + + obj = thunk.value; + } + } + + return obj; + } + + public static IRubyObject evalThunk(IRubyObject obj) { + try { + return thunkValue(obj, true); + } catch (RaiseException e) { + RubyClass cAsyncError = obj.getRuntime().getModule("Concurrent").defineModuleUnder("Futures").getClass("AsyncError"); + RubyException e2 = (RubyException)cAsyncError.callMethod(obj.getRuntime().getCurrentContext(), "new", e.getException()); + throw new RaiseException(e2); + } + } + + public static IRubyObject value(IRubyObject recv, IRubyObject obj) { + return thunkValue(obj, true); + } + + public int getNativeTypeIndex() { + return evalThunk(this).getNativeTypeIndex(); + } + + public Map safeGetInstanceVariables() { + return evalThunk(this).safeGetInstanceVariables(); + } + + public boolean safeHasInstanceVariables() { + return evalThunk(this).safeHasInstanceVariables(); + } + + public IRubyObject getInstanceVariable(String name) { + return evalThunk(this).getInstanceVariable(name); + } + + public IRubyObject setInstanceVariable(String name, IRubyObject value) { + return evalThunk(this).setInstanceVariable(name, value); + } + + public Map getInstanceVariables() { + return evalThunk(this).getInstanceVariables(); + } + + public Map getInstanceVariablesSnapshot() { + return evalThunk(this).getInstanceVariablesSnapshot(); + } + + public IRubyObject callMethod(ThreadContext context, RubyModule rubyclass, String name, IRubyObject[] args, CallType callType, Block block) { + return evalThunk(this).callMethod(context, rubyclass, name, args, callType, block); + } + + public IRubyObject callMethod(ThreadContext context, RubyModule rubyclass, byte switchValue, String name, IRubyObject[] args, CallType callType) { + return evalThunk(this).callMethod(context, rubyclass, switchValue, name, args, callType); + } + + public IRubyObject callMethod(ThreadContext context, byte switchValue, String name, IRubyObject[] args, CallType callType, Block block) { + return evalThunk(this).callMethod(context, switchValue, name, args, callType, block); + } + + public IRubyObject callMethod(ThreadContext context, String name, IRubyObject[] args, CallType callType) { + return evalThunk(this).callMethod(context, name, args, callType); + } + + public IRubyObject callMethod(ThreadContext context, String name, IRubyObject[] args, CallType callType, Block block) { + return evalThunk(this).callMethod(context, name, args, callType, block); + } + + public IRubyObject callMethod(ThreadContext context, String name) { + return evalThunk(this).callMethod(context, name); + } + + public IRubyObject callMethod(ThreadContext context, String name, Block block) { + return evalThunk(this).callMethod(context, name, block); + } + + public IRubyObject callMethod(ThreadContext context, String name, IRubyObject arg) { + return evalThunk(this).callMethod(context, name, arg); + } + + public IRubyObject callMethod(ThreadContext context, String name, IRubyObject[] args) { + return evalThunk(this).callMethod(context, name, args); + } + + public IRubyObject callMethod(ThreadContext context, String name, IRubyObject[] args, Block block) { + return evalThunk(this).callMethod(context, name, args, block); + } + + public boolean isNil() { + return evalThunk(this).isNil(); + } + + public boolean isTrue() { + return evalThunk(this).isTrue(); + } + + public boolean isTaint() { + return evalThunk(this).isTaint(); + } + + public boolean isFrozen() { + return evalThunk(this).isFrozen(); + } + + public boolean isImmediate() { + return evalThunk(this).isImmediate(); + } + + public RubyClass getMetaClass() { + return evalThunk(this).getMetaClass(); + } + + public void setMetaClass(RubyClass metaClass) { + evalThunk(this).setMetaClass(metaClass); + } + + public RubyClass getSingletonClass() { + return evalThunk(this).getSingletonClass(); + } + + public RubyClass getType() { + return evalThunk(this).getType(); + } + + public boolean isKindOf(RubyModule rubyClass) { + return evalThunk(this).isKindOf(rubyClass); + } + + public boolean respondsTo(String method) { + return evalThunk(this).respondsTo(method); + } + + public IRuby getRuntime() { + return this.runtime; + } + + public Class getJavaClass() { + return evalThunk(this).getJavaClass(); + } + + public IRubyObject eval(Node iNode) { + return evalThunk(this).eval(iNode); + } + + public IRubyObject evalWithBinding(ThreadContext context, IRubyObject evalString, IRubyObject binding, String file) { + return evalThunk(this).evalWithBinding(context, evalString, binding, file); + } + + public IRubyObject evalSimple(ThreadContext context, IRubyObject evalString, String file) { + return evalThunk(this).evalSimple(context, evalString, file); + } + + public void extendObject(RubyModule rubyModule) { + evalThunk(this).extendObject(rubyModule); + } + + public String asSymbol() { + return evalThunk(this).asSymbol(); + } + + public RubyArray convertToArray() { + return evalThunk(this).convertToArray(); + } + public RubyFloat convertToFloat() { + return evalThunk(this).convertToFloat(); + } + public RubyInteger convertToInteger() { + return evalThunk(this).convertToInteger(); + } + public RubyString convertToString() { + return evalThunk(this).convertToString(); + } + + public IRubyObject convertToType(String targetType, String convertMethod, boolean raiseOnError) { + return evalThunk(this).convertToType(targetType, convertMethod, raiseOnError); + } + + public IRubyObject convertToTypeWithCheck(String targetType, String convertMethod) { + return evalThunk(this).convertToTypeWithCheck(targetType, convertMethod); + } + + public void setTaint(boolean b) { + evalThunk(this).setTaint(b); + } + + public void checkSafeString() { + evalThunk(this).checkSafeString(); + } + + public IRubyObject convertType(Class type, String string, String string1) { + return evalThunk(this).convertType(type, string, string1); + } + + public IRubyObject dup() { + return evalThunk(this).dup(); + } + + public void initCopy(IRubyObject original) { + evalThunk(this).initCopy(original); + } + + public void setFrozen(boolean b) { + evalThunk(this).setFrozen(b); + } + + public IRubyObject inspect() { + return evalThunk(this).inspect(); + } + + public int checkArgumentCount(IRubyObject[] arguments, int minimum, int maximum) { + return evalThunk(this).checkArgumentCount(arguments, minimum, maximum); + } + + public IRubyObject rbClone() { + return evalThunk(this).rbClone(); + } + + public void callInit(IRubyObject[] args, Block block) { + evalThunk(this).callInit(args, block); + } + + public void defineSingletonMethod(String name, Callback callback) { + evalThunk(this).defineSingletonMethod(name, callback); + } + + public boolean singletonMethodsAllowed() { + return evalThunk(this).singletonMethodsAllowed(); + } + + public boolean isSingleton() { + return evalThunk(this).isSingleton(); + } + + public Iterator instanceVariableNames() { + return evalThunk(this).instanceVariableNames(); + } + + public IRubyObject[] scanArgs(IRubyObject[] args, int required, int optional) { + return evalThunk(this).scanArgs(args, required, optional); + } + + public void dataWrapStruct(Object obj) { + evalThunk(this).dataWrapStruct(obj); + } + + public Object dataGetStruct() { + return evalThunk(this).dataGetStruct(); + } + + public RubyFixnum id() { + return evalThunk(this).id(); + } + + public IRubyObject anyToString() { + return evalThunk(this).anyToString(); + } + + public IRubyObject checkStringType() { + return evalThunk(this).checkStringType(); + } + } +} + diff --git a/lib/concurrent/actors.rb b/lib/concurrent/actors.rb new file mode 100644 index 0000000..0e5c691 --- /dev/null +++ b/lib/concurrent/actors.rb @@ -0,0 +1,342 @@ +# +# concurrent/actors - actors for Ruby +# +# Copyright (C) 2007 MenTaLguY +# +# This file is made available under the same terms as Ruby. +# + +require 'set' +require 'thread' +require 'drb/drb' + +module Concurrent +module Actors + +class ActorError < ThreadError +end + +class ExitError < ActorError + attr_reader :reason + attr_reader :actor + + def initialize( reason, actor ) + super( "actor #{ actor.inspect } exited" ) + @reason = reason + @actor = actor + end +end + +class DeadActorError < ActorError + attr_reader :actor + + def initialize( actor ) + super( "actor #{ actor.inspect } died" ) + @actor = actor + end +end + +class ReceiveFilter + def initialize( pattern, &body ) + @patterns = [] + upon( pattern, body ) + end + + def upon( pattern, &body ) + @patterns.push [ pattern, body ] + self + end + + def receive( timeout = nil ) + Actor.receive( @patterns, timeout ) + end +end + +Any = Object.new +def Any.===( other ) + true +end + +class Message < Struct + def self.new( *args ) + args = [ :unused ] if args.empty? + super( *args ) + end + + def ===( other ) + if other.class == self.class + zip( other ) { |a, b| return false unless a === b } + true + else + false + end + end +end + +ExitMessage = Message.new :reason, :actor + +class Actor + include DRb::DRbUndumped + + module SingletonMethods + def spawn( linked=false, &block ) + Actor.new( linked, &block ) + end + + def spawn_linked( &block ) + spawn( true, &block ) + end + + def current + Thread.current[:concurrent_actor] || Actor.current + end + + def hook_receipt( &block ) + current.__hook_receipt__ &block + end + + def link( actor ) + actor = current + other.__link__ actor + actor.__link__ other + self + end + + def unlink( other ) + actor = current + actor.__unlink__ other + other.__unlink__ actor + self + end + + def upon( pattern, &body ) + ReceiveFilter.new( pattern, &body ) + end + + def receive( patterns, timeout=nil ) + current.__receive__( patterns, timeout ) + self + end + + def notify_exit( actor, reason ) + actor.__notify_exit__( reason, current ) + end + + def trap_exit + current.__trap_exit__ { yield } + end + end + class << self + alias private_new new + private :private_new + + def new( linked=false, &block ) + lock = Mutex.new + actor = nil + lock.synchronize do + Thread.new do + lock.synchronize do + raise ActorError, "Actor not created for thread" unless actor + Thread.current[:concurrent_actor] = actor + end + block.call + end + actor = private_new( thread, linked ? current : nil ) + end + end + alias spawn new + + def new_linked( &block ) + new( true, &block ) + end + alias spawn_linked new_linked + + def current + Thread.current[:concurrent_actor] ||= private_new( Thread.current, nil ) + end + + include SingletonMethods + end + + def initialize( thread, to_link ) + @lock = Mutex.new + @queue = MessageQueue.new + @trap_exit = 0 + @thread = thread + @links = Set.new + + if to_link + to_link.__link__ self + @links.add to_link + end + + @watchdog = Thread.new do + reason = nil + begin + thread.join + rescue Exception => reason + end + __finish__ reason + end + end + + def __finish__( reason ) + links = nil + @lock.synchronize do + links, @links = @links, Set.new + links.each do |actor| + actor.__unlink__( self ) + end + end + links.map do |actor| + Thread.new { actor.__notify_exit__( reason, self ) } + end + end + + def __post__( message ) + raise DeadActorError, self unless @thread.alive? + @queue.post( message ) + self + end + alias post __post__ + alias << post + + def __hook_receipt__( &block ) + ( class << self ; self ; end ).class_eval do + define_method( :post ) do |message| + __post__ message + Thread.new &block + self + end + alias << post + end + Thread.new &block + end + + def __link__( actor ) + @lock.synchronize do + raise DeadActorError, self unless @thread.alive? + @links.add( actor ) + end + end + + def __unlink__( actor ) + @lock.synchronize do + @links.delete( actor ) + end + end + + def __notify_exit__( reason, actor ) + @lock.synchronize do + return unless @thread.alive? + if @trap_exit.nonzero? + post ExitMessage[ reason, actor ] + elsif reason + @thread.raise ExitError.new( reason, actor ) + end + end + end + + def __receive__( patterns, timeout = nil ) + unless Thread.current == @thread + raise ActorError, "receive called from foreign thread" + end + + patterns = patterns.to_a + + begin + block = nil + message = nil + message = @queue.receive( timeout ) do |message| + patterns.find { |pattern, block| pattern === message } + end + block.call message unless message.nil? + rescue MessageQueue::TimeoutError + end + end + + def __trap_exit__ + unless Thread.current == @thread + raise ActorError, "trap_exit called from foreign thread" + end + + begin + @lock.synchronize { @trap_exit += 1 } + yield + ensure + @lock.synchronize { @trap_exit -= 1 } + end + end + + class MessageQueue + class TimeoutMessage ; end + + class TimeoutError < RuntimeError + end + + def initialize + @queue = Queue.new + @held = [] + end + + def post( message ) + @queue.push message + self + end + + def receive( timeout = nil ) + message = nil + + found = nil + @held.each_with_index do |message, index| + if yield message + found = index + break + end + end + + if found + @held.delete_at found + message + else + if timeout + timeout_message = TimeoutMessage.new + timeout_lock = Mutex.new + if timeout.nonzero? + timeout_thread = Thread.new do + sleep timeout + timeout_lock.synchronize { post timeout_message } + end + else + post timeout_message + end + end + + loop do + message = @queue.shift + case message + when timeout_message + raise TimeoutError + when TimeoutMessage + # discard + else + if yield message + if timeout_thread + timeout_lock.synchronize { timeout_thread.kill } + end + break message + else + @held.push message + end + end + end + end + end + end +end + +extend Actor::SingletonMethods + +end +end + diff --git a/lib/concurrent/futures.rb b/lib/concurrent/futures.rb new file mode 100644 index 0000000..04c2999 --- /dev/null +++ b/lib/concurrent/futures.rb @@ -0,0 +1,147 @@ +# +# concurrent/futures - futures and lazy evaluation for Ruby +# +# Copyright (C) 2007 MenTaLguY +# +# This file is made available under the same terms as Ruby. +# + +require 'thread' +require 'thwait' + +module Concurrent +module Futures +extend self + +Future = self + +class AsyncError < RuntimeError + attr_reader :reason + + def initialize( reason, desc=nil ) + super( desc ) + @reason = reason + end +end + +require 'concurrent/futures.so' # provides Thunk + +def async( &block ) + Thunk.new( Thread.new( &block ) ) +end +alias new async +alias spawn async +alias future async + +def await( future ) + Thunk.value future +end + +def await_any( *futures ) + threads = futures.map { |future| Thread.new { Futures::await future } } + begin + threads.index WaitThreads.new( *threads ).wait_next + ensure + threads.each { |thread| thread.raise RuntimeError } + end +end + +AlreadyFulfilledError = Class.new StandardError + +class Promise + def initialize + @lock = Mutex.new + @ready = ConditionVariable.new + end + + def future + @future ||= Thunk.new self + end + + def fulfill( value ) + @lock.synchronize do + if defined? @value + raise AlreadyFulfilledError, "promise already fulfilled" + end + @value = value + @ready.broadcast + end + self + end + + def fulfilled? + @lock.synchronize { defined? @value } + end + + def value + @lock.synchronize do + @ready.wait @lock until defined? @value + @value + end + end + + def fail( ex ) + @lock.synchronize do + @value = Thunk.new( Thread.new { raise ex } ) + end + self + end +end + +class Lazy + def initialize( &block ) + @lock = Mutex.new + end + + def value + @lock.synchronize do + if @block + Support::TerminateWaitLock.synchronize do + @value = Thunk.new Thread.new( &block ) + @block = nil + end + end + @value + end + end + + def inspect + @lock.synchronize do + if @block + "#" + else + "#" + end + end + end +end + +def lazy( &block ) + Thunk.new( Lazy.new( &block ) ) +end + +class Ref + def initialize( value=nil ) + @lock = Mutex.new + @value = value + end + + def exchange( value ) + @lock.synchronize do + result = @value + @value = value + result + end + end + + def modify( &block ) + @lock.synchronize do + value = @value + @value = Future.async { block.call value } + end + end +end + +end +end + diff --git a/lib/concurrent/joins.rb b/lib/concurrent/joins.rb new file mode 100644 index 0000000..552b7a8 --- /dev/null +++ b/lib/concurrent/joins.rb @@ -0,0 +1,191 @@ +# +# concurrent/joins - joins for Ruby +# +# Copyright (C) 2007 MenTaLguY +# +# This file is made available under the same terms as Ruby. +# + +require 'thread' + +module Concurrent +module Joins + +class Join + class Impl + ChannelDef = Struct.new :sync? + ChordDef = Struct.new :channels, :body + Chord = Struct.new :mask, :indices, :body + Callback = Struct.new :condition, :value, :args, :body + + def initialize + @lock = Mutex.new + @channels = [] + @chords = [] + @available = 0 + end + + def inner_send( index, value ) + @channel[index].push value + @available |= ( 1 << index ) + chord = @chords[i].find do |chord| + mask = chord.mask + mask == ( @available & mask ) + end + if chord + chord.body.call( *chord.indices.map do |i| + channel = channel[i] + arg = channel.shift + @available ^= ( 1 << i ) if channel.empty? + arg + end ) + end + self + end + + def async_send( index, value ) + @lock.synchronize do + inner_send( index, value ) + end + self + end + + def sync_send( index, value ) + @lock.synchronize do + callback = Callback.new( ConditionVariable.new, value ) + inner_send( index, callback ) + callback.condition.wait @lock until callback.args + callback.body.call( *callback.args ) + end + end + + def inner_chord( *indices, &body ) + mask = indices.inject( 0 ) { |a, i| a |= ( 1 << i ) } + chord = Chord.new( mask, indices, body ) + indices.each do |index| + @channels[index] ||= Queue.new + ( @chords[index] ||= [] ).push chord + end + self + end + + def async_chord( *indices, &body ) + @lock.synchronize do + inner_chord( *indices ) do |*args| + Thread.new { body.call( *args ) } + end + end + self + end + + def sync_chord( *indices, &body ) + @lock.synchronize do + inner_chord( *indices ) do |*args| + callback = args[0] + args[0] = callback.value + callback.args = args + callback.body = body + callback.condition.signal + end + end + self + end + end + + class << self + def channel( name ) + ancestors.each do |ancestor| + ancestor.class_eval do + if @join_channels and @join_channels.has_key? name + return @join_channels[name] + end + end + break if Join == ancestor + end + nil + end + + def chords + chords = [] + ancestors.each do |ancestor| + ancestor.class_eval do + chords.push *@join_chords if @join_chords + end + break if Join == ancestor + end + chords + end + + def add_channel( name, is_sync ) + name = name.to_sym + channel = Impl::ChannelDef.new( is_sync ) + ( @join_channels ||= {} )[name] = channel + id = channel.object_id + if is_sync + define_method( name ) do |value| + @join_impl.sync_send @join_indices[id], value + end + else + define_method( name ) do |value| + @join_impl.async_send @join_indices[id], value + self + end + end + self + end + private :add_channel + + def sync( name ) + add_channel( name, true ) + end + private :sync + + def async( name ) + add_channel( name, false ) + end + private :async + + def chord( first_name, *names, &body ) + first_name = first_name.to_sym + names = names.map { |n| n.to_sym } + first_channel = channel( first_name ) + channels = names.map { |n| channel( n ) } + + if channels.find { |c| c.sync? } + raise "Only the first channel may be synchronous" + end + + channels.unshift first_channel + + ( @join_chords ||= [] ).unshift( ChordDef.new( channels, body ) ) + end + private :chord + end + + def initialize + @join_impl = Join::Impl.new + @join_indices = {} + next_index = -1 + self.class.chords.each do |chord| + indices = chord.channels.map do |channel| + @join_indices[channel.object_id] ||= ( next_index += 1 ) + end + body = ( class << self ; self ; end ).class_eval do + define_method( :join_chord_method, &chord.body ) + m = instance_method( :join_chord_method ) + remove_method( :join_chord_method ) + m.bind( self ) + end + if chord.channels.first.sync? + @join_impl.sync_chord( *indices, &body ) + else + @join_impl.async_chord( *indices, &body ) + end + end + end + +end + +end +end + diff --git a/lib/concurrent/parallel.rb b/lib/concurrent/parallel.rb new file mode 100644 index 0000000..2f9b8fd --- /dev/null +++ b/lib/concurrent/parallel.rb @@ -0,0 +1,127 @@ +# +# concurrent/parallel - data-parallel programming for Ruby +# +# Copyright (C) 2007 MenTaLguY +# +# This file is made available under the same terms as Ruby. +# + +require 'thwait' + +class ThreadsWait + include Enumerable + + def each + until empty? + yield wait_next + end + end +end + +module Enumerable + def parallel_each( n, &block ) + parallel_subset( n ).map do |slice| + Thread.new { slice.each &block } + end.each do |thread| + thread.join + end + self + end + + def parallel_map( n, &block ) + parallel_subset( n ).map do |slice| + Thread.new { slice.map &block } + end.inject( [] ) do |a, thread| + a.push *thread.value + end + end + + def parallel_select( n, &block ) + parallel_subset( n ).map do |slice| + Thread.new { slice.select &block } + end.inject( [] ) do |a, results| + a.push *thread.value + end + end + + def parallel_reject( n, &block ) + parallel_subset( n ).map do |slice| + Thread.new { slice.reject &block } + end.inject( [] ) do |a, thread| + a.push *thread.value + end + end + + def parallel_max( n ) + parallel_subset( n ).map do |slice| + Thread.new { slice.max } + end.map { |t| t.value }.max + end + + def parallel_min( n ) + parallel_subset( n ).map do |slice| + Thread.new { slice.min } + end.map { |t| t.value }.min + end + + def parallel_partition( n, &block ) + parallel_subset( n ).map do |slice| + Thread.new { slice.partition &block } + end.inject( [ [], [] ] ) do |acc, thread| + pair = thread.value + acc[0].push *pair[0] + acc[1].push *pair[1] + acc + end + end + + def parallel_grep( re, n, &block ) + parallel_subset( n ).map do |slice| + Thread.new { slice.grep( re, &block ) } + end.inject( [] ) do |acc, thread| + acc.push *thread.value + end + end + + def parallel_conjoin( n, &block ) + parallel_subset( n ).map do |slice| + Thread.new { block.call slice } + end.inject( true ) do |acc, thread| + acc && thread.value + end + end + + def parallel_disjoin( n, &block ) + parallel_subset( n ).map do |slice| + Thread.new { block.call slice } + end.inject( false ) do |acc, thread| + acc || thread.value + end + end + + def parallel_all?( n, &block ) + parallel_conjoin( n ) { |slice| slice.all? &block } + end + + def parallel_any?( n, &block ) + parallel_disjoin( n ) { |slice| slice.any? &block } + end + + def parallel_include?( n, obj ) + parallel_disjoin( n ) { |slice| slice.include? obj } + end + + def parallel_subset( n ) + to_a.parallel_subset( n ) + end +end + +class Array + def parallel_subset( n ) + slice_size = [ size / n, 20 ].max + (0...(( size.to_f / slice_size ).ceil)).map do |i| + self[i*slice_size, slice_size] + end + end +end + diff --git a/lib/concurrent/stm.rb b/lib/concurrent/stm.rb new file mode 100644 index 0000000..c91d80a --- /dev/null +++ b/lib/concurrent/stm.rb @@ -0,0 +1,507 @@ +require 'thread' + +module Concurrent + +module STM + +module Methods +private + +def atomic + Transaction.with_current_or_new { yield } +end + +def atomic_timeout( timeout ) + timed_out = Ref.new + Thread.new { sleep timeout ; timed_out.value = true } + Transaction.with_new( "timeout" ) do + result = nil + if retries? { result = yield } + STM.retry unless timed_out.value + raise TimeoutError, "transaction timed out" + end + result + end +end + +def atomic_retry + throw :atomic_retry, true +end + +def atomic_retries? + Transaction.with_current_or_new do |transaction| + catch :atomic_retry do + transaction.perform_nested { yield } + false + end + end +end + +end + +class << self + include Methods + public :atomic, :atomic_retries?, :atomic_retry, :atomic_timeout + alias transaction atomic + alias retry atomic_retry + alias retries? atomic_retries? + alias timeout atomic_timeout +end + +class TransactionError < StandardError +end + +class RetryNoReadsError < TransactionError +end + +class TimeoutError < TransactionError +end + +class InvalidNestingError < TransactionError +end + +class Transaction + + LOCK = Mutex.new + + class Descriptor < ::Struct.new :owner, :object, :waiting, :checkpoint + def initialize( owner, object ) + super( owner, object, [], nil ) + end + + def state + if checkpoint + checkpoint.state + else + object.__atomic_state__ + end + end + end + + Checkpoint = ::Struct.new :ancestor, :depth, :state + + class CommitDescriptor < ::Struct.new :object, :waiting + def initialize( object ) + super( object, [] ) + end + end + + @commits = {} + + def self.get_commit_descriptor( object ) + @commits[object.__id__] + end + + def self.get_or_create_commit_descriptor( object ) + @commits[object.__id__] ||= CommitDescriptor.new( object ) + end + + def self.cleanup_commit_descriptor( descriptor ) + @commits.delete descriptor.object.__id__ if descriptor.waiting.empty? + self + end + + def self.with_new( label ) + thread = Thread.current + if thread[:atomic_transaction] + raise InvalidNestingError, "#{ label } cannot be nested" + else + transaction = new + thread[:atomic_transaction] = transaction + begin + transaction.perform { yield transaction } + ensure + thread[:atomic_transaction] = nil + end + end + end + + def self.with_current_or_new + thread = Thread.current + current = thread[:atomic_transaction] + if current + yield current + else + transaction = new + thread[:atomic_transaction] = transaction + begin + transaction.perform { yield transaction } + ensure + thread[:atomic_transaction] = nil + end + end + end + + def perform + begin + result = nil + retried = false + deadlocked = catch :atomic_deadlock do + retried = catch :atomic_retry do + result = yield + false + end + false + end + if deadlocked + Thread.pass + retry + elsif retried + rollback( true ) # wait for writes + retry + else + commit + end + result + ensure + rollback( false ) # noop if already committed or rolled back + end + end + + def perform_nested + begin + @depth += 1 + result = yield + ok = true + false + ensure + finish_nested( ok ) + @depth -= 1 + end + result + end + + attr_reader :awaiting_lock + + def initialize + @depth = 0 + @descriptors = {} + @reads = {} + @wakeup = ConditionVariable.new + end + + def commit + LOCK.synchronize do + @descriptors.each_value do |descriptor| + checkpoint = descriptor.checkpoint + if checkpoint + object = descriptor.object + object.__atomic_state__ = checkpoint.state + release descriptor + commit = Transaction.get_commit_descriptor object + commit.waiting.each { |transaction| transaction.wake } if commit + else + release descriptor + end + end + end + @descriptors.clear + @reads.clear + end + + def rollback( wait ) + LOCK.synchronize do + if wait + rollback_inner do |reads| + raise RetryNoReadsError, "retrying transaction without reads would block forever" if reads.empty? + objects = reads.map { |descriptor| descriptor.object } + commits = objects.map { |object| + Transaction.get_or_create_commit_descriptor object + } + wait_for *commits + commits.each do |commit| + Transaction.cleanup_commit_descriptor commit + end + end + else + rollback_inner + end + end + end + + def rollback_inner + @descriptors.each_value { |descriptor| release descriptor } + @descriptors.clear + if block_given? + reads = @reads.values + @reads.clear + yield reads + else + @reads.clear + end + end + + def finish_nested( keep ) + @descriptors.each_value do |descriptor| + checkpoint = descriptor.checkpoint + if checkpoint && checkpoint.depth == @depth + ancestor = checkpoint.ancestor + if keep + if ancestor && ( @depth - ancestor.depth ) == 1 + # replace ancestor + checkpoint.ancestor = ancestor.ancestor + else + # move checkpoint up a level + checkpoint.depth -= 1 + end + else + # discard checkpoint + descriptor.checkpoint = ancestor + end + end + end + end + + def read( object ) + descriptor = acquire object + @reads[object.__id__] = descriptor + descriptor.state + end + + def update( object ) + descriptor = acquire object + checkpoint = descriptor.checkpoint + if !checkpoint || checkpoint.depth < @depth + state = descriptor.state.dup + descriptor.checkpoint = Checkpoint.new checkpoint, @depth, state + state + else + checkpoint.state + end + end + + def write( object, state ) + descriptor = acquire object + checkpoint = descriptor.checkpoint + if !checkpoint || checkpoint.depth < @depth + descriptor.checkpoint = Checkpoint.new checkpoint, @depth, state + state + else + checkpoint.state = state + end + end + + def wake + @wakeup.signal + self + end + +private + def acquire( object ) + id = object.__id__ + @descriptors[id] ||= + LOCK.synchronize do + descriptor = object.__atomic_descriptor__ + while descriptor && descriptor.owner != self + if deadlocked_with? descriptor.owner + rollback_inner # permit other transaction to proceed + throw :atomic_deadlock, true + end + begin + @awaiting_lock = descriptor + wait_for descriptor + ensure + @awaiting_lock = nil + end + descriptor = object.__atomic_descriptor__ + end + unless descriptor + descriptor = @reads[id] || Descriptor.new( self, object ) + object.__atomic_descriptor__ = descriptor + end + descriptor + end + end + + def deadlocked_with?( transaction ) + # deadlock if self is at the root of transaction's dependency tree + while transaction.awaiting_lock + transaction = transaction.awaiting_lock.owner + end + transaction == self + end + + def wait_for( *descriptors ) + begin + descriptors.each { |descriptor| descriptor.waiting.push self } + @wakeup.wait LOCK + ensure + descriptors.each { |descriptor| descriptor.waiting.delete self } + end + end + + def release( descriptor ) + descriptor.checkpoint = nil + if descriptor.waiting.empty? + descriptor.object.__atomic_descriptor__ = nil + else + # give lock to next transaction in line rather than + # making all of the waiting transactions fight for it + transaction = descriptor.waiting.first + descriptor.owner = transaction + transaction.wake + end + end +end + +class Proc + def initialize( &block ) + @block = block + end + + def arity ; @block.arity ; end + + def binding ; @block.binding ; end + + def call( *args, &block ) + Transaction.with_current_or_new { @block.call( *args, &block ) } + end + alias [] call + + def==( other ) + @block == other.block + end + + def to_proc + @proc ||= method( :call ).to_proc + end + + attr_reader :block + protected :block +end + +class Base + attr_accessor :__atomic_state__ + attr_accessor :__atomic_descriptor__ + + def initialize( state ) + @__atomic_state__ = state + end + + class << self + def proxy_read( *names ) + options = {} + options.replace names.pop if names.last.is_a? Hash + _proxy( :read, names, options ) + end + + def proxy_write( *names ) + options = {} + options.replace names.pop if names.last.is_a? Hash + _proxy( :update, names, options ) + end + + private + def _proxy( type, names, options ) + names.each do |name| + name = name.to_sym + class_eval %Q{ def #{ name }( *args, &block ) + Transaction.with_current_or_new do |transaction| + transaction.#{ type }( self ).send( #{ name.inspect }, *args, &block ) + end + end } + end + end + end +end + +class Ref < Base + def initialize( value=nil ) + super value + end + + def value + Transaction.with_current_or_new do |transaction| + transaction.read( self ) + end + end + def value=( object ) + Transaction.with_current_or_new do |transaction| + transaction.write( self, object ) + end + end + + def exchange( new_value ) + Transaction.with_current_or_new do |transaction| + old_value = transaction.read( self ) + transaction.write( self, new_value ) + old_value + end + end + + def update + Transaction.with_current_or_new do |transaction| + old_value = transaction.read( self ) + transaction.write( self, yield( old_value ) ) + old_value + end + end + + alias [] value + alias []= value= +end + +class Struct < Base + include Enumerable + + class << self + alias __new__ new + + def new( *fields ) + subclass = Class.new self + state_class = ::Struct.new( *fields ) + + subclass.class_eval do + class << self + alias new __new__ + alias [] new + end + + fields.each do |field| + field = field.to_sym + proxy_read field + proxy_write "#{ field }=" + end + + define_method :initialize do |*values| + super state_class.new( *values ) + end + end + + fields = nil + + subclass + end + end + + def each + Transaction.with_current_or_new do |transaction| + transaction.read( self ).each { |*obj| yield *obj } + end + end + + proxy_read :[] + proxy_write :[]= + + def exchange( field, value ) + Transaction.with_current_or_new do |transaction| + s = transaction.update( self )[field] + old_value = s[field] + s[field] = value + old_value + end + end + + def update( field=nil ) + Transaction.with_current_or_new do |transaction| + s = transaction.update( self )[field] + old_value = s[field] + s[field] = yield( s[field] ) + old_value + end + end +end + +end + +end diff --git a/lib/concurrent/synchronized.rb b/lib/concurrent/synchronized.rb new file mode 100644 index 0000000..c12ef1c --- /dev/null +++ b/lib/concurrent/synchronized.rb @@ -0,0 +1,266 @@ +# +# concurrent/synchronized - assorted synchronization primitives for Ruby +# +# Copyright (C) 2007 MenTaLguY +# +# This file is made available under the same terms as Ruby. +# + +require 'thread' + +module Concurrent +module Synchronized + +class Lock + def initialize + @lock = Mutex.new + @available = ConditionVariable.new + @owner = nil + @count = 0 + end + + def lock( timeout = nil ) + @lock.synchronize do + if @owner != Thread.current + if timeout + raise NotImplementedError, "nonzero timeouts not implemented" \ + unless timeout == 0 + return nil + else + @available.wait @lock while @owner + end + end + @count += 1 + end + self + end + + def unlock + @lock.synchronize do + if @owner + @count -= 1 + @owner = nil if @count.zero? + end + end + self + end + + def synchronize + begin + lock + yield + ensure + unlock + end + end + + def __signal__( condvar ) + @lock.synchronize do + raise ThreadError, "not owner" unless @owner == Thread.current + condvar.signal + end + end + + def __broadcast__( condvar ) + @lock.synchronize do + raise ThreadError, "not owner" unless @owner == Thread.current + condvar.broadcast + end + end + + def __wait__( condvar ) + @lock.synchronize do + raise ThreadError, "not owner" unless @owner == Thread.current + begin + saved = @count + @count = 0 + @owner = nil + condvar.wait @lock + ensure + @available.wait while @owner + @owner = Thread.current + @count = saved + end + end + end +end + +class Condition + def initialize( lock=nil ) + @lock = lock || Lock.new + @condvar = ConditionVariable.new + end + + def lock( timeout = nil ) + @lock.lock( timeout ) + self + end + + def unlock + @lock.unlock + self + end + + def synchronize + @lock.synchronize { yield } + end + + def signal + @lock.__signal__ @condvar + self + end + + def broadcast + @lock.__broadcast__ @condvar + self + end + + def wait + @lock.__wait__ @condvar + self + end +end + +class Semaphore + def initialize( value=1 ) + @lock = Mutex.new + @ready = ConditionVariable.new + @value = value + end + + def get( timeout = nil ) + @lock.synchronize do + if timeout + raise NotImplementedError, "nonzero timeouts not implemented" \ + unless timeout == 0 + return nil unless @value.nonzero? + else + @ready.wait @lock until @value.nonzero? + end + @value -= 1 + end + self + end + + def put + @lock.synchronize do + @value += 1 + @ready.signal + end + self + end +end + +class BoundedSemaphore + def initialize( value, limit=nil ) + @lock = Mutex.new + @available = ConditionVariable.new + @headroom = ConditionVariable.new + @limit = limit || value + @value = value + end + + def get( timeout = nil ) + @lock.synchronize do + if timeout + raise NotImplementedError, "nonzero timeouts not implemented" \ + unless timeout == 0 + return nil unless @value.nonzero? + else + @available.wait @lock until @value.nonzero? + end + @value -= 1 + @headroom.signal + end + self + end + + def put( timeout = nil ) + @lock.synchronize do + if timeout + raise NotImplementedError, "nonzero timeouts not implemented" \ + unless timeout == 0 + return nil unless @value < @limit + else + @available.wait @lock until @value < @limit + end + @value += 1 + @available.signal + end + self + end +end + +class Event + def initialize + @lock = Mutex.new + @ready = ConditionVariable.new + @set = false + end + + def set? ; @lock.synchronize { @set } ; end + + def set + @lock.synchronize do + unless @set + @set = true + edge true + end + end + self + end + + def clear + @lock.synchronize do + if @set + @set = false + edge false + end + end + self + end + + def edge( rising ) + @ready.broadcast if rising + end + private :edge + + def wait( timeout = nil ) + @lock.synchronize do + if timeout + raise NotImplementedError, "nonzero timeouts not implemented" \ + unless timeout == 0 + return nil unless @set + else + @ready.wait @lock until @set + end + end + self + end +end + +class IOEvent < Event + def initialize + @read, @write = IO.pipe + end + + def edge( rising ) + if rising + @write.write 'x' + else + @read.read 1 + end + end + + def wait( timeout=nil ) + IO.select( [ @read ], nil, nil, timeout ) + self + end + + def to_io + @read + end +end + +end +end diff --git a/test/test_actors.rb b/test/test_actors.rb new file mode 100644 index 0000000..dbbebcd --- /dev/null +++ b/test/test_actors.rb @@ -0,0 +1,11 @@ +require 'test/unit' +require 'concurrent/actors' +require 'thread' + +include Concurrent::Actors + +class TestActors < Test::Unit::TestCase + def test_current + assert_instance_of Actor, Actor.current + end +end diff --git a/test/test_all.rb b/test/test_all.rb new file mode 100644 index 0000000..c5205fe --- /dev/null +++ b/test/test_all.rb @@ -0,0 +1,6 @@ +require 'test_actors.rb' +require 'test_futures.rb' +require 'test_joins.rb' +require 'test_parallel.rb' +require 'test_stm.rb' +require 'test_synchronized.rb' diff --git a/test/test_futures.rb b/test/test_futures.rb new file mode 100644 index 0000000..ad7b54f --- /dev/null +++ b/test/test_futures.rb @@ -0,0 +1,51 @@ +require 'test/unit' +require 'concurrent/futures' +require 'thread' + +include Concurrent::Futures + +class FuturesTests < Test::Unit::TestCase + def test_promise_fulfill + promise = Promise.new + assert !promise.fulfilled? + promise.fulfill 10 + assert promise.fulfilled? + assert_equal 10, promise.future + end + + def test_promise_fulfill_twice + promise = Promise.new + promise.fulfill 10 + assert_raise( AlreadyFulfilledError ) do + promise.fulfill 20 + end + end + + def test_promise_fail + promise = Promise.new + promise.fail( EOFError.new ) + value = promise.future + assert_raise( EOFError ) do + Future.await promise.value + end + assert_raise( AsyncError ) do + value + 1 + end + end + + def test_future + f = Future.future { 3 } + assert_equal 3, f + end + + def test_future_raise + f = Future.future { raise EOFError, "blah" } + assert_raise( EOFError ) do + Future.await f + end + assert_raise( AsyncError ) do + f + 1 + end + end +end + diff --git a/test/test_joins.rb b/test/test_joins.rb new file mode 100644 index 0000000..a2bffa7 --- /dev/null +++ b/test/test_joins.rb @@ -0,0 +1,8 @@ +require 'test/unit' +require 'concurrent/joins' +require 'thread' + +class TestJoins < Test::Unit::TestCase + def test_dummy + end +end diff --git a/test/test_parallel.rb b/test/test_parallel.rb new file mode 100644 index 0000000..ae954b7 --- /dev/null +++ b/test/test_parallel.rb @@ -0,0 +1,8 @@ +require 'test/unit' +require 'concurrent/parallel' +require 'thread' + +class TestParallel < Test::Unit::TestCase + def test_dummy + end +end diff --git a/test/test_stm.rb b/test/test_stm.rb new file mode 100644 index 0000000..59779dc --- /dev/null +++ b/test/test_stm.rb @@ -0,0 +1,305 @@ +require 'test/unit' +require 'concurrent/stm' + +STM = Concurrent::STM + +class STMTests < Test::Unit::TestCase + + def test_ref + s = STM::Ref.new 0 + assert_instance_of STM::Ref, s + assert_equal 0, s.value + assert_equal 0, s[] + s.value = 1 + assert_equal 1, s.value + assert_equal 1, s[] + s[] = 2 + assert_equal 2, s.value + assert_equal 2, s[] + end + + def test_ref_exchange + s = STM::Ref.new 0 + assert_equal 0, s.exchange( 1 ) + assert_equal 1, s.value + end + + def test_ref_update + s = STM::Ref.new 0 + assert_equal 0, s.update { |n| n + 1 } + assert_equal 1, s.value + end + + def test_struct + c = STM::Struct.new :a, :b + assert_kind_of Class, c + assert_operator c, :<, STM::Struct + for s in [ c.new( 1, 2 ), c[1, 2] ] + assert_instance_of c, s + assert_equal 1, s.a + assert_equal 2, s.b + assert_equal 1, s[:a] + assert_equal 2, s[:b] + s.a = 3 + assert_equal 3, s.a + s[:a] = 4 + assert_equal 4, s.a + assert_equal 4, s[:a] + end + end + + def test_simple_transaction + s = STM::Ref.new 1 + assert_equal 1, s.value + STM.atomic { + assert_equal 1, s.value + s.value = 2 + assert_equal 2, s.value + s.value = 3 + assert_equal 3, s.value + } + assert_equal 3, s.value + end + + def test_double_struct_write + c = STM::Struct.new :foo + s = c.new 0 + STM.atomic { + assert_equal 0, s.foo + s.foo = 1 + assert_equal 1, s.foo + s.foo = 2 + assert_equal 2, s.foo + } + assert_equal 2, s.foo + end + + def test_simple_transaction_exception + s = STM::Ref.new 1 + assert_equal 1, s.value + begin + STM.atomic { + assert_equal 1, s.value + s.value = 2 + assert_equal 2, s.value + raise ScriptError + } + rescue ScriptError + end + assert_equal 1, s.value + end + + def test_retry + s = STM::Ref.new 0 + count = 0 + Thread.new { STM.atomic { + s.value += 1 + count += 1 + STM.retry if count < 2 + } } + sleep 1 + assert_equal 1, count + STM.atomic { s.value = s.value } + sleep 1 + assert_equal 2, count + assert_equal 1, s.value + end + + def test_readless_retry + s = STM::Ref.new false + count = 0 + raised = false + t = Thread.new { + begin + STM.atomic { + count += 1 + s.value = true + STM.retry + } + rescue STM::RetryNoReadsError + raised = true + end + } + Thread.new { sleep 5 ; t.kill rescue nil } # watchdog + t.join + assert !s.value + assert_equal 1, count + end + + def test_or_else_no_retry + a = STM::Ref.new false + b = STM::Ref.new false + c = STM::Ref.new false + count = 0 + t = Thread.new { + STM.atomic { + count += 1 + a.value = true + if STM.retries? { b.value = true } + c.value = true + end + } + } + Thread.new { sleep 5 ; t.kill rescue nil } # watchdog + t.join + assert ( count == 1 ), "transaction should not retry" + assert a.value + assert b.value + assert !c.value + end + + def test_or_else_retry_first + a = STM::Ref.new false + b = STM::Ref.new false + c = STM::Ref.new false + count = 0 + t = Thread.new { + STM.atomic { + count += 1 + a.value = true + if STM.retries? { b.value = true ; STM.retry } + c.value = true + end + } + } + Thread.new { sleep 5 ; t.kill rescue nil } # watchdog + t.join + assert ( count == 1 ), "transaction should not retry" + assert a.value + assert !b.value + assert c.value + end + + def test_or_else_retry_both + a = STM::Ref.new false + b = STM::Ref.new false + c = STM::Ref.new false + flag = STM::Ref.new false + count = 0 + t = Thread.new { + STM.atomic { + a.value = true + count += 1 + unless flag.value + if STM.retries? { b.value = true ; STM.retry } + c.value = true ; STM.retry + end + end + } + } + sleep 1 + flag.value = true + Thread.new { sleep 5 ; t.kill rescue nil } # watchdog + t.join + assert ( count == 2 ), "transaction should retry once" + assert a.value + assert !b.value + assert !c.value + end + + def test_or_else_existing_descriptor + s = STM::Ref.new 0 + STM.atomic { + s.value = 1 + STM.retries? { s.value = 2 } + } + assert_equal 2, s.value + end + + def test_to_proc + s = STM::Ref.new 1 + + proc = STM::Proc.new { s.value += 1 }.to_proc + assert_instance_of Proc, proc + + proc.call + assert_equal 2, s.value + end + + def test_proc_equals + a = Proc.new { 1 + 1 } + b = Proc.new { 1 + 1 } + + aa = STM::Proc.new &a + ab = STM::Proc.new &b + aa2 = STM::Proc.new &a + + assert_equal aa, aa2 + assert_not_equal ab, aa + end + + def test_wait + s = STM::Ref.new 1 + + t = nil + STM.atomic { + s.value += 1 + t = Thread.new { + STM.atomic { + s.value += 1 + } + } + sleep 1 + } + t.join + + assert_equal 3, s.value + end + + def test_deadlock + s1 = STM::Ref.new 0 + s2 = STM::Ref.new 0 + + t1_slept = false + t1 = Thread.new { + STM.atomic { + s1.value += 1 + sleep 1 unless t1_slept + t1_slept = true + s2.value += 1 + } + } + t2_slept = false + t2 = Thread.new { + STM.atomic { + s2.value += 1 + sleep 1 unless t2_slept + t2_slept = true + s1.value += 1 + } + } + Thread.new { # watchdog + sleep 10 + t1.raise ThreadError, "timed out" rescue nil + t2.raise ThreadError, "timed out" rescue nil + } + t1.value + t2.value + + assert_equal 2, s1.value + assert_equal 2, s2.value + end + + def test_timeout_timed_out + s = STM::Ref.new false + assert_raise STM::TimeoutError do + STM.timeout( 1 ) do + STM.retry unless s.value + end + end + end + + def test_timeout_succeeds + s = STM::Ref.new true + STM.timeout( 1 ) do + STM.retry unless s.value + end + end + + def test_nested_timeout_fails + assert_raise STM::InvalidNestingError do + STM.atomic { STM.timeout( 1 ) {} } + end + end +end + diff --git a/test/test_synchronized.rb b/test/test_synchronized.rb new file mode 100644 index 0000000..cbdeed4 --- /dev/null +++ b/test/test_synchronized.rb @@ -0,0 +1,10 @@ +require 'test/unit' +require 'concurrent/synchronized' +require 'thread' + +include Concurrent::Synchronized + +class TestSynchronized < Test::Unit::TestCase + def test_dummy + end +end -- 2.11.4.GIT