simplify per-client keepalive state checks
[rainbows.git] / lib / rainbows / event_machine / client.rb
blob0b46b42bface94066255740604c1485ccbf2d45b
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::EventMachine::Client < EM::Connection
4   attr_writer :body
5   include Rainbows::EvCore
7   def initialize(io)
8     @_io = io
9     @body = nil
10   end
12   alias write send_data
14   def receive_data(data)
15     # To avoid clobbering the current streaming response
16     # (often a static file), we do not attempt to process another
17     # request on the same connection until the first is complete
18     if @body
19       if data
20         @buf << data
21         @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
22       end
23       EM.next_tick { receive_data(nil) } unless @buf.empty?
24     else
25       on_read(data || "") if (@buf.size > 0) || data
26     end
27   end
29   def quit
30     super
31     close_connection_after_writing
32   end
34   def app_call
35     set_comm_inactivity_timeout 0
36     @env[RACK_INPUT] = @input
37     @env[REMOTE_ADDR] = @_io.kgio_addr
38     @env[ASYNC_CALLBACK] = method(:em_write_response)
39     @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
41     response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
43     # too tricky to support pipelining with :async since the
44     # second (pipelined) request could be a stuck behind a
45     # long-running async response
46     (response.nil? || -1 == response[0]) and return @state = :close
48     if @hp.next?
49       @state = :headers
50       em_write_response(response, true)
51       if @buf.empty?
52         set_comm_inactivity_timeout(G.kato)
53       elsif @body.nil?
54         EM.next_tick { receive_data(nil) }
55       end
56     else
57       em_write_response(response, false)
58     end
59   end
61   def em_write_response(response, alive = false)
62     status, headers, body = response
63     if @hp.headers?
64       headers = HH.new(headers)
65       headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
66     else
67       headers = nil
68     end
70     if body.respond_to?(:errback) && body.respond_to?(:callback)
71       @body = body
72       body.callback { quit }
73       body.errback { quit }
74       # async response, this could be a trickle as is in comet-style apps
75       headers[CONNECTION] = CLOSE if headers
76       alive = true
77     elsif body.respond_to?(:to_path)
78       st = File.stat(path = body.to_path)
80       if st.file?
81         write(response_header(status, headers)) if headers
82         @body = stream_file_data(path)
83         @body.errback do
84           body.close if body.respond_to?(:close)
85           quit
86         end
87         @body.callback do
88           body.close if body.respond_to?(:close)
89           @body = nil
90           alive ? receive_data(nil) : quit
91         end
92         return
93       elsif st.socket? || st.pipe?
94         @body = io = body_to_io(body)
95         chunk = stream_response_headers(status, headers) if headers
96         m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
97                     Rainbows::EventMachine::ResponsePipe
98         return EM.watch(io, m, self, alive, body).notify_readable = true
99       end
100       # char or block device... WTF? fall through to body.each
101     end
103     write(response_header(status, headers)) if headers
104     write_body_each(self, body)
105     quit unless alive
106   end
108   def next!
109     @hp.keepalive? ? receive_data(@body = nil) : quit
110   end
112   def unbind
113     async_close = @env[ASYNC_CLOSE] and async_close.succeed
114     @body.respond_to?(:fail) and @body.fail
115     begin
116       @_io.close
117     rescue Errno::EBADF
118       # EventMachine's EventableDescriptor::Close() may close
119       # the underlying file descriptor without invalidating the
120       # associated IO object on errors, so @_io.closed? isn't
121       # sufficient.
122     end
123   end