1 # -*- encoding: binary -*-
3 require "sleepy_penguin"
6 module Rainbows::XEpollThreadSpawn::Client
# Connection limit taken from the server configuration; the accept loop
# below compares the N[0] counter against it.
8 max = Rainbows.server.worker_connections
# Build one ACCEPTORS entry per listening socket.
# NOTE(review): the entries are woken with #run elsewhere in this file, so
# the block presumably returns a Thread -- confirm.
9 ACCEPTORS = Rainbows::HttpServer::LISTENERS.map do |sock|
# Try to accept a connection as a Rainbows::Client; the branch runs only
# when kgio_accept returned a socket rather than nil/false.
13 if io = sock.kgio_accept(Rainbows::Client)
# Stay asleep for as long as N[0] has reached max; the condition is
# re-checked after every wake-up.
17 sleep while N[0] >= max
# Pass the exception e to Rainbows::Error.listen_loop.
# NOTE(review): e is expected to be bound by an enclosing rescue -- confirm.
19 Rainbows::Error.listen_loop(e)
# Repeat the loop body while Rainbows.alive is truthy.
20 end while Rainbows.alive
# Short local alias so the flag expression below stays readable.
24 ep = SleepyPenguin::Epoll
# Event mask OR-ing together the IN, ET and ONESHOT epoll flags
# (readable / edge-triggered / one-shot in epoll terminology).
26 IN = ep::IN | ep::ET | ep::ONESHOT
# Switch KATO to identity-based key comparison when the object supports it;
# the respond_to? guard keeps this a no-op where the method is missing.
29 KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
# Class-wide timestamp; the expiry check compares Time.now against it and
# it is also the value stored in KATO for each client.
31 @@last_expire = Time.now
# While holding LOCK, register this client in KATO stamped with the current
# @@last_expire value.
34 LOCK.synchronize { KATO[self] = @@last_expire }
# While holding LOCK, remove this client from KATO.
39 LOCK.synchronize { KATO.delete self }
# Call #run on every acceptor (Thread#run wakes a sleeping thread).
43 ACCEPTORS.each { |thr| thr.run }
# Wait on EP and invoke epoll_run on each object it yields; the flags
# argument fl is ignored.
# NOTE(review): per sleepy_penguin's Epoll#wait(maxevents, timeout), nil is
# the maxevents argument and 1000 a timeout in milliseconds -- confirm.
45 EP.wait(nil, 1000) { |fl, obj| obj.epoll_run }
# Pass the exception e to Rainbows::Error.listen_loop.
# NOTE(review): e is expected to be bound by an enclosing rescue -- confirm.
49 Rainbows::Error.listen_loop(e)
# Keep iterating while Rainbows.tick is truthy or the N[0] counter is still
# above zero.
50 end while Rainbows.tick || N[0] > 0
# Once the loop has ended, hand the acceptors to Rainbows::JoinThreads.
51 Rainbows::JoinThreads.acceptors(ACCEPTORS)
# Rate limit: bail out when less than one second has elapsed since
# @@last_expire. The current time is captured in now as a side effect.
55 return if ((now = Time.now) - @@last_expire) < 1.0
# Store the configured keepalive timeout in ot and continue only when it is
# non-negative.
56 if (ot = Rainbows.keepalive_timeout) >= 0
# Remove entries whose timestamp is older than ot and whose timeout!(defer)
# returns truthy; `and` short-circuits, so timeout! is only called for
# entries that are already older than ot.
# NOTE(review): time is compared against ot, so ot is presumably turned into
# a cutoff timestamp before this line -- confirm.
60 KATO.delete_if { |client, time| time < ot and client.timeout!(defer) }
# Close every object collected in defer that is not closed already.
62 defer.each { |io| io.closed? or io.close }
# Give this object its own HTTP parser instance.
68 @hp = Rainbows::HttpParser.new
# Decrement counter slot 0 by one; when the value returned by decr equals
# THRESH, call #run on every acceptor.
80 N.decr(0, 1) == THRESH and ACCEPTORS.each { |t| t.run }
# Dispatch on the result of a kgio_tryread of up to 0x4000 (16384) bytes
# into @buf2.
90 case kgio_tryread(0x4000, @buf2)
# If the parser produces an env, pass it with the parser to spawn and return
# spawn's result; otherwise fall through.
96 env = @hp.parse and return spawn(env, @hp)
# Run process_pipeline(env, hp) on a newly created thread.
105 Thread.new { process_pipeline(env, hp) }
# Returns the env produced by hp.parse as soon as one is available.
# hp:: the parser object; only #parse is called on it in the visible code.
108 def pipeline_ready(hp)
# Return right away if hp.parse already yields an env.
109 env = hp.parse and return env
# Otherwise dispatch on a kgio_tryread of up to 0x4000 (16384) bytes into
# @buf2.
110 case kgio_tryread(0x4000, @buf2)
# Parse again and return the env if one is now available.
116 env = hp.parse and return env