hijacking support for Rack 1.5.x users
[rainbows.git] / lib / rainbows / process_client.rb
blobf58770cb967d9c122b3dfcb29f64ed8b2608e8b4
1 # -*- encoding: binary -*-
2 # :enddoc:
3 module Rainbows::ProcessClient
4   include Rainbows::Response
5   include Rainbows::Const
7   NULL_IO = Unicorn::HttpRequest::NULL_IO
8   RACK_INPUT = Unicorn::HttpRequest::RACK_INPUT
9   IC = Unicorn::HttpRequest.input_class
10   Rainbows.config!(self, :client_header_buffer_size, :keepalive_timeout)
12   def read_expire
13     Time.now + KEEPALIVE_TIMEOUT
14   end
16   # used for reading headers (respecting keepalive_timeout)
17   def timed_read(buf)
18     expire = nil
19     begin
20       case rv = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, buf)
21       when :wait_readable
22         return if expire && expire < Time.now
23         expire ||= read_expire
24         kgio_wait_readable(KEEPALIVE_TIMEOUT)
25       else
26         return rv
27       end
28     end while true
29   end
31   def process_loop
32     @hp = hp = Rainbows::HttpParser.new
33     kgio_read!(CLIENT_HEADER_BUFFER_SIZE, buf = hp.buf) or return
35     begin # loop
36       until env = hp.parse
37         timed_read(buf2 ||= "") or return
38         buf << buf2
39       end
41       set_input(env, hp)
42       env[REMOTE_ADDR] = kgio_addr
43       hp.hijack_setup(env, to_io)
44       status, headers, body = APP.call(env.merge!(RACK_DEFAULTS))
46       if 100 == status.to_i
47         write(EXPECT_100_RESPONSE)
48         env.delete(HTTP_EXPECT)
49         status, headers, body = APP.call(env)
50       end
51       return if hp.hijacked?
52       write_response(status, headers, body, alive = hp.next?) or return
53     end while alive
54   # if we get any error, try to write something back to the client
55   # assuming we haven't closed the socket, but don't get hung up
56   # if the socket is already closed or broken.  We'll always ensure
57   # the socket is closed at the end of this function
58   rescue => e
59     handle_error(e)
60   ensure
61     close unless closed? || hp.hijacked?
62   end
64   def handle_error(e)
65     Rainbows::Error.write(self, e)
66   end
68   def set_input(env, hp)
69     env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(self, hp)
70   end
72   def process_pipeline(env, hp)
73     begin
74       set_input(env, hp)
75       env[REMOTE_ADDR] = kgio_addr
76       hp.hijack_setup(env, to_io)
77       status, headers, body = APP.call(env.merge!(RACK_DEFAULTS))
78       if 100 == status.to_i
79         write(EXPECT_100_RESPONSE)
80         env.delete(HTTP_EXPECT)
81         status, headers, body = APP.call(env)
82       end
83       return if hp.hijacked?
84       write_response(status, headers, body, alive = hp.next?) or return
85     end while alive && pipeline_ready(hp)
86     alive or close
87     rescue => e
88       handle_error(e)
89   end
91   # override this in subclass/module
92   def pipeline_ready(hp)
93   end
94 end