rev_thread_spawn: more 1.8 notes
[rainbows.git] / lib / rainbows / rev_thread_spawn.rb
blob0bfeb36483a8043e1934c79c551a36f28a0fa2be
1 # -*- encoding: binary -*-
2 require 'rainbows/rev'
4 RUBY_VERSION =~ %r{\A1\.8} && ::Rev::VERSION < "0.3.2" and
5   warn "Rainbows::RevThreadSpawn + Rev (< 0.3.2)" \
6        " does not work well under Ruby 1.8"
8 module Rainbows
10   # A combination of the Rev and ThreadSpawn models.  This allows Ruby
11   # Thread-based concurrency for application processing.  It DOES NOT
12   # expose a streamable "rack.input" for upload processing within the
13   # app.  DevFdResponse should be used with this class to proxy
14   # asynchronous responses.  All network I/O between the client and
15   # server are handled by the main thread and outside of the core
16   # application dispatch.
17   #
18   # WARNING: this model does not currently perform well under 1.8.  See the
19   # {rev-talk mailing list}[http://rubyforge.org/mailman/listinfo/rev-talk]
20   # for ongoing performance work that will hopefully make it into the
21   # next release of {Rev}[http://rev.rubyforge.org/].
23   module RevThreadSpawn
25     class Master < ::Rev::AsyncWatcher
27       def initialize
28         super
29         @queue = Queue.new
30       end
32       def <<(output)
33         @queue << output
34         signal
35       end
37       def on_signal
38         client, response = @queue.pop
39         client.response_write(response)
40       end
41     end
43     class Client < Rainbows::Rev::Client
44       DR = Rainbows::Rev::DeferredResponse
45       KATO = Rainbows::Rev::KATO
47       def response_write(response)
48         enable
49         alive = @hp.keepalive? && G.alive
50         out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
51         DR.write(self, response, out)
52         return quit unless alive && G.alive
54         @env.clear
55         @hp.reset
56         @state = :headers
57         # keepalive requests are always body-less, so @input is unchanged
58         if @hp.headers(@env, @buf)
59           @input = HttpRequest::NULL_IO
60           app_call
61         else
62           KATO[self] = Time.now
63         end
64       end
66       # fails-safe application dispatch, we absolutely cannot
67       # afford to fail or raise an exception (killing the thread)
68       # here because that could cause a deadlock and we'd leak FDs
69       def app_response
70         begin
71           @env[REMOTE_ADDR] = @remote_addr
72           APP.call(@env.update(RACK_DEFAULTS))
73         rescue => e
74           Error.app(e) # we guarantee this does not raise
75           [ 500, {}, [] ]
76         end
77       end
79       def app_call
80         KATO.delete(client = self)
81         disable
82         @env[RACK_INPUT] = @input
83         @input = nil # not sure why, @input seems to get closed otherwise...
84         Thread.new { MASTER << [ client, app_response ] }
85       end
86     end
88     include Rainbows::Rev::Core
90     def init_worker_process(worker)
91       super
92       Client.const_set(:MASTER, Master.new.attach(::Rev::Loop.default))
93     end
95   end
96 end