6c07b9204dd3fe9433764e9fe5c6e8b96e5df4c3
[rainbows.git] / lib / rainbows / xepoll_thread_spawn / client.rb
blob6c07b9204dd3fe9433764e9fe5c6e8b96e5df4c3
1 # -*- encoding: binary -*-
2 # :stopdoc:
3 module Rainbows::XEpollThreadSpawn::Client
4   Rainbows.config!(self, :keepalive_timeout, :client_header_buffer_size)
5   N = Raindrops.new(1)
6   ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
7   extend Rainbows::WorkerYield
9   def self.included(klass) # included in Rainbows::Client
10     max = Rainbows.server.worker_connections
11     ACCEPTORS.map! do |sock|
12       Thread.new do
13         buf = ""
14         begin
15           if io = sock.kgio_accept(klass)
16             N.incr(0, 1)
17             io.epoll_once(buf)
18           end
19           worker_yield while N[0] >= max
20         rescue => e
21           Rainbows::Error.listen_loop(e)
22         end while Rainbows.alive
23       end
24     end
25   end
27   ep = SleepyPenguin::Epoll
28   EP = ep.new
29   IN = ep::IN | ep::ET | ep::ONESHOT
30   KATO = {}
31   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
32   LOCK = Mutex.new
33   Rainbows.at_quit do
34     clients = nil
35     LOCK.synchronize { clients = KATO.keys; KATO.clear }
36     clients.each { |io| io.closed? or io.shutdown }
37   end
38   @@last_expire = Time.now
40   def kato_set
41     LOCK.synchronize { KATO[self] = @@last_expire }
42     EP.set(self, IN)
43   end
45   def kato_delete
46     LOCK.synchronize { KATO.delete self }
47   end
49   def self.loop
50     buf = ""
51     begin
52       EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
53       expire
54     rescue Errno::EINTR
55     rescue => e
56       Rainbows::Error.listen_loop(e)
57     end while Rainbows.tick || N[0] > 0
58     Rainbows::JoinThreads.acceptors(ACCEPTORS)
59   end
61   def self.expire
62     return if ((now = Time.now) - @@last_expire) < 1.0
63     if (ot = KEEPALIVE_TIMEOUT) >= 0
64       ot = now - ot
65       defer = []
66       LOCK.synchronize do
67         KATO.delete_if { |client, time| time < ot and defer << client }
68       end
69       defer.each { |io| io.closed? or io.close }
70     end
71     @@last_expire = now
72   end
74   def epoll_once(buf)
75     @hp = Rainbows::HttpParser.new
76     epoll_run(buf)
77   end
79   def close
80     super
81     kato_delete
82     N.decr(0, 1)
83     nil
84   end
86   def handle_error(e)
87     super
88     ensure
89       closed? or close
90   end
92   def epoll_run(buf)
93     case kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, buf)
94     when :wait_readable
95       return kato_set
96     when String
97       kato_delete
98       env = @hp.add_parse(buf) and return spawn(env, @hp)
99     else
100       return close
101     end while true
102     rescue => e
103       handle_error(e)
104   end
106   def spawn(env, hp)
107     Thread.new { process_pipeline(env, hp) }
108   end
110   def pipeline_ready(hp)
111     hp.parse and return true
112     case buf = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE)
113     when :wait_readable
114       kato_set
115       return false
116     when String
117       hp.add_parse(buf) and return true
118       # continue loop
119     else
120       return close
121     end while true
122   end