1 # -*- encoding: binary -*-
2 module Rainbows::XEpollThreadSpawn::Client
3 HBUFSIZ = Rainbows.client_header_buffer_size
5 ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
6 extend Rainbows::WorkerYield
8 def self.included(klass) # included in Rainbows::Client
9 max = Rainbows.server.worker_connections
10 ACCEPTORS.map! do |sock|
14 if io = sock.kgio_accept(klass)
18 worker_yield while N[0] >= max
20 Rainbows::Error.listen_loop(e)
21 end while Rainbows.alive
26 ep = SleepyPenguin::Epoll
28 IN = ep::IN | ep::ET | ep::ONESHOT
30 KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
32 @@last_expire = Time.now
35 LOCK.synchronize { KATO[self] = @@last_expire }
40 LOCK.synchronize { KATO.delete self }
46 EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
50 Rainbows::Error.listen_loop(e)
51 end while Rainbows.tick || N[0] > 0
52 Rainbows::JoinThreads.acceptors(ACCEPTORS)
56 return if ((now = Time.now) - @@last_expire) < 1.0
57 if (ot = Rainbows.keepalive_timeout) >= 0
61 KATO.delete_if { |client, time| time < ot and defer << client }
63 defer.each { |io| io.closed? or io.close }
69 @hp = Rainbows::HttpParser.new
87 case kgio_tryread(HBUFSIZ, buf)
93 env = @hp.parse and return spawn(env, @hp)
102 Thread.new { process_pipeline(env, hp) }
105 def pipeline_ready(hp)
106 hp.parse and return true
107 case buf = kgio_tryread(HBUFSIZ)
113 hp.parse and return true