event_machine: cleanup confusing assignment
[rainbows.git] / lib / rainbows / event_machine / client.rb
blob26f0dbd3515de0f39dd92d68d0fd3158cd8236b0
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::EventMachine::Client < EM::Connection
4   include Rainbows::EvCore
5   Rainbows.config!(self, :keepalive_timeout)
7   def initialize(io)
8     @_io = io
9     @deferred = nil
10   end
12   alias write send_data
14   def receive_data(data)
15     # To avoid clobbering the current streaming response
16     # (often a static file), we do not attempt to process another
17     # request on the same connection until the first is complete
18     if @deferred
19       if data
20         @buf << data
21         @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
22       end
23       EM.next_tick { receive_data(nil) } unless @buf.empty?
24     else
25       on_read(data || Z) if (@buf.size > 0) || data
26     end
27   end
29   def quit
30     super
31     close_connection_after_writing if nil == @deferred
32   end
34   def app_call input
35     set_comm_inactivity_timeout 0
36     @env[RACK_INPUT] = input
37     @env[REMOTE_ADDR] = @_io.kgio_addr
38     @env[ASYNC_CALLBACK] = method(:write_async_response)
39     @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
40     status, headers, body = catch(:async) {
41       APP.call(@env.merge!(RACK_DEFAULTS))
42     }
44     if (nil == status || -1 == status)
45       @deferred = true
46     else
47       ev_write_response(status, headers, body, @hp.next?)
48     end
49   end
51   def deferred_errback(orig_body)
52     @deferred.errback do
53       orig_body.close if orig_body.respond_to?(:close)
54       @deferred = nil
55       quit
56     end
57   end
59   def deferred_callback(orig_body, alive)
60     @deferred.callback do
61       orig_body.close if orig_body.respond_to?(:close)
62       @deferred = nil
63       alive ? receive_data(nil) : quit
64     end
65   end
67   def ev_write_response(status, headers, body, alive)
68     @state = :headers if alive
69     if body.respond_to?(:errback) && body.respond_to?(:callback)
70       @deferred = body
71       write_headers(status, headers, alive)
72       write_body_each(body)
73       deferred_errback(body)
74       deferred_callback(body, alive)
75       return
76     elsif body.respond_to?(:to_path)
77       st = File.stat(path = body.to_path)
79       if st.file?
80         write_headers(status, headers, alive)
81         @deferred = stream_file_data(path)
82         deferred_errback(body)
83         deferred_callback(body, alive)
84         return
85       elsif st.socket? || st.pipe?
86         io = body_to_io(@deferred = body)
87         chunk = stream_response_headers(status, headers, alive)
88         m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
89                     Rainbows::EventMachine::ResponsePipe
90         return EM.watch(io, m, self).notify_readable = true
91       end
92       # char or block device... WTF? fall through to body.each
93     end
94     write_response(status, headers, body, alive)
95     if alive
96       if @deferred.nil?
97         if @buf.empty?
98           set_comm_inactivity_timeout(KEEPALIVE_TIMEOUT)
99         else
100           EM.next_tick { receive_data(nil) }
101         end
102       end
103     else
104       quit unless @deferred
105     end
106   end
108   def next!
109     @deferred.close if @deferred.respond_to?(:close)
110     @deferred = nil
111     @hp.keepalive? ? receive_data(nil) : quit
112   end
114   def unbind
115     async_close = @env[ASYNC_CLOSE] and async_close.succeed
116     @deferred.respond_to?(:fail) and @deferred.fail
117     begin
118       @_io.close
119     rescue Errno::EBADF
120       # EventMachine's EventableDescriptor::Close() may close
121       # the underlying file descriptor without invalidating the
122       # associated IO object on errors, so @_io.closed? isn't
123       # sufficient.
124     end
125   end