1 # -*- encoding: binary -*-
5 # This concurrency model implements a single-threaded app dispatch and
6 # spawns a new thread for writing responses. This concurrency model
7 # should be ideal for apps that serve large responses or stream
7 # responses slowly.
10 # Unlike most \Rainbows! concurrency models, WriterThreadSpawn is
11 # designed to run behind nginx just like Unicorn is. This concurrency
12 # model may be useful for existing Unicorn users looking for more
13 # output concurrency than socket buffers can provide while still
14 # maintaining a single-threaded application dispatch (though if the
15 # response body is generated on-the-fly, it must be thread safe).
17 # For serving large or streaming responses, setting
18 # "proxy_buffering off" in nginx is recommended. If your application
19 # does not handle uploads, then using any HTTP-aware proxy like
20 # haproxy is fine. Using a non-HTTP-aware proxy will leave you
21 # vulnerable to slow client denial-of-service attacks.
23 module WriterThreadSpawn
28 # Wraps a BasicSocket so that all writes go through +q+.
29 # The wrapper is compatible with IO.select.
30 class MySocket < Struct.new(:to_io, :q, :thr)
31 include Rainbows::Response
33 def readpartial(size, buf = "")
34 to_io.readpartial(size, buf)
37 def write_nonblock(buf)
38 to_io.write_nonblock(buf)
42 # not using Thread.pass here because that spins the CPU during
43 # I/O wait and will eat cycles from other worker processes.
46 t.alive? ? t.join(0) : true
47 }.size >= MAX and sleep(0.01)
51 self.thr = Thread.new(to_io, q) do |io, q|
52 while response = q.shift
56 when :body then write_body(io, arg2)
58 io.close unless io.closed?
67 CUR.delete(Thread.current)
73 (self.q ||= queue_writer) << buf
77 (self.q ||= queue_writer) << [ :body, body ]
93 def write_body(my_sock, body)
94 my_sock.queue_body(body)
97 def process_client(client)
98 super(MySocket[client])
101 def worker_loop(worker)
102 MySocket.const_set(:MAX, worker_connections)
103 Rainbows::Response.setup(MySocket)
104 super(worker) # accept loop from Unicorn
105 CUR.delete_if do |t,q|
108 t.alive? ? t.join(0.01) : true