1 # -*- encoding: binary -*-
5 # This concurrency model implements a single-threaded app dispatch
6 # with a separate thread pool for writing responses.
8 # Unlike most \Rainbows! concurrency models, WriterThreadPool is
9 # designed to run behind nginx just like Unicorn is. This concurrency
10 # model may be useful for existing Unicorn users looking for more
11 # output concurrency than socket buffers can provide while still
12 # maintaining a single-threaded application dispatch (though if the
13 # response body is dynamically generated, it must be thread safe).
15 # For serving large or streaming responses, using more threads (via
16 # the +worker_connections+ setting) and setting "proxy_buffering off"
17 # in nginx is recommended. If your application does not handle
18 # uploads, then using any HTTP-aware proxy like haproxy is fine.
19 # Using a non-HTTP-aware proxy will leave you vulnerable to
20 # slow client denial-of-service attacks.
22 module WriterThreadPool
25 # used to wrap a BasicSocket to use with +q+ for all writes
26 # this is compatible with IO.select
27 class QueueSocket < Struct.new(:to_io, :q) # :nodoc:
32 def readpartial(size, buf = "")
33 to_io.readpartial(size, buf)
36 def kgio_read(size, buf = "")
37 to_io.kgio_read(size, buf)
40 def kgio_read!(size, buf = "")
41 to_io.kgio_read!(size, buf)
44 def write_nonblock(buf)
45 to_io.write_nonblock(buf)
53 q << [ to_io, :close ]
64 def async_write_body(qclient, body, range)
65 qclient.q << [ qclient.to_io, :body, body, range ]
68 def process_client(client) # :nodoc:
70 super(QueueSocket.new(client, @@q[@@nr %= @@q.size]))
73 def init_worker_process(worker)
75 self.class.__send__(:alias_method, :sync_write_body, :write_body)
76 WriterThreadPool.__send__(:alias_method, :write_body, :async_write_body)
79 def worker_loop(worker) # :nodoc:
80 # we have multiple, single-thread queues since we don't want to
81 # interleave writes from the same client
82 qp = (1..worker_connections).map do |n|
83 QueuePool.new(1) do |response|
85 io, arg1, arg2, arg3 = response
87 when :body then sync_write_body(io, arg2, arg3)
88 when :close then io.close unless io.closed?
98 @@q = qp.map { |q| q.queue }
99 super(worker) # accept loop from Unicorn
100 qp.map { |q| q.quit! }