xepoll_thread_*/client: EPOLLONESHOT implies EPOLLET
[rainbows.git] / lib / rainbows / xepoll_thread_pool / client.rb
blob001e69d0965c731cc59c921cbe248fdd0b3ad42b
1 # -*- encoding: binary -*-
2 # :enddoc:
3 # FIXME: lots of duplication from xepolll_thread_spawn/client
5 module Rainbows::XEpollThreadPool::Client
6   Rainbows.config!(self, :keepalive_timeout, :client_header_buffer_size)
7   N = Raindrops.new(1)
8   ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
9   extend Rainbows::WorkerYield
11   def self.included(klass) # included in Rainbows::Client
12     max = Rainbows.server.worker_connections
13     ACCEPTORS.map! do |sock|
14       Thread.new do
15         buf = ""
16         begin
17           if io = sock.kgio_accept(klass)
18             N.incr(0, 1)
19             io.epoll_once(buf)
20           end
21           worker_yield while N[0] >= max
22         rescue => e
23           Rainbows::Error.listen_loop(e)
24         end while Rainbows.alive
25       end
26     end
27   end
29   def self.app_run(queue)
30     while client = queue.pop
31       client.run
32     end
33   end
35   QUEUE = Queue.new
36   Rainbows::O[:pool_size].times { Thread.new { app_run(QUEUE) } }
38   ep = SleepyPenguin::Epoll
39   EP = ep.new
40   IN = ep::IN | ep::ONESHOT
41   KATO = {}
42   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
43   LOCK = Mutex.new
44   Rainbows.at_quit do
45     clients = nil
46     LOCK.synchronize { clients = KATO.keys; KATO.clear }
47     clients.each { |io| io.closed? or io.close }
48   end
49   @@last_expire = Time.now
51   def kato_set
52     LOCK.synchronize { KATO[self] = @@last_expire }
53     EP.set(self, IN)
54   end
56   def kato_delete
57     LOCK.synchronize { KATO.delete self }
58   end
60   def self.loop
61     buf = ""
62     begin
63       EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
64       expire
65     rescue Errno::EINTR
66     rescue => e
67       Rainbows::Error.listen_loop(e)
68     end while Rainbows.tick || N[0] > 0
69     Rainbows::JoinThreads.acceptors(ACCEPTORS)
70   end
72   def self.expire
73     return if ((now = Time.now) - @@last_expire) < 1.0
74     if (ot = KEEPALIVE_TIMEOUT) >= 0
75       ot = now - ot
76       defer = []
77       LOCK.synchronize do
78         KATO.delete_if { |client, time| time < ot and defer << client }
79       end
80       defer.each { |io| io.closed? or io.shutdown }
81     end
82     @@last_expire = now
83   end
85   def epoll_once(buf)
86     @hp = Rainbows::HttpParser.new
87     epoll_run(buf)
88   end
90   def close
91     super
92     kato_delete
93     N.decr(0, 1)
94     nil
95   end
97   def handle_error(e)
98     super
99     ensure
100       closed? or close
101   end
103   def queue!
104     QUEUE << self
105     false
106   end
108   def epoll_run(buf)
109     case kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, buf)
110     when :wait_readable
111       return kato_set
112     when String
113       kato_delete
114       @hp.add_parse(buf) and return queue!
115     else
116       return close
117     end while true
118     rescue => e
119       handle_error(e)
120   end
122   def run
123     process_pipeline(@hp.env, @hp)
124   end
126   def pipeline_ready(hp)
127     # be fair to other clients, let others run first
128     hp.parse and return queue!
129     epoll_run("")
130     false
131   end