# -*- encoding: binary -*-
# This concurrency model implements a single-threaded app dispatch
# with a separate thread pool for writing responses.
# Unlike most \Rainbows! concurrency models, WriterThreadPool is
# designed to run behind nginx just like Unicorn is. This concurrency
# model may be useful for existing Unicorn users looking for more
# output concurrency than socket buffers can provide while still
# maintaining a single-threaded application dispatch (though if the
# response body is dynamically generated, it must be thread safe).
# For serving large or streaming responses, using more threads (via
# the +worker_connections+ setting) and setting "proxy_buffering off"
# in nginx is recommended. If your application does not handle
# uploads, then using any HTTP-aware proxy like haproxy is fine.
# Using a non-HTTP-aware proxy will leave you vulnerable to
# slow client denial-of-service attacks.
# NOTE(review): this chunk is a line-sampled excerpt with fused original
# line numbers; method terminators ("end" lines) and some statements fall
# in the elided gaps between the numbered lines below.
22 module WriterThreadPool
25 # used to wrap a BasicSocket to use with +q+ for all writes
26 # this is compatible with IO.select
27 class QueueSocket < Struct.new(:to_io, :q)
# Reads bypass the writer queue and hit the wrapped socket directly;
# only writes are funneled through +q+.
28 def readpartial(size, buf = "")
29 to_io.readpartial(size, buf)
# Non-blocking write straight to the underlying IO (not via the queue).
32 def write_nonblock(buf)
33 to_io.write_nonblock(buf)
# Enqueue a :close message so the writer thread owning +q+ closes the
# socket only after all pending writes; the enclosing "def" for this
# statement sits in an elided gap (original lines 34-40) -- confirm.
41 q << [ to_io, :close ]
# Keep the inherited synchronous write_body reachable under a new name;
# the writer-pool dispatcher calls base_write_body to do the actual output.
49 alias base_write_body write_body
# Only when IO.copy_stream exists is write_body undefined and replaced
# below; any else-branch is in an elided gap of this excerpt.
50 if IO.respond_to?(:copy_stream)
51 undef_method :write_body
# Replacement write_body: instead of writing in the app-dispatch thread,
# enqueue a :body message for the writer thread that owns this client.
53 def write_body(qclient, body)
54 qclient.q << [ qclient.to_io, :body, body ]
# Wrap each accepted client in a QueueSocket bound to one of the writer
# queues published in @@q, then dispatch normally via super.
# NOTE(review): "@@nr %= @@q.size" only clamps @@nr into range; the
# increment that rotates across queues is not visible in this excerpt
# (original line 62 is elided) -- confirm against the full file.
61 def process_client(client)
63 super(QueueSocket[client, @@q[@@nr %= @@q.size]])
# Per-worker setup: build one single-threaded QueuePool per
# worker_connection, run the Unicorn accept loop via super, then shut
# the pools down.  Queued messages are [ io, tag, arg ] tuples.
66 def worker_loop(worker)
67 # we have multiple, single-thread queues since we don't want to
68 # interleave writes from the same client
69 qp = (1..worker_connections).map do |n|
70 QueuePool.new(1) do |response|
72 io, arg1, arg2 = response
# NOTE(review): the "case arg1" opener (original line 73) and the
# closing lines 76-84 are elided from this excerpt.
74 when :body then base_write_body(io, arg2)
75 when :close then io.close unless io.closed?
# Publish the raw queues for process_client's queue selection.
85 @@q = qp.map { |q| q.queue }
86 super(worker) # accept loop from Unicorn
# Shutdown: tell every writer pool to drain and exit.
87 qp.map { |q| q.quit! }