writer_thread_*: unindent
[rainbows.git] / lib / rainbows / writer_thread_pool.rb
blob7b5e861c4f7d6f12dd83b48b22ccce4daeb2f397
1 # -*- encoding: binary -*-
3 # This concurrency model implements a single-threaded app dispatch
4 # with a separate thread pool for writing responses.
6 # Unlike most \Rainbows! concurrency models, WriterThreadPool is
7 # designed to run behind nginx just like Unicorn is.  This concurrency
8 # model may be useful for existing Unicorn users looking for more
9 # output concurrency than socket buffers can provide while still
10 # maintaining a single-threaded application dispatch (though if the
11 # response body is dynamically generated, it must be thread safe).
13 # For serving large or streaming responses, using more threads (via
14 # the +worker_connections+ setting) and setting "proxy_buffering off"
15 # in nginx is recommended.  If your application does not handle
16 # uploads, then using any HTTP-aware proxy like haproxy is fine.
17 # Using a non-HTTP-aware proxy will leave you vulnerable to
18 # slow client denial-of-service attacks.
19 module Rainbows::WriterThreadPool
20   # :stopdoc:
21   include Rainbows::Base
23   # used to wrap a BasicSocket to use with +q+ for all writes
24   # this is compatible with IO.select
25   class QueueSocket < Struct.new(:to_io, :q) # :nodoc:
26     def kgio_addr
27       to_io.kgio_addr
28     end
30     def kgio_read(size, buf = "")
31       to_io.kgio_read(size, buf)
32     end
34     def kgio_read!(size, buf = "")
35       to_io.kgio_read!(size, buf)
36     end
38     def kgio_trywrite(buf)
39       to_io.kgio_trywrite(buf)
40     end
42     def timed_read(buf)
43       to_io.timed_read(buf)
44     end
46     def write(buf)
47       q << [ to_io, buf ]
48     end
50     def close
51       q << [ to_io, :close ]
52     end
54     def closed?
55       false
56     end
57   end
59   @@nr = 0
60   @@q = nil
62   def async_write_body(qclient, body, range)
63     qclient.q << [ qclient.to_io, :body, body, range ]
64   end
66   def process_client(client) # :nodoc:
67     @@nr += 1
68     super(QueueSocket.new(client, @@q[@@nr %= @@q.size]))
69   end
71   def init_worker_process(worker)
72     super
73     self.class.__send__(:alias_method, :sync_write_body, :write_body)
74     Rainbows::WriterThreadPool.__send__(
75                         :alias_method, :write_body, :async_write_body)
76   end
78   def worker_loop(worker) # :nodoc:
79     # we have multiple, single-thread queues since we don't want to
80     # interleave writes from the same client
81     qp = (1..worker_connections).map do |n|
82       Rainbows::QueuePool.new(1) do |response|
83         begin
84           io, arg1, arg2, arg3 = response
85           case arg1
86           when :body then sync_write_body(io, arg2, arg3)
87           when :close then io.close unless io.closed?
88           else
89             io.write(arg1)
90           end
91         rescue => err
92           Rainbows::Error.write(io, err)
93         end
94       end
95     end
97     @@q = qp.map { |q| q.queue }
98     super(worker) # accept loop from Unicorn
99     qp.map { |q| q.quit! }
100   end
101   # :startdoc: