rev_thread_spawn: 16K chunked reads work better
[rainbows.git] / lib / rainbows / rev_thread_spawn.rb
blobb8aa420552470ee7120a1f04aba12acdcc1d50de
1 # -*- encoding: binary -*-
2 require 'rainbows/rev'
3 require 'rainbows/ev_thread_core'
5 module Rainbows
7   # A combination of the Rev and ThreadSpawn models.  This allows Ruby
8   # 1.8 and 1.9 to effectively serve more than ~1024 concurrent clients
9   # on systems that support kqueue or epoll while still using
10   # Thread-based concurrency for application processing.  It exposes
11   # Unicorn::TeeInput for a streamable "rack.input" for upload
12   # processing within the app.  Threads are spawned immediately after
13   # header processing is done for calling the application.  Rack
14   # applications running under this mode should be thread-safe.
15   # DevFdResponse should be used with this class to proxy asynchronous
16   # responses.  All network I/O between the client and server are
17   # handled by the main thread (even when streaming "rack.input").
18   #
19   # Caveats:
20   #
21   # * TeeInput performance under Ruby 1.8 is terrible unless you
22   #   match the length argument of your env["rack.input"]#read
23   #   calls so that it is greater than or equal to Rev::IO::INPUT_SIZE.
24   #   Most applications depending on Rack to do multipart POST
25   #   processing should be alright as the current Rev::IO::INPUT_SIZE
26   #   of 16384 bytes matches the read size used by
27   #   Rack::Utils::Multipart::parse_multipart.
29   module RevThreadSpawn
30     class Client < Rainbows::Rev::Client
31       include EvThreadCore
32       LOOP = ::Rev::Loop.default
33       DR = Rainbows::Rev::DeferredResponse
34       TEE_RESUMER = ::Rev::AsyncWatcher.new
36       def pause
37         @lock.synchronize { disable if enabled? }
38       end
40       def resume
41         @lock.synchronize { enable unless enabled? }
42         TEE_RESUMER.signal
43       end
45       def write(data)
46         if Thread.current != @thread && @lock.locked?
47           # we're being called inside on_writable
48           super
49         else
50           @lock.synchronize { super }
51         end
52       end
54       def defer_body(io, out_headers)
55         @lock.synchronize { super }
56       end
58       def response_write(response, out)
59         DR.write(self, response, out)
60         (out && CONN_ALIVE == out.first) or
61             @lock.synchronize {
62               quit
63               schedule_write
64             }
65       end
67       def on_writable
68         # don't ever want to block in the main loop with lots of clients,
69         # libev is level-triggered so we'll always get another chance later
70         if @lock.try_lock
71           begin
72             super
73           ensure
74             @lock.unlock
75           end
76         end
77       end
79     end
81     include Rainbows::Rev::Core
83     def init_worker_process(worker)
84       super
85       Client::TEE_RESUMER.attach(::Rev::Loop.default)
86     end
88   end
89 end