cleanup: consolidate write_nonblock error handling
[rainbows.git] / lib / rainbows / fiber / base.rb
blob1617c547f7047274971afaf2489fc31079546c81
1 # -*- encoding: binary -*-
2 require 'rainbows/fiber/io'
4 module Rainbows
5   module Fiber
7     # blocked readers (key: Rainbows::Fiber::IO object, value is irrelevant)
8     RD = {}
10     # blocked writers (key: Rainbows::Fiber::IO object, value is irrelevant)
11     WR = {}
13     # sleeping fibers go here (key: Fiber object, value: wakeup time)
14     ZZ = {}
16     # puts the current Fiber into uninterruptible sleep for at least
17     # +seconds+.  Unlike Kernel#sleep, this it is not possible to sleep
18     # indefinitely to be woken up (nobody wants that in a web server,
19     # right?).
20     def self.sleep(seconds)
21       ZZ[::Fiber.current] = Time.now + seconds
22       ::Fiber.yield
23     end
25     # base module used by FiberSpawn and FiberPool
26     module Base
27       include Rainbows::Base
29       # the scheduler method that powers both FiberSpawn and FiberPool
30       # concurrency models.  It times out idle clients and attempts to
31       # schedules ones that were blocked on I/O.  At most it'll sleep
32       # for one second (returned by the schedule_sleepers method) which
33       # will cause it.
34       def schedule(&block)
35         ret = begin
36           G.tick
37           RD.keys.each { |c| c.f.resume } # attempt to time out idle clients
38           t = schedule_sleepers
39           Kernel.select(RD.keys.concat(LISTENERS), WR.keys, nil, t) or return
40         rescue Errno::EINTR
41           retry
42         rescue Errno::EBADF, TypeError
43           LISTENERS.compact!
44           raise
45         end or return
47         # active writers first, then _all_ readers for keepalive timeout
48         ret[1].concat(RD.keys).each { |c| c.f.resume }
50         # accept is an expensive syscall, filter out listeners we don't want
51         (ret.first & LISTENERS).each(&block)
52       end
54       # wakes up any sleepers that need to be woken and
55       # returns an interval to IO.select on
56       def schedule_sleepers
57         max = nil
58         now = Time.now
59         ZZ.delete_if { |fib, time|
60           if now >= time
61             fib.resume
62             now = Time.now
63           else
64             max = time
65             false
66           end
67         }
68         max.nil? || max > (now + 1) ? 1 : max - now
69       end
71       def process_client(client)
72         G.cur += 1
73         io = client.to_io
74         buf = client.read_timeout or return
75         hp = HttpParser.new
76         env = {}
77         alive = true
78         remote_addr = TCPSocket === io ? io.peeraddr.last : LOCALHOST
80         begin # loop
81           while ! hp.headers(env, buf)
82             buf << (client.read_timeout or return)
83           end
85           env[CLIENT_IO] = client
86           env[RACK_INPUT] = 0 == hp.content_length ?
87                     HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf)
88           env[REMOTE_ADDR] = remote_addr
89           response = APP.call(env.update(RACK_DEFAULTS))
91           if 100 == response.first.to_i
92             client.write(EXPECT_100_RESPONSE)
93             env.delete(HTTP_EXPECT)
94             response = APP.call(env)
95           end
97           alive = hp.keepalive? && G.alive
98           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
99           HttpResponse.write(client, response, out)
100         end while alive and hp.reset.nil? and env.clear
101       rescue => e
102         Error.write(io, e)
103       ensure
104         G.cur -= 1
105         RD.delete(client)
106         WR.delete(client)
107         ZZ.delete(client.f)
108         io.close unless io.closed?
109       end
111     end
112   end