add client_header_buffer_size tuning parameter
[rainbows.git] / lib / rainbows / xepoll_thread_spawn / client.rb
blob4111f278a8420b8f3e01be8ecd671bd32f06fe10
1 # -*- encoding: binary -*-
2 require "thread"
3 require "sleepy_penguin"
4 require "raindrops"
6 module Rainbows::XEpollThreadSpawn::Client
7   HBUFSIZ = Rainbows.client_header_buffer_size
8   N = Raindrops.new(1)
9   max = Rainbows.server.worker_connections
10   ACCEPTORS = Rainbows::HttpServer::LISTENERS.map do |sock|
11     Thread.new do
12       sleep
13       buf = ""
14       begin
15         if io = sock.kgio_accept(Rainbows::Client)
16           N.incr(0, 1)
17           io.epoll_once(buf)
18         end
19         sleep while N[0] >= max
20       rescue => e
21         Rainbows::Error.listen_loop(e)
22       end while Rainbows.alive
23     end
24   end
26   ep = SleepyPenguin::Epoll
27   EP = ep.new
28   IN = ep::IN | ep::ET | ep::ONESHOT
29   THRESH = max - 1
30   KATO = {}
31   KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
32   LOCK = Mutex.new
33   @@last_expire = Time.now
35   def kato_set
36     LOCK.synchronize { KATO[self] = @@last_expire }
37     EP.set(self, IN)
38   end
40   def kato_delete
41     LOCK.synchronize { KATO.delete self }
42   end
44   def self.loop
45     ACCEPTORS.each { |thr| thr.run }
46     buf = ""
47     begin
48       EP.wait(nil, 1000) { |fl, obj| obj.epoll_run(buf) }
49       expire
50     rescue Errno::EINTR
51     rescue => e
52       Rainbows::Error.listen_loop(e)
53     end while Rainbows.tick || N[0] > 0
54     Rainbows::JoinThreads.acceptors(ACCEPTORS)
55   end
57   def self.expire
58     return if ((now = Time.now) - @@last_expire) < 1.0
59     if (ot = Rainbows.keepalive_timeout) >= 0
60       ot = now - ot
61       defer = []
62       LOCK.synchronize do
63         KATO.delete_if { |client, time| time < ot and client.timeout!(defer) }
64       end
65       defer.each { |io| io.closed? or io.close }
66     end
67     @@last_expire = now
68   end
70   def epoll_once(buf)
71     @hp = Rainbows::HttpParser.new
72     epoll_run(buf)
73   end
75   def timeout!(defer)
76     defer << self
77   end
79   def close
80     super
81     kato_delete
82     N.decr(0, 1) == THRESH and ACCEPTORS.each { |t| t.run }
83   end
85   def handle_error(e)
86     super
87     ensure
88       closed? or close
89   end
91   def epoll_run(buf)
92     case kgio_tryread(HBUFSIZ, buf)
93     when :wait_readable
94       return kato_set
95     when String
96       kato_delete
97       @hp.buf << buf
98       env = @hp.parse and return spawn(env, @hp)
99     else
100       return close
101     end while true
102     rescue => e
103       handle_error(e)
104   end
106   def spawn(env, hp)
107     Thread.new { process_pipeline(env, hp) }
108   end
110   def pipeline_ready(hp)
111     env = hp.parse and return env
112     case buf = kgio_tryread(HBUFSIZ)
113     when :wait_readable
114       kato_set
115       return false
116     when String
117       hp.buf << buf
118       env = hp.parse and return env
119       # continue loop
120     else
121       return close
122     end while true
123   end