1 # -*- encoding: binary -*-
4 Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
6 # Enables use of the Actor model through
7 # {Revactor}[http://revactor.org] under Ruby 1.9. It spawns one
8 # long-lived Actor for every listen socket in the process and spawns a
9 # new Actor for every client connection accept()-ed.
10 # +worker_connections+ will limit the number of client Actors we have
11 # running at any one time.
13 # Applications using this model are required to be reentrant, but do
14 # not have to worry about race conditions unless they use threads
15 # internally. \Rainbows! does not spawn threads under this model.
16 # Multiple instances of the same app may run in the same address space
17 # sequentially (but at interleaved points). Any network dependencies
18 # in the application using this model should be implemented using the
19 # \Revactor library as well, to take advantage of the networking
20 # concurrency features this model provides.
21 module Rainbows::Revactor
26 autoload :Proxy, 'rainbows/revactor/proxy'
28 include Rainbows::Base
29 LOCALHOST = Kgio::LOCALHOST
30 TCP = ::Revactor::TCP::Socket
32 # once a client is accepted, it is processed in its entirety here
33 # in 3 easy steps: read request, call app, write app response
34 def process_client(client) # :nodoc:
35 io = client.instance_variable_get(:@_io)
36 io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
38 remote_addr = if TCP === client
44 hp = Unicorn::HttpParser.new
51 buf << client.read(*rd_args)
54 env[CLIENT_IO] = client
55 env[RACK_INPUT] = 0 == hp.content_length ?
56 NULL_IO : IC.new(ts = TeeSocket.new(client), hp)
57 env[REMOTE_ADDR] = remote_addr
58 status, headers, body = app.call(env.update(RACK_DEFAULTS))
61 client.write(EXPECT_100_RESPONSE)
62 env.delete(HTTP_EXPECT)
63 status, headers, body = app.call(env)
67 headers = HH.new(headers)
68 range = make_range!(env, status, headers) and status = range.shift
69 alive = hp.next? && G.alive && G.kato > 0
70 headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
71 client.write(response_header(status, headers))
72 alive && ts and buf << ts.leftover
74 write_body(client, body, range)
76 rescue ::Revactor::TCP::ReadError
78 Rainbows::Error.write(io, e)
83 # runs inside each forked worker; this sits around and waits
84 # for connections and doesn't die until the parent dies (or is
85 # given an INT, QUIT, or TERM signal)
86 def worker_loop(worker) #:nodoc:
87 init_worker_process(worker)
88 require 'rainbows/revactor/body'
89 self.class.__send__(:include, Rainbows::Revactor::Body)
90 self.class.const_set(:IC, Unicorn::HttpRequest.input_class)
91 RD_ARGS[:timeout] = G.kato if G.kato > 0
93 limit = worker_connections
94 actor_exit = Case[:exit, Actor, Object]
96 revactorize_listeners.each do |l, close, accept|
97 Actor.spawn(l, close, accept) do |l, close, accept|
98 Actor.current.trap_exit = true
99 l.controller = l.instance_variable_set(:@receiver, Actor.current)
102 l.disable if l.enabled?
103 logger.info "busy: clients=#{nr} >= limit=#{limit}"
106 f.when(actor_exit) { nr -= 1 }
107 f.after(0.01) {} # another listener could've gotten an exit
111 l.enable unless l.enabled?
114 f.when(actor_exit) { nr -= 1 }
115 f.when(accept) do |_, _, s|
117 Actor.spawn_link(s) { |c| process_client(c) }
121 Rainbows::Error.listen_loop(e)
125 f.when(actor_exit) { nr -= 1 }
130 Actor.sleep 1 while G.tick || nr > 0
132 # ignore, let another worker process take it
135 def revactorize_listeners
139 l = ::Revactor::TCP.listen(s, nil)
140 [ l, T[:tcp_closed, ::Revactor::TCP::Socket],
141 T[:tcp_connection, l, ::Revactor::TCP::Socket] ]
143 l = ::Revactor::UNIX.listen(s)
144 [ l, T[:unix_closed, ::Revactor::UNIX::Socket ],
145 T[:unix_connection, l, ::Revactor::UNIX::Socket] ]
150 # Revactor Sockets do not implement readpartial, so we emulate just
151 # enough to avoid mucking with TeeInput internals. Fortunately
152 # this code is not heavily used so we can usually avoid the overhead
153 # of adding a userspace buffer.
155 def initialize(socket)
156 # IO::Buffer is used internally by Rev, which Revactor is based on,
157 # so we'll always have it available
158 @socket, @rbuf = socket, IO::Buffer.new
165 # Revactor socket reads always return an unspecified amount,
167 def kgio_read(length, dst = "")
168 return dst.replace("") if length == 0
170 # always check and return from the userspace buffer first
171 @rbuf.size > 0 and return dst.replace(@rbuf.read(length))
173 # read off the socket since there was nothing in rbuf
176 # we didn't read too much, good, just return it straight back
177 # to avoid needlessly wasting memory bandwidth
178 tmp.size <= length and return dst.replace(tmp)
180 # ugh, read returned too much
181 @rbuf << tmp[length, tmp.size]
182 dst.replace(tmp[0, length])
186 # just proxy any remaining methods TeeInput may use