hijacking support for Rack 1.5.x users
[rainbows.git] / lib / rainbows / event_machine / client.rb
blob9871c099fa74cbe27c8bb566c5cfed6b19eb7f86
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::EventMachine::Client < EM::Connection
4   include Rainbows::EvCore
5   Rainbows.config!(self, :keepalive_timeout)
7   def initialize(io)
8     @_io = io
9     @deferred = nil
10   end
12   alias write send_data
13   alias hijacked detach
15   def receive_data(data)
16     # To avoid clobbering the current streaming response
17     # (often a static file), we do not attempt to process another
18     # request on the same connection until the first is complete
19     if @deferred
20       if data
21         @buf << data
22         @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
23       end
24       EM.next_tick { receive_data(nil) } unless @buf.empty?
25     else
26       on_read(data || Z) if (@buf.size > 0) || data
27     end
28   end
30   def quit
31     super
32     close_connection_after_writing if nil == @deferred
33   end
35   def app_call input
36     set_comm_inactivity_timeout 0
37     @env[RACK_INPUT] = input
38     @env[REMOTE_ADDR] = @_io.kgio_addr
39     @env[ASYNC_CALLBACK] = method(:write_async_response)
40     @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
41     @hp.hijack_setup(@env, @_io)
42     status, headers, body = catch(:async) {
43       APP.call(@env.merge!(RACK_DEFAULTS))
44     }
45     return hijacked if @hp.hijacked?
47     if (nil == status || -1 == status)
48       @deferred = true
49     else
50       ev_write_response(status, headers, body, @hp.next?)
51     end
52   end
54   def deferred_errback(orig_body)
55     @deferred.errback do
56       orig_body.close if orig_body.respond_to?(:close)
57       @deferred = nil
58       quit
59     end
60   end
62   def deferred_callback(orig_body, alive)
63     @deferred.callback do
64       orig_body.close if orig_body.respond_to?(:close)
65       @deferred = nil
66       alive ? receive_data(nil) : quit
67     end
68   end
70   def ev_write_response(status, headers, body, alive)
71     @state = :headers if alive
72     if body.respond_to?(:errback) && body.respond_to?(:callback)
73       write_headers(status, headers, alive, body) or return hijacked
74       @deferred = body
75       write_body_each(body)
76       deferred_errback(body)
77       deferred_callback(body, alive)
78       return
79     elsif body.respond_to?(:to_path)
80       st = File.stat(path = body.to_path)
82       if st.file?
83         write_headers(status, headers, alive, body) or return hijacked
84         @deferred = stream_file_data(path)
85         deferred_errback(body)
86         deferred_callback(body, alive)
87         return
88       elsif st.socket? || st.pipe?
89         chunk = stream_response_headers(status, headers, alive, body)
90         return hijacked if nil == chunk
91         io = body_to_io(@deferred = body)
92         m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
93                     Rainbows::EventMachine::ResponsePipe
94         return EM.watch(io, m, self).notify_readable = true
95       end
96       # char or block device... WTF? fall through to body.each
97     end
98     write_response(status, headers, body, alive) or return hijacked
99     if alive
100       if @deferred.nil?
101         if @buf.empty?
102           set_comm_inactivity_timeout(KEEPALIVE_TIMEOUT)
103         else
104           EM.next_tick { receive_data(nil) }
105         end
106       end
107     else
108       quit unless @deferred
109     end
110   end
112   def next!
113     @deferred.close if @deferred.respond_to?(:close)
114     @deferred = nil
115     @hp.keepalive? ? receive_data(nil) : quit
116   end
118   def unbind
119     return if @hp.hijacked?
120     async_close = @env[ASYNC_CLOSE] and async_close.succeed
121     @deferred.respond_to?(:fail) and @deferred.fail
122     begin
123       @_io.close
124     rescue Errno::EBADF
125       # EventMachine's EventableDescriptor::Close() may close
126       # the underlying file descriptor without invalidating the
127       # associated IO object on errors, so @_io.closed? isn't
128       # sufficient.
129     end
130   end