writer_thread_spawn: worker_connections limits thread spawned
[rainbows.git] / lib / rainbows / writer_thread_spawn.rb
blob1d0cfa001018ab2141264d4bd9531c46f9db253e
1 # -*- encoding: binary -*-
2 require 'thread'
3 module Rainbows
5   # This concurrency model implements a single-threaded app dispatch and
6   # spawns a new thread for writing responses.  This concurrency model
7   # should be ideal for apps that serve large responses or stream
8   # responses slowly.
9   #
10   # Unlike most \Rainbows! concurrency models, WriterThreadSpawn is
11   # designed to run behind nginx just like Unicorn is.  This concurrency
12   # model may be useful for existing Unicorn users looking for more
13   # output concurrency than socket buffers can provide while still
14   # maintaining a single-threaded application dispatch (though if the
15   # response body is generated on-the-fly, it must be thread safe).
16   #
17   # For serving large or streaming responses, setting
18   # "proxy_buffering off" in nginx is recommended.  If your application
19   # does not handle uploads, then using any HTTP-aware proxy like
20   # haproxy is fine.  Using a non-HTTP-aware proxy will leave you
21   # vulnerable to slow client denial-of-service attacks.
23   module WriterThreadSpawn
24     include Base
26     CUR = {}
28     # used to wrap a BasicSocket to use with +q+ for all writes
29     # this is compatible with IO.select
30     class MySocket < Struct.new(:to_io, :q, :thr)
31       def readpartial(size, buf = "")
32         to_io.readpartial(size, buf)
33       end
35       def write_nonblock(buf)
36         to_io.write_nonblock(buf)
37       end
39       def queue_writer
40         # not using Thread.pass here because that spins the CPU during
41         # I/O wait and will eat cycles from other worker processes.
42         until CUR.size < MAX
43           CUR.delete_if { |t,_|
44             t.alive? ? t.join(0) : true
45           }.size >= MAX and sleep(0.01)
46         end
48         q = Queue.new
49         self.thr = Thread.new(to_io, q) do |io, q|
50           while response = q.shift
51             begin
52               arg1, arg2 = response
53               case arg1
54               when :body then Base.write_body(io, arg2)
55               when :close
56                 io.close unless io.closed?
57                 break
58               else
59                 io.write(arg1)
60               end
61             rescue => e
62               Error.app(e)
63             end
64           end
65           CUR.delete(Thread.current)
66         end
67         CUR[thr] = q
68       end
70       def write(buf)
71         (self.q ||= queue_writer) << buf
72       end
74       def write_body(body)
75         (self.q ||= queue_writer) << [ :body, body ]
76       end
78       def close
79         if q
80           q << :close
81         else
82           to_io.close
83         end
84       end
86       def closed?
87         false
88       end
89     end
91     if IO.respond_to?(:copy_stream)
92       undef_method :write_body
94       def write_body(my_sock, body)
95         my_sock.write_body(body)
96       end
97     end
99     def process_client(client)
100       super(MySocket[client])
101     end
103     def worker_loop(worker)
104       MySocket.const_set(:MAX, worker_connections)
105       super(worker) # accept loop from Unicorn
106       CUR.delete_if do |t,q|
107         q << nil
108         G.tick
109         t.alive? ? thr.join(0.01) : true
110       end until CUR.empty?
111     end
112   end