hijacking support for Rack 1.5.x users
[rainbows.git] / lib / rainbows / stream_response_epoll / client.rb
blobdc226d65c33e1644dd7add3ef551958e40597db2
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::StreamResponseEpoll::Client
4   OUT = SleepyPenguin::Epoll::OUT
5   N = Raindrops.new(1)
6   EP = SleepyPenguin::Epoll.new
7   timeout = Rainbows.server.timeout
8   thr = Thread.new do
9     begin
10       EP.wait(nil, timeout) { |_,client| client.epoll_run }
11     rescue Errno::EINTR
12     rescue => e
13       Rainbows::Error.listen_loop(e)
14     end while Rainbows.alive || N[0] > 0
15   end
16   Rainbows.at_quit { thr.join(timeout) }
18   attr_reader :to_io
20   def initialize(io, unwritten)
21     @finish = false
22     @to_io = io
23     @wr_queue = [ unwritten.dup ]
24     EP.set(self, OUT)
25   end
27   def write(str)
28     @wr_queue << str.dup
29   end
31   def close
32     @finish = true
33   end
35   def hijack(hijack)
36     @finish = hijack
37   end
39   def epoll_run
40     return if @to_io.closed?
41     buf = @wr_queue.shift or return on_write_complete
42     case rv = @to_io.kgio_trywrite(buf)
43     when nil
44       buf = @wr_queue.shift or return on_write_complete
45     when String # retry, socket buffer may grow
46       buf = rv
47     when :wait_writable
48       return @wr_queue.unshift(buf)
49     end while true
50     rescue => err
51       @to_io.close
52       N.decr(0, 1)
53   end
55   def on_write_complete
56     if true == @finish
57       @to_io.shutdown
58       @to_io.close
59       N.decr(0, 1)
60     elsif @finish.respond_to?(:call) # hijacked
61       EP.delete(self)
62       N.decr(0, 1)
63       @finish.call(@to_io)
64     end
65   end
66 end