remove unnecessary variable assignments
[rainbows.git] / lib / rainbows / xepoll_thread_spawn / client.rb
blob8cb3d27c4f48bf3d16aebf4c896d7c08dfd4dc6a
1 # -*- encoding: binary -*-
2 module Rainbows::XEpollThreadSpawn::Client
3   HBUFSIZ = Rainbows.client_header_buffer_size
4   N = Raindrops.new(1)
5   max = Rainbows.server.worker_connections
6   ACCEPTORS = Rainbows::HttpServer::LISTENERS.map do |sock|
7     Thread.new do
8       sleep
9       buf = ""
10       begin
11         if io = sock.kgio_accept(Rainbows::Client)
12           N.incr(0, 1)
13           io.epoll_once(buf)
14         end
15         sleep while N[0] >= max
16       rescue => e
17         Rainbows::Error.listen_loop(e)
18       end while Rainbows.alive
19     end
20   end
22   ep = SleepyPenguin::Epoll
23   EP = ep.new
24   IN = ep::IN | ep::ET | ep::ONESHOT
25   THRESH = max - 1
26   KATO = {}
27   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
28   LOCK = Mutex.new
29   @@last_expire = Time.now
31   def kato_set
32     LOCK.synchronize { KATO[self] = @@last_expire }
33     EP.set(self, IN)
34   end
36   def kato_delete
37     LOCK.synchronize { KATO.delete self }
38   end
40   def self.loop
41     ACCEPTORS.each { |thr| thr.run }
42     buf = ""
43     begin
44       EP.wait(nil, 1000) { |fl, obj| obj.epoll_run(buf) }
45       expire
46     rescue Errno::EINTR
47     rescue => e
48       Rainbows::Error.listen_loop(e)
49     end while Rainbows.tick || N[0] > 0
50     Rainbows::JoinThreads.acceptors(ACCEPTORS)
51   end
53   def self.expire
54     return if ((now = Time.now) - @@last_expire) < 1.0
55     if (ot = Rainbows.keepalive_timeout) >= 0
56       ot = now - ot
57       defer = []
58       LOCK.synchronize do
59         KATO.delete_if { |client, time| time < ot and client.timeout!(defer) }
60       end
61       defer.each { |io| io.closed? or io.close }
62     end
63     @@last_expire = now
64   end
66   def epoll_once(buf)
67     @hp = Rainbows::HttpParser.new
68     epoll_run(buf)
69   end
71   def timeout!(defer)
72     defer << self
73   end
75   def close
76     super
77     kato_delete
78     N.decr(0, 1) == THRESH and ACCEPTORS.each { |t| t.run }
79   end
81   def handle_error(e)
82     super
83     ensure
84       closed? or close
85   end
87   def epoll_run(buf)
88     case kgio_tryread(HBUFSIZ, buf)
89     when :wait_readable
90       return kato_set
91     when String
92       kato_delete
93       @hp.buf << buf
94       env = @hp.parse and return spawn(env, @hp)
95     else
96       return close
97     end while true
98     rescue => e
99       handle_error(e)
100   end
102   def spawn(env, hp)
103     Thread.new { process_pipeline(env, hp) }
104   end
106   def pipeline_ready(hp)
107     hp.parse and return true
108     case buf = kgio_tryread(HBUFSIZ)
109     when :wait_readable
110       kato_set
111       return false
112     when String
113       hp.buf << buf
114       hp.parse and return true
115       # continue loop
116     else
117       return close
118     end while true
119   end