Rainbows! 5.2.1
[rainbows.git] / lib / rainbows / xepoll_thread_spawn / client.rb
blob8eebbb07d0b8ba9ebb04ac78a7046c3016ca4539
1 # -*- encoding: binary -*-
2 # :stopdoc:
3 module Rainbows::XEpollThreadSpawn::Client
4   Rainbows.config!(self, :keepalive_timeout, :client_header_buffer_size)
5   N = Raindrops.new(1)
6   ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
7   extend Rainbows::WorkerYield
9   def self.included(klass) # included in Rainbows::Client
10     max = Rainbows.server.worker_connections
11     ACCEPTORS.map! do |sock|
12       Thread.new do
13         buf = ""
14         begin
15           if io = sock.kgio_accept(klass)
16             N.incr(0, 1)
17             io.epoll_once(buf)
18           end
19           worker_yield while N[0] >= max
20         rescue => e
21           Rainbows::Error.listen_loop(e)
22         end while Rainbows.alive
23       end
24     end
25   end
27   ep = SleepyPenguin::Epoll
28   EP = ep.new
29   IN = ep::IN | ep::ONESHOT
30   KATO = {}.compare_by_identity
31   LOCK = Mutex.new
32   Rainbows.at_quit do
33     clients = nil
34     LOCK.synchronize { clients = KATO.keys; KATO.clear }
35     clients.each { |io| io.closed? or io.shutdown }
36   end
37   @@last_expire = Rainbows.now
39   def kato_set
40     LOCK.synchronize { KATO[self] = @@last_expire }
41     EP.set(self, IN)
42   end
44   def kato_delete
45     LOCK.synchronize { KATO.delete self }
46   end
48   def self.loop
49     buf = ""
50     begin
51       EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
52       expire
53     rescue Errno::EINTR
54     rescue => e
55       Rainbows::Error.listen_loop(e)
56     end while Rainbows.tick || N[0] > 0
57     Rainbows::JoinThreads.acceptors(ACCEPTORS)
58   end
60   def self.expire
61     return if ((now = Rainbows.now) - @@last_expire) < 1.0
62     if (ot = KEEPALIVE_TIMEOUT) >= 0
63       ot = now - ot
64       defer = []
65       LOCK.synchronize do
66         KATO.delete_if { |client, time| time < ot and defer << client }
67       end
68       defer.each { |io| io.closed? or io.close }
69     end
70     @@last_expire = now
71   end
73   def epoll_once(buf)
74     @hp = Rainbows::HttpParser.new
75     epoll_run(buf)
76   end
78   def close
79     super
80     kato_delete
81     N.decr(0, 1)
82     nil
83   end
85   def handle_error(e)
86     super
87   ensure
88     closed? or close
89   end
91   def epoll_run(buf)
92     case kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, buf)
93     when :wait_readable
94       return kato_set
95     when String
96       kato_delete
97       env = @hp.add_parse(buf) and return spawn(env, @hp)
98     else
99       return close
100     end while true
101   rescue => e
102     handle_error(e)
103   end
105   def spawn(env, hp)
106     Thread.new { process_pipeline(env, hp) }
107   end
109   def pipeline_ready(hp)
110     hp.parse and return true
111     case buf = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE)
112     when :wait_readable
113       kato_set
114       return false
115     when String
116       hp.add_parse(buf) and return true
117       # continue loop
118     else
119       return close
120     end while true
121   end