epoll_wait: flags argument is unused
[rainbows.git] / lib / rainbows / xepoll_thread_spawn / client.rb
blob118836bdc120506fa1d1798835d338ccc1424b63
1 # -*- encoding: binary -*-
2 module Rainbows::XEpollThreadSpawn::Client
3   HBUFSIZ = Rainbows.client_header_buffer_size
4   N = Raindrops.new(1)
5   ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
6   extend Rainbows::WorkerYield
8   def self.included(klass) # included in Rainbows::Client
9     max = Rainbows.server.worker_connections
10     ACCEPTORS.map! do |sock|
11       Thread.new do
12         buf = ""
13         begin
14           if io = sock.kgio_accept(klass)
15             N.incr(0, 1)
16             io.epoll_once(buf)
17           end
18           worker_yield while N[0] >= max
19         rescue => e
20           Rainbows::Error.listen_loop(e)
21         end while Rainbows.alive
22       end
23     end
24   end
26   ep = SleepyPenguin::Epoll
27   EP = ep.new
28   IN = ep::IN | ep::ET | ep::ONESHOT
29   KATO = {}
30   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
31   LOCK = Mutex.new
32   @@last_expire = Time.now
34   def kato_set
35     LOCK.synchronize { KATO[self] = @@last_expire }
36     EP.set(self, IN)
37   end
39   def kato_delete
40     LOCK.synchronize { KATO.delete self }
41   end
43   def self.loop
44     buf = ""
45     begin
46       EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
47       expire
48     rescue Errno::EINTR
49     rescue => e
50       Rainbows::Error.listen_loop(e)
51     end while Rainbows.tick || N[0] > 0
52     Rainbows::JoinThreads.acceptors(ACCEPTORS)
53   end
55   def self.expire
56     return if ((now = Time.now) - @@last_expire) < 1.0
57     if (ot = Rainbows.keepalive_timeout) >= 0
58       ot = now - ot
59       defer = []
60       LOCK.synchronize do
61         KATO.delete_if { |client, time| time < ot and defer << client }
62       end
63       defer.each { |io| io.closed? or io.close }
64     end
65     @@last_expire = now
66   end
68   def epoll_once(buf)
69     @hp = Rainbows::HttpParser.new
70     epoll_run(buf)
71   end
73   def close
74     super
75     kato_delete
76     N.decr(0, 1)
77     nil
78   end
80   def handle_error(e)
81     super
82     ensure
83       closed? or close
84   end
86   def epoll_run(buf)
87     case kgio_tryread(HBUFSIZ, buf)
88     when :wait_readable
89       return kato_set
90     when String
91       kato_delete
92       @hp.buf << buf
93       env = @hp.parse and return spawn(env, @hp)
94     else
95       return close
96     end while true
97     rescue => e
98       handle_error(e)
99   end
101   def spawn(env, hp)
102     Thread.new { process_pipeline(env, hp) }
103   end
105   def pipeline_ready(hp)
106     hp.parse and return true
107     case buf = kgio_tryread(HBUFSIZ)
108     when :wait_readable
109       kato_set
110       return false
111     when String
112       hp.buf << buf
113       hp.parse and return true
114       # continue loop
115     else
116       return close
117     end while true
118   end