1 # -*- encoding: binary -*-
2 require 'rainbows/fiber/io'
7 # blocked readers (key: fileno, value: Rainbows::Fiber::IO object)
10 # blocked writers (key: fileno, value: Rainbows::Fiber::IO object)
13 # sleeping fibers go here (key: Fiber object, value: wakeup time)
14 ZZ = {}.compare_by_identity
16 # puts the current Fiber into uninterruptible sleep for at least
17 # +seconds+. Unlike Kernel#sleep, it is not possible to sleep
18 # indefinitely until woken up (nobody wants that in a web server,
19 # right?). Calling this directly is deprecated, use
20 # Rainbows.sleep(seconds) instead.
21 def self.sleep(seconds)
22 ZZ[::Fiber.current] = Time.now + seconds
26 # base module used by FiberSpawn and FiberPool
28 include Rainbows::Base
30 # the scheduler method that powers both FiberSpawn and FiberPool
31 # concurrency models. It times out idle clients and attempts to
32 # schedule ones that were blocked on I/O. At most it'll sleep
33 # for one second (returned by the schedule_sleepers method) which
38 RD.compact.each { |c| c.f.resume } # attempt to time out idle clients
40 Kernel.select(RD.compact.concat(LISTENERS),
41 WR.compact, nil, t) or return
44 rescue Errno::EBADF, TypeError
49 # active writers first, then _all_ readers for keepalive timeout
50 ret[1].concat(RD.compact).each { |c| c.f.resume }
52 # accept is an expensive syscall, filter out listeners we don't want
53 (ret[0] & LISTENERS).each(&block)
56 # wakes up any sleepers that need to be woken and
57 # returns an interval to IO.select on
62 ZZ.delete_if { |fib, time|
70 fibs.each { |fib| fib.resume }
72 max.nil? || max > (now + 1) ? 1 : max - now
75 def write_body(client, body)
76 body.each { |chunk| client.write(chunk) }
78 body.respond_to?(:close) and body.close
81 def wait_headers_readable(client)
85 return io.recv_nonblock(1, Socket::MSG_PEEK)
87 return if expire && expire < Time.now
88 expire ||= Time.now + G.kato
94 def process_client(client)
96 super(client) # see Rainbows::Base