1 # -*- encoding: binary -*-
3 require 'rainbows/fiber'
4 require 'rainbows/fiber/io'
10 # keep-alive timeout class
11 class Kato < ::Rev::TimerWatcher
19 enable unless enabled?
24 while f = @watch.shift
31 class Heartbeat < ::Rev::TimerWatcher
33 exit if (! G.tick && G.cur <= 0)
37 class Sleeper < ::Rev::TimerWatcher
39 def initialize(seconds)
42 attach(::Rev::Loop.default)
51 class Server < ::Rev::IOWatcher
54 include Rainbows::Const
55 FIO = Rainbows::Fiber::IO
72 return if G.cur >= MAX
73 c = Rainbows.accept(@io) and ::Fiber.new { process(c) }.resume
78 client = FIO.new(io, ::Fiber.current)
79 buf = client.read_timeout or return
83 remote_addr = Rainbows.addr(io)
86 buf << (client.read_timeout or return) until hp.headers(env, buf)
88 env[CLIENT_IO] = client
89 env[RACK_INPUT] = 0 == hp.content_length ?
90 HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf)
91 env[REMOTE_ADDR] = remote_addr
92 response = APP.call(env.update(RACK_DEFAULTS))
94 if 100 == response[0].to_i
95 client.write(EXPECT_100_RESPONSE)
96 env.delete(HTTP_EXPECT)
97 response = APP.call(env)
100 alive = hp.keepalive? && G.alive
101 out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
102 HttpResponse.write(client, response, out)
103 end while alive and hp.reset.nil? and env.clear
113 class IO # see rainbows/fiber/io for original definition
115 class Watcher < ::Rev::IOWatcher
116 def initialize(fio, flag)
119 attach(::Rev::Loop.default)
126 alias on_writable on_readable
129 undef_method :wait_readable
130 undef_method :wait_writable
133 def initialize(*args)
142 to_io.close unless to_io.closed?
146 @w ||= Watcher.new(self, :w)
147 @w.enable unless @w.enabled?
153 @r ||= Watcher.new(self, :r)
154 @r.enable unless @r.enabled?