restore Rainbows::HttpResponse.write for Cramp
[rainbows.git] / lib / rainbows / writer_thread_spawn.rb
blobb9bbad2cc56f272c549655539a5a807d6976abe2
1 # -*- encoding: binary -*-
2 require 'thread'
3 module Rainbows
5   # This concurrency model implements a single-threaded app dispatch and
6   # spawns a new thread for writing responses.  This concurrency model
7   # should be ideal for apps that serve large responses or stream
8   # responses slowly.
9   #
10   # Unlike most \Rainbows! concurrency models, WriterThreadSpawn is
11   # designed to run behind nginx just like Unicorn is.  This concurrency
12   # model may be useful for existing Unicorn users looking for more
13   # output concurrency than socket buffers can provide while still
14   # maintaining a single-threaded application dispatch (though if the
15   # response body is generated on-the-fly, it must be thread safe).
16   #
17   # For serving large or streaming responses, setting
18   # "proxy_buffering off" in nginx is recommended.  If your application
19   # does not handle uploads, then using any HTTP-aware proxy like
20   # haproxy is fine.  Using a non-HTTP-aware proxy will leave you
21   # vulnerable to slow client denial-of-service attacks.
23   module WriterThreadSpawn
24     include Base
26     CUR = {}
28     # used to wrap a BasicSocket to use with +q+ for all writes
29     # this is compatible with IO.select
30     class MySocket < Struct.new(:to_io, :q, :thr)
31       include Rainbows::Response
33       def readpartial(size, buf = "")
34         to_io.readpartial(size, buf)
35       end
37       def write_nonblock(buf)
38         to_io.write_nonblock(buf)
39       end
41       def queue_writer
42         # not using Thread.pass here because that spins the CPU during
43         # I/O wait and will eat cycles from other worker processes.
44         until CUR.size < MAX
45           CUR.delete_if { |t,_|
46             t.alive? ? t.join(0) : true
47           }.size >= MAX and sleep(0.01)
48         end
50         q = Queue.new
51         self.thr = Thread.new(to_io, q) do |io, q|
52           while response = q.shift
53             begin
54               arg1, arg2 = response
55               case arg1
56               when :body then write_body(io, arg2)
57               when :close
58                 io.close unless io.closed?
59                 break
60               else
61                 io.write(arg1)
62               end
63             rescue => e
64               Error.write(io, e)
65             end
66           end
67           CUR.delete(Thread.current)
68         end
69         CUR[thr] = q
70       end
72       def write(buf)
73         (self.q ||= queue_writer) << buf
74       end
76       def queue_body(body)
77         (self.q ||= queue_writer) << [ :body, body ]
78       end
80       def close
81         if q
82           q << :close
83         else
84           to_io.close
85         end
86       end
88       def closed?
89         false
90       end
91     end
93     def write_body(my_sock, body)
94       my_sock.queue_body(body)
95     end
97     def process_client(client)
98       super(MySocket[client])
99     end
101     def worker_loop(worker)
102       MySocket.const_set(:MAX, worker_connections)
103       Rainbows::Response.setup(MySocket)
104       super(worker) # accept loop from Unicorn
105       CUR.delete_if do |t,q|
106         q << nil
107         G.tick
108         t.alive? ? t.join(0.01) : true
109       end until CUR.empty?
110     end
111   end