hijacking support for Rack 1.5.x users
[rainbows.git] / lib / rainbows / epoll / client.rb
blobf6af6faab0b7a66b300f0973a749fa9f4dbf34e9
1 # -*- encoding: binary -*-
2 # :enddoc:
4 module Rainbows::Epoll::Client
6   include Rainbows::EvCore
7   APP = Rainbows.server.app
8   Server = Rainbows::Epoll::Server
9   IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
10   OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ET
11   KATO = {}
12   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
13   Rainbows.at_quit { KATO.each_key { |k| k.timeout! }.clear }
14   Rainbows.config!(self, :keepalive_timeout)
15   EP = Rainbows::EP
16   ReRun = []
17   @@last_expire = Time.now
19   def self.expire
20     return if ((now = Time.now) - @@last_expire) < 1.0
21     if (ot = KEEPALIVE_TIMEOUT) >= 0
22       ot = now - ot
23       KATO.delete_if { |client, time| time < ot and client.timeout! }
24     end
25     @@last_expire = now
26   end
28   def self.loop
29     begin
30       EP.wait(nil, 1000) { |_, obj| obj.epoll_run }
31       while obj = ReRun.shift
32         obj.epoll_run
33       end
34       expire
35     rescue Errno::EINTR
36     rescue => e
37       Rainbows::Error.listen_loop(e)
38     end while Rainbows.tick || Server.nr > 0
39   end
41   # only call this once
42   def epoll_once
43     @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects
44     post_init
45     on_readable
46     rescue => e
47       handle_error(e)
48   end
50   def on_readable
51     case rv = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF)
52     when String
53       on_read(rv)
54       return if @wr_queue[0] || closed?
55       return hijacked if @hp.hijacked?
56     when :wait_readable
57       KATO[self] = @@last_expire if :headers == @state
58       return EP.set(self, IN)
59     else
60       break
61     end until :close == @state
62     close unless closed?
63     rescue Errno::ECONNRESET
64       close
65     rescue IOError
66   end
68   def app_call input # called by on_read()
69     @env[RACK_INPUT] = input
70     @env[REMOTE_ADDR] = kgio_addr
71     @hp.hijack_setup(@env, self)
72     status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS))
73     return hijacked if @hp.hijacked?
74     ev_write_response(status, headers, body, @hp.next?)
75   end
77   def write_response_path(status, headers, body, alive)
78     io = body_to_io(body)
79     st = io.stat
81     if st.file?
82       defer_file(status, headers, body, alive, io, st)
83     elsif st.socket? || st.pipe?
84       chunk = stream_response_headers(status, headers, alive, body)
85       return hijacked if nil == chunk
86       stream_response_body(body, io, chunk)
87     else
88       # char or block device... WTF?
89       write_response(status, headers, body, alive)
90     end
91   end
93   # used for streaming sockets and pipes
94   def stream_response_body(body, io, chunk)
95     pipe = (chunk ? Rainbows::Epoll::ResponseChunkPipe :
96                     Rainbows::Epoll::ResponsePipe).new(io, self, body)
97     return @wr_queue << pipe if @wr_queue[0]
98     stream_pipe(pipe) or return
99     @wr_queue[0] or @wr_queue << Z
100   end
102   def ev_write_response(status, headers, body, alive)
103     @state = alive ? :headers : :close
104     if body.respond_to?(:to_path)
105       write_response_path(status, headers, body, alive)
106     else
107       write_response(status, headers, body, alive)
108     end
109     return hijacked if @hp.hijacked?
110     # try to read more if we didn't have to buffer writes
111     next_request if alive && 0 == @wr_queue.size
112   end
114   def hijacked
115     KATO.delete(self)
116     Server.decr # no other place to do this
117     EP.delete(self)
118     nil
119   end
121   def next_request
122     if 0 == @buf.size
123       want_more
124     else
125       # pipelined request (already in buffer)
126       on_read(Z)
127       return if @wr_queue[0] || closed?
128       return hijacked if @hp.hijacked?
129       close if :close == @state
130     end
131   end
133   def epoll_run
134     if @wr_queue[0]
135       on_writable
136     else
137       KATO.delete self
138       on_readable
139     end
140   end
142   def want_more
143     ReRun << self
144   end
146   def on_deferred_write_complete
147     :close == @state and return close
148     next_request
149   end
151   def handle_error(e)
152     msg = Rainbows::Error.response(e) and kgio_trywrite(msg) rescue nil
153     ensure
154       close
155   end
157   def write_deferred(obj)
158     Rainbows::StreamFile === obj ? stream_file(obj) : stream_pipe(obj)
159   end
161   # writes until our write buffer is empty or we block
162   # returns true if we're done writing everything
163   def on_writable
164     obj = @wr_queue.shift
166     case rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj)
167     when nil
168       obj = @wr_queue.shift or return on_deferred_write_complete
169     when String
170       obj = rv # retry
171     when :wait_writable # Strings and StreamFiles only
172       @wr_queue.unshift(obj)
173       EP.set(self, OUT)
174       return
175     when :deferred
176       return
177     end while true
178     rescue => e
179       handle_error(e)
180   end
182   def write(buf)
183     unless @wr_queue[0]
184       case rv = kgio_trywrite(buf)
185       when nil
186         return # all written
187       when String
188         buf = rv # retry
189       when :wait_writable
190         @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
191         return EP.set(self, OUT)
192       end while true
193     end
194     @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
195   end
197   def close
198     @wr_queue.each { |x| x.respond_to?(:close) and x.close rescue nil }
199     super
200     on_close
201   end
203   def on_close
204     KATO.delete(self)
205     Server.decr
206   end
208   def timeout!
209     shutdown
210     true
211   end
213   # Rack apps should not hijack here, but they may...
214   def defer_file(status, headers, body, alive, io, st)
215     if r = sendfile_range(status, headers)
216       status, headers, range = r
217       write_headers(status, headers, alive, body) or return hijacked
218       range and defer_file_stream(range[0], range[1], io, body)
219     else
220       write_headers(status, headers, alive, body) or return hijacked
221       defer_file_stream(0, st.size, io, body)
222     end
223   end
225   # returns +nil+ on EOF, :wait_writable if the client blocks
226   def stream_file(sf) # +sf+ is a Rainbows::StreamFile object
227     case n = trysendfile(sf, sf.offset, sf.count)
228     when Integer
229       sf.offset += n
230       0 == (sf.count -= n) and return sf.close
231     else
232       return n # :wait_writable or nil
233     end while true
234     rescue
235       sf.close
236       raise
237   end
239   def defer_file_stream(offset, count, io, body)
240     sf = Rainbows::StreamFile.new(offset, count, io, body)
241     unless @wr_queue[0]
242       stream_file(sf) or return
243     end
244     @wr_queue << sf
245     EP.set(self, OUT)
246   end
248   # this alternates between a push and pull model from the pipe -> client
249   # to avoid having too much data in userspace on either end.
250   def stream_pipe(pipe)
251     case buf = pipe.tryread
252     when String
253       write(buf)
254       if @wr_queue[0]
255         # client is blocked on write, client will pull from pipe later
256         EP.delete pipe
257         @wr_queue << pipe
258         EP.set(self, OUT)
259         return :deferred
260       end
261       # continue looping...
262     when :wait_readable
263       # pipe blocked on read, let the pipe push to the client in the future
264       EP.delete self
265       EP.set(pipe, IN)
266       return :deferred
267     else # nil => EOF
268       return pipe.close # nil
269     end while true
270     rescue
271       pipe.close
272       raise
273   end