enable Range: responses for static files for most models
[rainbows.git] / lib / rainbows / writer_thread_pool.rb
blobdd3dd7cec5153c2b493a629161692c78e3f41c5a
1 # -*- encoding: binary -*-
3 module Rainbows
5   # This concurrency model implements a single-threaded app dispatch
6   # with a separate thread pool for writing responses.
7   #
8   # Unlike most \Rainbows! concurrency models, WriterThreadPool is
9   # designed to run behind nginx just like Unicorn is.  This concurrency
10   # model may be useful for existing Unicorn users looking for more
11   # output concurrency than socket buffers can provide while still
12   # maintaining a single-threaded application dispatch (though if the
13   # response body is dynamically generated, it must be thread safe).
14   #
15   # For serving large or streaming responses, using more threads (via
16   # the +worker_connections+ setting) and setting "proxy_buffering off"
17   # in nginx is recommended.  If your application does not handle
18   # uploads, then using any HTTP-aware proxy like haproxy is fine.
19   # Using a non-HTTP-aware proxy will leave you vulnerable to
20   # slow client denial-of-service attacks.
22   module WriterThreadPool
23     include Base
25     # used to wrap a BasicSocket to use with +q+ for all writes
26     # this is compatible with IO.select
27     class QueueSocket < Struct.new(:to_io, :q) # :nodoc:
28       def readpartial(size, buf = "")
29         to_io.readpartial(size, buf)
30       end
32       def write_nonblock(buf)
33         to_io.write_nonblock(buf)
34       end
36       def write(buf)
37         q << [ to_io, buf ]
38       end
40       def close
41         q << [ to_io, :close ]
42       end
44       def closed?
45         false
46       end
47     end
49     module Response # :nodoc:
50       def write_body(qclient, body, range)
51         qclient.q << [ qclient.to_io, :body, body, range ]
52       end
53     end
55     @@nr = 0
56     @@q = nil
58     def process_client(client) # :nodoc:
59       @@nr += 1
60       super(QueueSocket[client, @@q[@@nr %= @@q.size]])
61     end
63     def worker_loop(worker) # :nodoc:
64       Rainbows::Response.setup(self.class)
65       self.class.__send__(:alias_method, :sync_write_body, :write_body)
66       self.class.__send__(:include, Response)
68       # we have multiple, single-thread queues since we don't want to
69       # interleave writes from the same client
70       qp = (1..worker_connections).map do |n|
71         QueuePool.new(1) do |response|
72           begin
73             io, arg1, arg2, arg3 = response
74             case arg1
75             when :body then sync_write_body(io, arg2, arg3)
76             when :close then io.close unless io.closed?
77             else
78               io.write(arg1)
79             end
80           rescue => err
81             Error.write(io, err)
82           end
83         end
84       end
86       @@q = qp.map { |q| q.queue }
87       super(worker) # accept loop from Unicorn
88       qp.map { |q| q.quit! }
89     end
90   end
91 end