xepoll_thread_spawn: fix race condition with acceptors
[rainbows.git] / lib / rainbows / xepoll_thread_spawn / client.rb
blob9735f644f86ba531be736332fa8f43c08ef4070b
1 # -*- encoding: binary -*-
2 require "thread"
3 require "sleepy_penguin"
4 require "raindrops"
6 module Rainbows::XEpollThreadSpawn::Client
7   N = Raindrops.new(1)
8   max = Rainbows.server.worker_connections
9   ACCEPTORS = Rainbows::HttpServer::LISTENERS.map do |sock|
10     Thread.new do
11       sleep
12       begin
13         if io = sock.kgio_accept(Rainbows::Client)
14           N.incr(0, 1)
15           io.epoll_once
16         end
17         sleep while N[0] >= max
18       rescue => e
19         Rainbows::Error.listen_loop(e)
20       end while Rainbows.alive
21     end
22   end
24   ep = SleepyPenguin::Epoll
25   EP = ep.new
26   IN = ep::IN | ep::ET | ep::ONESHOT
27   THRESH = max - 1
28   KATO = {}
29   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
30   LOCK = Mutex.new
31   @@last_expire = Time.now
33   def kato_set
34     LOCK.synchronize { KATO[self] = @@last_expire }
35     EP.set(self, IN)
36   end
38   def kato_delete
39     LOCK.synchronize { KATO.delete self }
40   end
42   def self.loop
43     ACCEPTORS.each { |thr| thr.run }
44     begin
45       EP.wait(nil, 1000) { |fl, obj| obj.epoll_run }
46       expire
47     rescue Errno::EINTR
48     rescue => e
49       Rainbows::Error.listen_loop(e)
50     end while Rainbows.tick || N[0] > 0
51     Rainbows::JoinThreads.acceptors(ACCEPTORS)
52   end
54   def self.expire
55     return if ((now = Time.now) - @@last_expire) < 1.0
56     if (ot = Rainbows.keepalive_timeout) >= 0
57       ot = now - ot
58       defer = []
59       LOCK.synchronize do
60         KATO.delete_if { |client, time| time < ot and client.timeout!(defer) }
61       end
62       defer.each { |io| io.closed? or io.close }
63     end
64     @@last_expire = now
65   end
67   def epoll_once
68     @hp = Rainbows::HttpParser.new
69     @buf2 = ""
70     epoll_run
71   end
73   def timeout!(defer)
74     defer << self
75   end
77   def close
78     super
79     kato_delete
80     N.decr(0, 1) == THRESH and ACCEPTORS.each { |t| t.run }
81   end
83   def handle_error(e)
84     super
85     ensure
86       closed? or close
87   end
89   def epoll_run
90     case kgio_tryread(0x4000, @buf2)
91     when :wait_readable
92       return kato_set
93     when String
94       kato_delete
95       @hp.buf << @buf2
96       env = @hp.parse and return spawn(env, @hp)
97     else
98       return close
99     end while true
100     rescue => e
101       handle_error(e)
102   end
104   def spawn(env, hp)
105     Thread.new { process_pipeline(env, hp) }
106   end
108   def pipeline_ready(hp)
109     env = hp.parse and return env
110     case kgio_tryread(0x4000, @buf2)
111     when :wait_readable
112       kato_set
113       return false
114     when String
115       hp.buf << @buf2
116       env = hp.parse and return env
117       # continue loop
118     else
119       return close
120     end while true
121   end