prefer Array#[] lookup to Array#first/Array#last
[rainbows.git] / lib / rainbows / fiber / rev.rb
blobb8ec56bd957baa8881266b1eaf5523f0cbdb95cc
1 # -*- encoding: binary -*-
2 require 'rev'
3 require 'rainbows/fiber'
4 require 'rainbows/fiber/io'
6 module Rainbows::Fiber
7   module Rev
8     G = Rainbows::G
10     # keep-alive timeout class
11     class Kato < ::Rev::TimerWatcher
12       def initialize
13         @watch = []
14         super(1, true)
15       end
17       def <<(fiber)
18         @watch << fiber
19         enable unless enabled?
20       end
22       def on_timer
23         @watch.uniq!
24         while f = @watch.shift
25           f.resume if f.alive?
26         end
27         disable
28       end
29     end
31     class Heartbeat < ::Rev::TimerWatcher
32       def on_timer
33         exit if (! G.tick && G.cur <= 0)
34       end
35     end
37     class Sleeper < ::Rev::TimerWatcher
39       def initialize(seconds)
40         @f = ::Fiber.current
41         super(seconds, false)
42         attach(::Rev::Loop.default)
43         ::Fiber.yield
44       end
46       def on_timer
47         @f.resume
48       end
49     end
51     class Server < ::Rev::IOWatcher
52       include Unicorn
53       include Rainbows
54       include Rainbows::Const
55       FIO = Rainbows::Fiber::IO
57       def to_io
58         @io
59       end
61       def initialize(io)
62         @io = io
63         super(self, :r)
64       end
66       def close
67         detach if attached?
68         @io.close
69       end
71       def on_readable
72         return if G.cur >= MAX
73         c = Rainbows.accept(@io) and ::Fiber.new { process(c) }.resume
74       end
76       def process(io)
77         G.cur += 1
78         client = FIO.new(io, ::Fiber.current)
79         buf = client.read_timeout or return
80         hp = HttpParser.new
81         env = {}
82         alive = true
83         remote_addr = Rainbows.addr(io)
85         begin # loop
86           buf << (client.read_timeout or return) until hp.headers(env, buf)
88           env[CLIENT_IO] = client
89           env[RACK_INPUT] = 0 == hp.content_length ?
90                     HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf)
91           env[REMOTE_ADDR] = remote_addr
92           response = APP.call(env.update(RACK_DEFAULTS))
94           if 100 == response[0].to_i
95             client.write(EXPECT_100_RESPONSE)
96             env.delete(HTTP_EXPECT)
97             response = APP.call(env)
98           end
100           alive = hp.keepalive? && G.alive
101           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
102           HttpResponse.write(client, response, out)
103         end while alive and hp.reset.nil? and env.clear
104       rescue => e
105         Error.write(io, e)
106       ensure
107         G.cur -= 1
108         client.close
109       end
110     end
111   end
113   class IO # see rainbows/fiber/io for original definition
115     class Watcher < ::Rev::IOWatcher
116       def initialize(fio, flag)
117         @fiber = fio.f
118         super(fio, flag)
119         attach(::Rev::Loop.default)
120       end
122       def on_readable
123         @fiber.resume
124       end
126       alias on_writable on_readable
127     end
129     undef_method :wait_readable
130     undef_method :wait_writable
131     undef_method :close
133     def initialize(*args)
134       super
135       @r = @w = false
136     end
138     def close
139       @w.detach if @w
140       @r.detach if @r
141       @r = @w = false
142       to_io.close unless to_io.closed?
143     end
145     def wait_writable
146       @w ||= Watcher.new(self, :w)
147       @w.enable unless @w.enabled?
148       ::Fiber.yield
149       @w.disable
150     end
152     def wait_readable
153       @r ||= Watcher.new(self, :r)
154       @r.enable unless @r.enabled?
155       KATO << f
156       ::Fiber.yield
157       @r.disable
158     end
159   end