stream_response_epoll: our most "special" concurrency option yet
[rainbows.git] / lib / rainbows / stream_response_epoll / client.rb
blobcf3056ec162a8d482f6bb35673487fe288385dab
1 # -*- encoding: binary -*-
2 # :enddoc:
3 class Rainbows::StreamResponseEpoll::Client
4   OUT = SleepyPenguin::Epoll::OUT
5   N = Raindrops.new(1)
6   EP = SleepyPenguin::Epoll.new
7   timeout = Rainbows.server.timeout
8   thr = Thread.new do
9     begin
10       EP.wait(nil, timeout) { |_,client| client.epoll_run }
11     rescue Errno::EINTR
12     rescue => e
13       Rainbows::Error.listen_loop(e)
14     end while Rainbows.alive || N[0] > 0
15   end
16   Rainbows.at_quit { thr.join(timeout) }
18   attr_reader :to_io
20   def initialize(io, unwritten)
21     @closed = false
22     @to_io = io.dup
23     @wr_queue = [ unwritten.dup ]
24     EP.set(self, OUT)
25   end
27   def write(str)
28     @wr_queue << str.dup
29   end
31   def close
32     @closed = true
33   end
35   def epoll_run
36     return if @to_io.closed?
37     buf = @wr_queue.shift or return on_write_complete
38     case rv = @to_io.kgio_trywrite(buf)
39     when nil
40       buf = @wr_queue.shift or return on_write_complete
41     when String # retry, socket buffer may grow
42       buf = rv
43     when :wait_writable
44       return @wr_queue.unshift(buf)
45     end while true
46     rescue => err
47       @to_io.close
48       N.decr(0, 1)
49   end
51   def on_write_complete
52     if @closed
53       @to_io.close
54       N.decr(0, 1)
55     end
56   end
57 end