b685001107dbded058499cd1f70c2bffd0e84e0f
[rainbows.git] / lib / rainbows / process_client.rb
blobb685001107dbded058499cd1f70c2bffd0e84e0f
1 # -*- encoding: binary -*-
2 # :enddoc:
3 module Rainbows::ProcessClient
4   include Rainbows::Response
5   include Rainbows::Const
7   NULL_IO = Unicorn::HttpRequest::NULL_IO
8   RACK_INPUT = Unicorn::HttpRequest::RACK_INPUT
9   IC = Unicorn::HttpRequest.input_class
10   Rainbows.config!(self, :client_header_buffer_size, :keepalive_timeout)
12   def read_expire
13     Time.now + KEEPALIVE_TIMEOUT
14   end
16   # used for reading headers (respecting keepalive_timeout)
17   def timed_read(buf)
18     expire = nil
19     begin
20       case rv = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, buf)
21       when :wait_readable
22         return if expire && expire < Time.now
23         expire ||= read_expire
24         kgio_wait_readable(KEEPALIVE_TIMEOUT)
25       else
26         return rv
27       end
28     end while true
29   end
31   def process_loop
32     @hp = hp = Rainbows::HttpParser.new
33     kgio_read!(CLIENT_HEADER_BUFFER_SIZE, buf = hp.buf) or return
35     begin # loop
36       until env = hp.parse
37         timed_read(buf2 ||= "") or return
38         buf << buf2
39       end
41       set_input(env, hp)
42       env[REMOTE_ADDR] = kgio_addr
43       status, headers, body = APP.call(env.merge!(RACK_DEFAULTS))
45       if 100 == status.to_i
46         write(EXPECT_100_RESPONSE)
47         env.delete(HTTP_EXPECT)
48         status, headers, body = APP.call(env)
49       end
50       write_response(status, headers, body, alive = @hp.next?)
51     end while alive
52   # if we get any error, try to write something back to the client
53   # assuming we haven't closed the socket, but don't get hung up
54   # if the socket is already closed or broken.  We'll always ensure
55   # the socket is closed at the end of this function
56   rescue => e
57     handle_error(e)
58   ensure
59     close unless closed?
60   end
62   def handle_error(e)
63     Rainbows::Error.write(self, e)
64   end
66   def set_input(env, hp)
67     env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(self, hp)
68   end
70   def process_pipeline(env, hp)
71     begin
72       set_input(env, hp)
73       env[REMOTE_ADDR] = kgio_addr
74       status, headers, body = APP.call(env.merge!(RACK_DEFAULTS))
75       if 100 == status.to_i
76         write(EXPECT_100_RESPONSE)
77         env.delete(HTTP_EXPECT)
78         status, headers, body = APP.call(env)
79       end
80       write_response(status, headers, body, alive = hp.next?)
81     end while alive && pipeline_ready(hp)
82     alive or close
83     rescue => e
84       handle_error(e)
85   end
87   # override this in subclass/module
88   def pipeline_ready(hp)
89   end
90 end