1 # -*- encoding: binary -*-
3 Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
7 # Enables use of the Actor model through
8 # {Revactor}[http://revactor.org] under Ruby 1.9. It spawns one
9 # long-lived Actor for every listen socket in the process and spawns a
10 # new Actor for every client connection accept()-ed.
11 # +worker_connections+ will limit the number of client Actors we have
12 # running at any one time.
14 # Applications using this model are required to be reentrant, but do
15 # not have to worry about race conditions unless they use threads
16 # internally. \Rainbows! does not spawn threads under this model.
17 # Multiple instances of the same app may run in the same address space
18 # sequentially (but at interleaved points). Any network dependencies
19 # in the application using this model should be implemented using the
20 # \Revactor library as well, to take advantage of the networking
21 # concurrency features this model provides.
24 require 'rainbows/revactor/tee_input'
30 # once a client is accepted, it is processed in its entirety here
31 # in 3 easy steps: read request, call app, write app response
32 def process_client(client)
33 defined?(Fcntl::FD_CLOEXEC) and
34 client.instance_eval { @_io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
36 remote_addr = if ::Revactor::TCP::Socket === client
42 buf = client.read(*rd_args)
48 while ! hp.headers(env, buf)
49 buf << client.read(*rd_args)
52 env[Const::CLIENT_IO] = client
53 env[Const::RACK_INPUT] = 0 == hp.content_length ?
54 HttpRequest::NULL_IO :
55 Rainbows::Revactor::TeeInput.new(client, env, hp, buf)
56 env[Const::REMOTE_ADDR] = remote_addr
57 response = app.call(env.update(RACK_DEFAULTS))
59 if 100 == response.first.to_i
60 client.write(Const::EXPECT_100_RESPONSE)
61 env.delete(Const::HTTP_EXPECT)
62 response = app.call(env)
65 alive = hp.keepalive? && G.alive
66 out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
67 HttpResponse.write(client, response, out)
68 end while alive and hp.reset.nil? and env.clear
69 rescue ::Revactor::TCP::ReadError
71 Error.write(client.instance_eval { @_io }, e)
76 # runs inside each forked worker; this sits around and waits
77 # for connections and doesn't die until the parent dies (or is
78 # given an INT, QUIT, or TERM signal)
79 def worker_loop(worker)
80 init_worker_process(worker)
81 RD_ARGS[:timeout] = G.kato if G.kato > 0
83 limit = worker_connections
84 actor_exit = Case[:exit, Actor, Object]
86 revactorize_listeners.each do |l, close, accept|
87 Actor.spawn(l, close, accept) do |l, close, accept|
88 Actor.current.trap_exit = true
89 l.controller = l.instance_eval { @receiver = Actor.current }
92 l.disable if l.enabled?
93 logger.info "busy: clients=#{nr} >= limit=#{limit}"
96 f.when(actor_exit) { nr -= 1 }
97 f.after(0.01) {} # another listener could've gotten an exit
101 l.enable unless l.enabled?
104 f.when(actor_exit) { nr -= 1 }
105 f.when(accept) do |_, _, s|
107 Actor.spawn_link(s) { |c| process_client(c) }
115 f.when(actor_exit) { nr -= 1 }
120 Actor.sleep 1 while G.tick || nr > 0
121 rescue Errno::EMFILE => e
124 def revactorize_listeners
128 l = ::Revactor::TCP.listen(s, nil)
129 [ l, T[:tcp_closed, ::Revactor::TCP::Socket],
130 T[:tcp_connection, l, ::Revactor::TCP::Socket] ]
132 l = ::Revactor::UNIX.listen(s)
133 [ l, T[:unix_closed, ::Revactor::UNIX::Socket ],
134 T[:unix_connection, l, ::Revactor::UNIX::Socket] ]