restore Rainbows::HttpResponse.write for Cramp
[rainbows.git] / lib / rainbows / fiber / rev.rb
bloba1ffe33829c516181179c8d2962576cbab501926
1 # -*- encoding: binary -*-
2 require 'rev'
3 require 'rainbows/fiber'
4 require 'rainbows/fiber/io'
6 module Rainbows::Fiber
7   module Rev
8     G = Rainbows::G
10     # keep-alive timeout class
11     class Kato < ::Rev::TimerWatcher
12       def initialize
13         @watch = []
14         super(1, true)
15       end
17       def <<(fiber)
18         @watch << fiber
19         enable unless enabled?
20       end
22       def on_timer
23         @watch.uniq!
24         while f = @watch.shift
25           f.resume if f.alive?
26         end
27         disable
28       end
29     end
31     class Heartbeat < ::Rev::TimerWatcher
32       def on_timer
33         exit if (! G.tick && G.cur <= 0)
34       end
35     end
37     class Sleeper < ::Rev::TimerWatcher
39       def initialize(seconds)
40         @f = ::Fiber.current
41         super(seconds, false)
42         attach(::Rev::Loop.default)
43         ::Fiber.yield
44       end
46       def on_timer
47         @f.resume
48       end
49     end
51     class Server < ::Rev::IOWatcher
52       include Unicorn
53       include Rainbows
54       include Rainbows::Const
55       include Rainbows::Response
56       FIO = Rainbows::Fiber::IO
58       def to_io
59         @io
60       end
62       def initialize(io)
63         @io = io
64         super(self, :r)
65       end
67       def close
68         detach if attached?
69         @io.close
70       end
72       def on_readable
73         return if G.cur >= MAX
74         c = Rainbows.accept(@io) and ::Fiber.new { process(c) }.resume
75       end
77       def process(io)
78         G.cur += 1
79         client = FIO.new(io, ::Fiber.current)
80         buf = client.read_timeout or return
81         hp = HttpParser.new
82         env = {}
83         alive = true
84         remote_addr = Rainbows.addr(io)
86         begin # loop
87           buf << (client.read_timeout or return) until hp.headers(env, buf)
89           env[CLIENT_IO] = client
90           env[RACK_INPUT] = 0 == hp.content_length ?
91                     HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf)
92           env[REMOTE_ADDR] = remote_addr
93           response = APP.call(env.update(RACK_DEFAULTS))
95           if 100 == response[0].to_i
96             client.write(EXPECT_100_RESPONSE)
97             env.delete(HTTP_EXPECT)
98             response = APP.call(env)
99           end
101           alive = hp.keepalive? && G.alive
102           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
103           write_response(client, response, out)
104         end while alive and hp.reset.nil? and env.clear
105       rescue => e
106         Error.write(io, e)
107       ensure
108         G.cur -= 1
109         client.close
110       end
111     end
112   end
114   class IO # see rainbows/fiber/io for original definition
116     class Watcher < ::Rev::IOWatcher
117       def initialize(fio, flag)
118         @fiber = fio.f
119         super(fio, flag)
120         attach(::Rev::Loop.default)
121       end
123       def on_readable
124         @fiber.resume
125       end
127       alias on_writable on_readable
128     end
130     undef_method :wait_readable
131     undef_method :wait_writable
132     undef_method :close
134     def initialize(*args)
135       super
136       @r = @w = false
137     end
139     def close
140       @w.detach if @w
141       @r.detach if @r
142       @r = @w = false
143       to_io.close unless to_io.closed?
144     end
146     def wait_writable
147       @w ||= Watcher.new(self, :w)
148       @w.enable unless @w.enabled?
149       ::Fiber.yield
150       @w.disable
151     end
153     def wait_readable
154       @r ||= Watcher.new(self, :r)
155       @r.enable unless @r.enabled?
156       KATO << f
157       ::Fiber.yield
158       @r.disable
159     end
160   end