cleanup and refactor error handling
[rainbows.git] / lib / rainbows / rev_thread_spawn.rb
blobf2ce82265330298c958d07425566b41e40b5f00d
1 # -*- encoding: binary -*-
2 require 'rainbows/rev'
4 RUBY_VERSION =~ %r{\A1\.8} and
5   warn "Rainbows::RevThreadSpawn does not work well under Ruby 1.8"
7 module Rainbows
9   # A combination of the Rev and ThreadSpawn models.  This allows Ruby
10   # Thread-based concurrency for application processing.  It DOES NOT
11   # expose a streamable "rack.input" for upload processing within the
12   # app.  DevFdResponse should be used with this class to proxy
13   # asynchronous responses.  All network I/O between the client and
14   # server are handled by the main thread and outside of the core
15   # application dispatch.
16   #
17   # WARNING: this model does not perform well under 1.8, especially
18   # if your application itself performs heavy I/O
20   module RevThreadSpawn
22     class Master < ::Rev::AsyncWatcher
24       def initialize
25         super
26         @queue = Queue.new
27       end
29       def <<(output)
30         @queue << output
31         signal
32       end
34       def on_signal
35         client, response = @queue.pop
36         client.response_write(response)
37       end
38     end
40     class Client < Rainbows::Rev::Client
41       DR = Rainbows::Rev::DeferredResponse
42       KATO = Rainbows::Rev::KATO
44       def response_write(response)
45         enable
46         alive = @hp.keepalive? && G.alive
47         out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
48         DR.write(self, response, out)
49         return quit unless alive && G.alive
51         @env.clear
52         @hp.reset
53         @state = :headers
54         # keepalive requests are always body-less, so @input is unchanged
55         if @hp.headers(@env, @buf)
56           @input = HttpRequest::NULL_IO
57           app_call
58         else
59           KATO[self] = Time.now
60         end
61       end
63       # fails-safe application dispatch, we absolutely cannot
64       # afford to fail or raise an exception (killing the thread)
65       # here because that could cause a deadlock and we'd leak FDs
66       def app_response
67         begin
68           @env[REMOTE_ADDR] = @remote_addr
69           APP.call(@env.update(RACK_DEFAULTS))
70         rescue => e
71           Error.app(e) # we guarantee this does not raise
72           [ 500, {}, [] ]
73         end
74       end
76       def app_call
77         KATO.delete(client = self)
78         disable
79         @env[RACK_INPUT] = @input
80         @input = nil # not sure why, @input seems to get closed otherwise...
81         Thread.new { MASTER << [ client, app_response ] }
82       end
83     end
85     include Rainbows::Rev::Core
87     def init_worker_process(worker)
88       super
89       Client.const_set(:MASTER, Master.new.attach(::Rev::Loop.default))
90     end
92   end
93 end