1 # -*- encoding: binary -*-
3 module Rainbows::XEpollThreadSpawn::Client
4 Rainbows.config!(self, :keepalive_timeout, :client_header_buffer_size)
6 ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
7 extend Rainbows::WorkerYield
9 def self.included(klass) # included in Rainbows::Client
10 max = Rainbows.server.worker_connections
11 ACCEPTORS.map! do |sock|
15 if io = sock.kgio_accept(klass)
19 worker_yield while N[0] >= max
21 Rainbows::Error.listen_loop(e)
22 end while Rainbows.alive
27 ep = SleepyPenguin::Epoll
29 IN = ep::IN | ep::ET | ep::ONESHOT
31 KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
35 LOCK.synchronize { clients = KATO.keys; KATO.clear }
36 clients.each { |io| io.closed? or io.shutdown }
38 @@last_expire = Time.now
41 LOCK.synchronize { KATO[self] = @@last_expire }
46 LOCK.synchronize { KATO.delete self }
52 EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
56 Rainbows::Error.listen_loop(e)
57 end while Rainbows.tick || N[0] > 0
58 Rainbows::JoinThreads.acceptors(ACCEPTORS)
62 return if ((now = Time.now) - @@last_expire) < 1.0
63 if (ot = KEEPALIVE_TIMEOUT) >= 0
67 KATO.delete_if { |client, time| time < ot and defer << client }
69 defer.each { |io| io.closed? or io.close }
75 @hp = Rainbows::HttpParser.new
93 case kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, buf)
98 env = @hp.add_parse(buf) and return spawn(env, @hp)
107 Thread.new { process_pipeline(env, hp) }
110 def pipeline_ready(hp)
111 hp.parse and return true
112 case buf = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE)
117 hp.add_parse(buf) and return true