fiber/base: reuse process_client logic in base
[rainbows.git] / lib / rainbows / fiber / base.rb
blob0298948e9511dc7d153cbd89018e2d85f6250689
1 # -*- encoding: binary -*-
2 require 'rainbows/fiber/io'
4 module Rainbows
5   module Fiber
7     # blocked readers (key: fileno, value: Rainbows::Fiber::IO object)
8     RD = []
10     # blocked writers (key: fileno, value: Rainbows::Fiber::IO object)
11     WR = []
13     # sleeping fibers go here (key: Fiber object, value: wakeup time)
14     ZZ = {}.compare_by_identity
16     # puts the current Fiber into uninterruptible sleep for at least
17     # +seconds+.  Unlike Kernel#sleep, this it is not possible to sleep
18     # indefinitely to be woken up (nobody wants that in a web server,
19     # right?).  Calling this directly is deprecated, use
20     # Rainbows.sleep(seconds) instead.
21     def self.sleep(seconds)
22       ZZ[::Fiber.current] = Time.now + seconds
23       ::Fiber.yield
24     end
26     # base module used by FiberSpawn and FiberPool
27     module Base
28       include Rainbows::Base
30       # the scheduler method that powers both FiberSpawn and FiberPool
31       # concurrency models.  It times out idle clients and attempts to
32       # schedules ones that were blocked on I/O.  At most it'll sleep
33       # for one second (returned by the schedule_sleepers method) which
34       # will cause it.
35       def schedule(&block)
36         ret = begin
37           G.tick
38           RD.compact.each { |c| c.f.resume } # attempt to time out idle clients
39           t = schedule_sleepers
40           Kernel.select(RD.compact.concat(LISTENERS),
41                         WR.compact, nil, t) or return
42         rescue Errno::EINTR
43           retry
44         rescue Errno::EBADF, TypeError
45           LISTENERS.compact!
46           raise
47         end or return
49         # active writers first, then _all_ readers for keepalive timeout
50         ret[1].concat(RD.compact).each { |c| c.f.resume }
52         # accept is an expensive syscall, filter out listeners we don't want
53         (ret[0] & LISTENERS).each(&block)
54       end
56       # wakes up any sleepers that need to be woken and
57       # returns an interval to IO.select on
58       def schedule_sleepers
59         max = nil
60         now = Time.now
61         fibs = []
62         ZZ.delete_if { |fib, time|
63           if now >= time
64             fibs << fib
65           else
66             max = time
67             false
68           end
69         }
70         fibs.each { |fib| fib.resume }
71         now = Time.now
72         max.nil? || max > (now + 1) ? 1 : max - now
73       end
75       def write_body(client, body)
76         body.each { |chunk| client.write(chunk) }
77         ensure
78           body.respond_to?(:close) and body.close
79       end
81       def wait_headers_readable(client)
82         io = client.to_io
83         expire = nil
84         begin
85           return io.recv_nonblock(1, Socket::MSG_PEEK)
86         rescue Errno::EAGAIN
87           return if expire && expire < Time.now
88           expire ||= Time.now + G.kato
89           client.wait_readable
90           retry
91         end
92       end
94       def process_client(client)
95         G.cur += 1
96         super(client) # see Rainbows::Base
97       ensure
98         G.cur -= 1
99         ZZ.delete(client.f)
100       end
102     end
103   end