eliminate G constant and just use the Rainbows! module
[rainbows.git] / lib / rainbows / revactor.rb
bloba33583560da2eede1ac87d80ecac51baa6eaf919
1 # -*- encoding: binary -*-
2 require 'revactor'
3 require 'fcntl'
4 Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
6 # Enables use of the Actor model through
7 # {Revactor}[http://revactor.org] under Ruby 1.9.  It spawns one
8 # long-lived Actor for every listen socket in the process and spawns a
9 # new Actor for every client connection accept()-ed.
10 # +worker_connections+ will limit the number of client Actors we have
11 # running at any one time.
13 # Applications using this model are required to be reentrant, but do
14 # not have to worry about race conditions unless they use threads
15 # internally.  \Rainbows! does not spawn threads under this model.
16 # Multiple instances of the same app may run in the same address space
17 # sequentially (but at interleaved points).  Any network dependencies
18 # in the application using this model should be implemented using the
19 # \Revactor library as well, to take advantage of the networking
20 # concurrency features this model provides.
21 module Rainbows::Revactor
22   autoload :Client, 'rainbows/revactor/client'
23   autoload :Proxy, 'rainbows/revactor/proxy'
25   include Rainbows::Base
27   # runs inside each forked worker, this sits around and waits
28   # for connections and doesn't die until the parent dies (or is
29   # given a INT, QUIT, or TERM signal)
30   def worker_loop(worker) #:nodoc:
31     Client.setup
32     init_worker_process(worker)
33     nr = 0
34     limit = worker_connections
35     actor_exit = Case[:exit, Actor, Object]
37     revactorize_listeners.each do |l, close, accept|
38       Actor.spawn(l, close, accept) do |l, close, accept|
39         Actor.current.trap_exit = true
40         l.controller = l.instance_variable_set(:@receiver, Actor.current)
41         begin
42           while nr >= limit
43             l.disable if l.enabled?
44             logger.info "busy: clients=#{nr} >= limit=#{limit}"
45             Actor.receive do |f|
46               f.when(close) {}
47               f.when(actor_exit) { nr -= 1 }
48               f.after(0.01) {} # another listener could've gotten an exit
49             end
50           end
52           l.enable unless l.enabled?
53           Actor.receive do |f|
54             f.when(close) {}
55             f.when(actor_exit) { nr -= 1 }
56             f.when(accept) do |_, _, s|
57               nr += 1
58               Actor.spawn_link(s) { |c| Client.new(c).process_loop }
59             end
60           end
61         rescue => e
62           Rainbows::Error.listen_loop(e)
63         end while Rainbows.alive
64         Actor.receive do |f|
65           f.when(close) {}
66           f.when(actor_exit) { nr -= 1 }
67         end while nr > 0
68       end
69     end
71     Actor.sleep 1 while Rainbows.tick || nr > 0
72     rescue Errno::EMFILE
73       # ignore, let another worker process take it
74   end
76   def revactorize_listeners
77     LISTENERS.map do |s|
78       case s
79       when TCPServer
80         l = Revactor::TCP.listen(s, nil)
81         [ l, T[:tcp_closed, Revactor::TCP::Socket],
82           T[:tcp_connection, l, Revactor::TCP::Socket] ]
83       when UNIXServer
84         l = Revactor::UNIX.listen(s)
85         [ l, T[:unix_closed, Revactor::UNIX::Socket ],
86           T[:unix_connection, l, Revactor::UNIX::Socket] ]
87       end
88     end
89   end
90   # :startdoc:
91 end