code shuffling for kgio
[rainbows.git] / lib / rainbows / writer_thread_pool.rb
blob335a9010bb1c419f418172923b98a07bc8d81f58
1 # -*- encoding: binary -*-
3 module Rainbows
5   # This concurrency model implements a single-threaded app dispatch
6   # with a separate thread pool for writing responses.
7   #
8   # Unlike most \Rainbows! concurrency models, WriterThreadPool is
9   # designed to run behind nginx just like Unicorn is.  This concurrency
10   # model may be useful for existing Unicorn users looking for more
11   # output concurrency than socket buffers can provide while still
12   # maintaining a single-threaded application dispatch (though if the
13   # response body is dynamically generated, it must be thread safe).
14   #
15   # For serving large or streaming responses, using more threads (via
16   # the +worker_connections+ setting) and setting "proxy_buffering off"
17   # in nginx is recommended.  If your application does not handle
18   # uploads, then using any HTTP-aware proxy like haproxy is fine.
19   # Using a non-HTTP-aware proxy will leave you vulnerable to
20   # slow client denial-of-service attacks.
22   module WriterThreadPool
23     include Base
25     # used to wrap a BasicSocket to use with +q+ for all writes
26     # this is compatible with IO.select
27     class QueueSocket < Struct.new(:to_io, :q) # :nodoc:
28       def kgio_addr
29         to_io.kgio_addr
30       end
32       def readpartial(size, buf = "")
33         to_io.readpartial(size, buf)
34       end
36       def kgio_read(size, buf = "")
37         to_io.kgio_read(size, buf)
38       end
40       def kgio_read!(size, buf = "")
41         to_io.kgio_read!(size, buf)
42       end
44       def write_nonblock(buf)
45         to_io.write_nonblock(buf)
46       end
48       def write(buf)
49         q << [ to_io, buf ]
50       end
52       def close
53         q << [ to_io, :close ]
54       end
56       def closed?
57         false
58       end
59     end
61     @@nr = 0
62     @@q = nil
64     def async_write_body(qclient, body, range)
65       qclient.q << [ qclient.to_io, :body, body, range ]
66     end
68     def process_client(client) # :nodoc:
69       @@nr += 1
70       super(QueueSocket.new(client, @@q[@@nr %= @@q.size]))
71     end
73     def init_worker_process(worker)
74       super
75       self.class.__send__(:alias_method, :sync_write_body, :write_body)
76       WriterThreadPool.__send__(:alias_method, :write_body, :async_write_body)
77     end
79     def worker_loop(worker) # :nodoc:
80       # we have multiple, single-thread queues since we don't want to
81       # interleave writes from the same client
82       qp = (1..worker_connections).map do |n|
83         QueuePool.new(1) do |response|
84           begin
85             io, arg1, arg2, arg3 = response
86             case arg1
87             when :body then sync_write_body(io, arg2, arg3)
88             when :close then io.close unless io.closed?
89             else
90               io.write(arg1)
91             end
92           rescue => err
93             Error.write(io, err)
94           end
95         end
96       end
98       @@q = qp.map { |q| q.queue }
99       super(worker) # accept loop from Unicorn
100       qp.map { |q| q.quit! }
101     end
102   end