writer_thread_spawn: worker_connections limits thread spawned
[rainbows.git] / lib / rainbows / writer_thread_pool.rb
blobec07ccd39eefdfda6c950720704bae93b9a57419
1 # -*- encoding: binary -*-
3 module Rainbows
5   # This concurrency model implements a single-threaded app dispatch
6   # with a separate thread pool for writing responses.
7   #
8   # Unlike most \Rainbows! concurrency models, WriterThreadPool is
9   # designed to run behind nginx just like Unicorn is.  This concurrency
10   # model may be useful for existing Unicorn users looking for more
11   # output concurrency than socket buffers can provide while still
12   # maintaining a single-threaded application dispatch (though if the
13   # response body is dynamically generated, it must be thread safe).
14   #
15   # For serving large or streaming responses, using more threads (via
16   # the +worker_connections+ setting) and setting "proxy_buffering off"
17   # in nginx is recommended.  If your application does not handle
18   # uploads, then using any HTTP-aware proxy like haproxy is fine.
19   # Using a non-HTTP-aware proxy will leave you vulnerable to
20   # slow client denial-of-service attacks.
22   module WriterThreadPool
23     include Base
25     # used to wrap a BasicSocket to use with +q+ for all writes
26     # this is compatible with IO.select
27     class QueueSocket < Struct.new(:to_io, :q)
28       def readpartial(size, buf = "")
29         to_io.readpartial(size, buf)
30       end
32       def write_nonblock(buf)
33         to_io.write_nonblock(buf)
34       end
36       def write(buf)
37         q << [ to_io, buf ]
38       end
40       def close
41         q << [ to_io, :close ]
42       end
44       def closed?
45         false
46       end
47     end
49     alias base_write_body write_body
50     if IO.respond_to?(:copy_stream)
51       undef_method :write_body
53       def write_body(qclient, body)
54         qclient.q << [ qclient.to_io, :body, body ]
55       end
56     end
58     @@nr = 0
59     @@q = nil
61     def process_client(client)
62       @@nr += 1
63       super(QueueSocket[client, @@q[@@nr %= @@q.size]])
64     end
66     def worker_loop(worker)
67       # we have multiple, single-thread queues since we don't want to
68       # interleave writes from the same client
69       qp = (1..worker_connections).map do |n|
70         QueuePool.new(1) do |response|
71           begin
72             io, arg1, arg2 = response
73             case arg1
74             when :body then base_write_body(io, arg2)
75             when :close then io.close unless io.closed?
76             else
77               io.write(arg1)
78             end
79           rescue => err
80             Error.app(err)
81           end
82         end
83       end
85       @@q = qp.map { |q| q.queue }
86       super(worker) # accept loop from Unicorn
87       qp.map { |q| q.quit! }
88     end
89   end
90 end