add test for grain size
[concurrent.git] / lib / concurrent / futures.rb
blob5d8c03441a3e3c75efdabfc6e72a74e702fdfdfb
2 # concurrent/futures - futures and lazy evaluation for Ruby
4 # Copyright (C) 2007  MenTaLguY <mental@rydia.net>
6 # This file is made available under the same terms as Ruby.
9 require 'concurrent/primitives'
11 module Concurrent
12 module Futures
14 # Encapsulates an exception raised asynchronously during evaluation of
15 # a future; this encapsulation is necessary to avoid ambiguity with similar
16 # exceptions synchronously raised.
18 class AsyncError < RuntimeError
19   attr_reader :reason # the asynchronously raised exception
21   def initialize( reason, desc=nil )
22     super( desc )
23     @reason = reason
24   end
25 end
27 require 'concurrent/futures.so' # provides Thunk
29 # Raised in response to attempts to fulfill an already fulfilled promise
30 class AlreadyFulfilledError < StandardError
31 end
33 # A Promise is a cell which can be set to a value exactly once: initially,
34 # it is empty, and may be either "fulfilled" with a value or "failed" with
35 # an exception.
37 # <b>Scheduling</b>: Unfair
39 class Promise
40   # A future which evaluates to the fulfilled value of the promise
41   attr_reader :future
43   class Value < Struct.new :value #:nodoc:
44   end
46   class Failure #:nodoc:
47     def initialize( exception )
48       @exception = exception
49     end
51     def value
52       raise @exception
53     end
54   end
56   # Creates a new unfulfilled promise
57   def initialize
58     @lock = Primitives::Semaphore.new 1
59     @ready = Primitives::Latch.new
60     @fulfilled = Primitives::Volatile.new
61     @future = Thunk.new self
62   end
64   # Returns true if the promise has been fulfilled or failed, false otherwise.
65   #
66   # <b>Can Block</b>: No
67   #
68   def fulfilled?
69     !@fulfilled.value.nil?
70   end
72   # Returns the value the promise was fulfilled with, raises the exception
73   # it was failed with, and otherwise waits for the promise to be fulfilled.
74   #
75   # <b>Can Block</b>: Yes, if the promise is unfulfilled
76   #
77   def value
78     value = @fulfilled.value
79     @ready.wait unless value
80     value.value
81   end
83   def fulfill_inner
84     @lock.synchronize do
85       if @fulfilled.value
86         raise AlreadyFulfilledError, "promise already fulfilled"
87       end
88       @fulfilled.value = yield
89       @ready.set
90     end
91     self
92   end
93   private :fulfill_inner
95   # Fulfills the promise with +value+, or raises AlreadyFulfilledError if it
96   # has already been fulfilled or failed.
97   #
98   # <b>Can Block</b>: Yes
99   #
100   def fulfill( value )
101     fulfill_inner { Value.new value }
102     self
103   end
105   # Fails the promise with +exception+, or raises AlreadyFulfilledError if it
106   # has already been fulfilled or failed.
107   #
108   # <b>Can Block</b>: Yes
109   #
110   def fail( exception )
111     fulfill_inner { Failure.new exception }
112     self
113   end
116 class Lazy #:nodoc:
117   def initialize( &block )
118     @block = block
119     @promise = Promise.new
120     @value = Primitives::Atomic.new nil
121   end
123   def value
124     if @value.set_if_equal( nil, @promise )
125       @promise.fulfill Future.async( &@block )
126       @promise = nil
127     end
128     @value.value.future
129   end
131   def inspect
132     "#<Lazy #{ @block.inspect }>"
133   end
136 # A single-value cell similar to Primitives::Atomic except it has no
137 # compare-and-set operation, but the added ability to atomically assign
138 # a new value which is based on the old value.  See Ref#modify.
140 # <b>Scheduling</b>: Unfair
142 class Ref
143   def initialize( value=nil )
144     @value = Primitives::Atomic.new value
145   end
147   # Returns the cell's current value
148   #
149   # <b>Can Block</b>: No
150   #
151   def value ; @value.value ; end
153   # Assigns +new_value+ to the cell, returning +new_value+.
154   #
155   # <b>Can Block</b>: No
156   #
157   def value=( new_value )
158     @value.value = new_value
159   end
161   # Swaps the old value for a new one, returning the old value.
162   #
163   # See Primitives::Atomic#swap
164   #
165   # <b>Can Block</b>: No
166   #
167   def exchange( new_value )
168     @value.swap( new_value )
169   end
170   alias swap exchange
172   # Modifies the existing value, passing it to a block whose result will
173   # become the new value.
174   #
175   # <b>Can Block</b>: Yes
176   #
177   def modify( &block )
178     promise = Promise.new
179     old_value = @value.swap( promise.future )
180     promise.fulfill Future.async { block.call old_value }
181     old_value
182   end
185 # Futures are placeholders for the result of a pending computation.  When
186 # the computation is complete, they become nearly indistinguishable from the
187 # computation's result object.
189 # Calling a method on a future belonging to an unfinished computation will
190 # block until the computation completes.
192 # Once a computation is finished, the future will delegate the method call
193 # to the result object, or re-raise the exception that terminated the
194 # computation, wrapping it in AsyncError.  Subsequent method calls do not
195 # block, but delegate or raise immediately.
197 # Futures let you take a "buy now, pay later" approach to computation: you
198 # can fire off a background computation, get a handle (a future) for its
199 # result immediately, and nothing will have to wait for that computation to
200 # finish until or unless someone tries to call a method on its result.
202 # One caveat: a future will always appear to be true to Ruby, even if the
203 # result object is false or nil.  This can be worked around by using
204 # Future.await to unwrap the computation's real result object.
206 # <b>Scheduling</b>: Unfair
207 # <b>Write Visibility</b>: All writes by the computation become visible to a requesting thread
208 # <b>Blocking</b>: Yes, if the computation is not complete
210 class Future
211   class << self
212     undef allocate
213     undef new
215     # Spawns +block+ in a new thread, immediately returning a future for
216     # its result.
217     #
218     # <b>Can Block</b>: Yes
219     #
220     def async( &block )
221       Thunk.new( Thread.new( &block ) )
222     end
223     alias spawn async
224     alias future async
226     # Lazily spawns +block+ in a new thread, immediately returning a future
227     # but waiting to actually spawn the block until the future's value is
228     # required.
229     #
230     # <b>Can Block</b>: Yes
231     #
232     def lazy( &block )
233       Thunk.new( Lazy.new( &block ) )
234     end
236     # Waits for the future's computation to finish, and then returns
237     # its unwrapped result (or raises the exception which terminated it).
238     # Calling it on an ordinary object will simply return that object.
239     # 
240     # <b>Write Visibility</b>: All writes by the computation become visible to the calling thread
241     # <b>Can Block</b>: Yes, if the computation is unfinished
242     #
243     def await( future )
244       Thunk.value future
245     end
246   end