xepoll_thread_spawn/client: remove rdoc
[rainbows.git] / lib / rainbows / xepoll_thread_spawn / client.rb
blob049d4e1c7d3a6eb6a170c9aa7ffd16df8bc543ef
1 # -*- encoding: binary -*-
2 # :stopdoc:
3 module Rainbows::XEpollThreadSpawn::Client
4   HBUFSIZ = Rainbows.client_header_buffer_size
5   N = Raindrops.new(1)
6   ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
7   extend Rainbows::WorkerYield
9   def self.included(klass) # included in Rainbows::Client
10     max = Rainbows.server.worker_connections
11     ACCEPTORS.map! do |sock|
12       Thread.new do
13         buf = ""
14         begin
15           if io = sock.kgio_accept(klass)
16             N.incr(0, 1)
17             io.epoll_once(buf)
18           end
19           worker_yield while N[0] >= max
20         rescue => e
21           Rainbows::Error.listen_loop(e)
22         end while Rainbows.alive
23       end
24     end
25   end
27   ep = SleepyPenguin::Epoll
28   EP = ep.new
29   IN = ep::IN | ep::ET | ep::ONESHOT
30   KATO = {}
31   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
32   LOCK = Mutex.new
33   @@last_expire = Time.now
35   def kato_set
36     LOCK.synchronize { KATO[self] = @@last_expire }
37     EP.set(self, IN)
38   end
40   def kato_delete
41     LOCK.synchronize { KATO.delete self }
42   end
44   def self.loop
45     buf = ""
46     begin
47       EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
48       expire
49     rescue Errno::EINTR
50     rescue => e
51       Rainbows::Error.listen_loop(e)
52     end while Rainbows.tick || N[0] > 0
53     Rainbows::JoinThreads.acceptors(ACCEPTORS)
54   end
56   def self.expire
57     return if ((now = Time.now) - @@last_expire) < 1.0
58     if (ot = Rainbows.keepalive_timeout) >= 0
59       ot = now - ot
60       defer = []
61       LOCK.synchronize do
62         KATO.delete_if { |client, time| time < ot and defer << client }
63       end
64       defer.each { |io| io.closed? or io.close }
65     end
66     @@last_expire = now
67   end
69   def epoll_once(buf)
70     @hp = Rainbows::HttpParser.new
71     epoll_run(buf)
72   end
74   def close
75     super
76     kato_delete
77     N.decr(0, 1)
78     nil
79   end
81   def handle_error(e)
82     super
83     ensure
84       closed? or close
85   end
87   def epoll_run(buf)
88     case kgio_tryread(HBUFSIZ, buf)
89     when :wait_readable
90       return kato_set
91     when String
92       kato_delete
93       @hp.buf << buf
94       env = @hp.parse and return spawn(env, @hp)
95     else
96       return close
97     end while true
98     rescue => e
99       handle_error(e)
100   end
102   def spawn(env, hp)
103     Thread.new { process_pipeline(env, hp) }
104   end
106   def pipeline_ready(hp)
107     hp.parse and return true
108     case buf = kgio_tryread(HBUFSIZ)
109     when :wait_readable
110       kato_set
111       return false
112     when String
113       hp.buf << buf
114       hp.parse and return true
115       # continue loop
116     else
117       return close
118     end while true
119   end