1 # -*- encoding: binary -*-
3 Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
7 # Enables use of the Actor model through
8 # {Revactor}[http://revactor.org] under Ruby 1.9. It spawns one
9 # long-lived Actor for every listen socket in the process and spawns a
10 # new Actor for every client connection accept()-ed.
11 # +worker_connections+ will limit the number of client Actors we have
12 # running at any one time.
14 # Applications using this model are required to be reentrant, but do
15 # not have to worry about race conditions unless they use threads
16 # internally. \Rainbows! does not spawn threads under this model.
17 # Multiple instances of the same app may run in the same address space
18 # sequentially (but at interleaved points). Any network dependencies
19 # in the application using this model should be implemented using the
20 # \Revactor library as well, to take advantage of the networking
21 # concurrency features this model provides.
28 # once a client is accepted, it is processed in its entirety here
29 # in 3 easy steps: read request, call app, write app response
30 def process_client(client)
31 io = client.instance_variable_get(:@_io)
32 io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
34 remote_addr = if ::Revactor::TCP::Socket === client
38 Unicorn::HttpRequest::LOCALHOST
40 buf = client.read(*rd_args)
46 while ! hp.headers(env, buf)
47 buf << client.read(*rd_args)
50 env[Const::CLIENT_IO] = client
51 env[Const::RACK_INPUT] = 0 == hp.content_length ?
52 HttpRequest::NULL_IO :
53 TeeInput.new(PartialSocket.new(client), env, hp, buf)
54 env[Const::REMOTE_ADDR] = remote_addr
55 response = app.call(env.update(RACK_DEFAULTS))
57 if 100 == response.first.to_i
58 client.write(Const::EXPECT_100_RESPONSE)
59 env.delete(Const::HTTP_EXPECT)
60 response = app.call(env)
63 alive = hp.keepalive? && G.alive
64 out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
65 HttpResponse.write(client, response, out)
66 end while alive and hp.reset.nil? and env.clear
67 rescue ::Revactor::TCP::ReadError
74 # runs inside each forked worker, this sits around and waits
75 # for connections and doesn't die until the parent dies (or is
76 # given a INT, QUIT, or TERM signal)
77 def worker_loop(worker)
78 init_worker_process(worker)
79 RD_ARGS[:timeout] = G.kato if G.kato > 0
81 limit = worker_connections
82 actor_exit = Case[:exit, Actor, Object]
84 revactorize_listeners.each do |l, close, accept|
85 Actor.spawn(l, close, accept) do |l, close, accept|
86 Actor.current.trap_exit = true
87 l.controller = l.instance_variable_set(:@receiver, Actor.current)
90 l.disable if l.enabled?
91 logger.info "busy: clients=#{nr} >= limit=#{limit}"
94 f.when(actor_exit) { nr -= 1 }
95 f.after(0.01) {} # another listener could've gotten an exit
99 l.enable unless l.enabled?
102 f.when(actor_exit) { nr -= 1 }
103 f.when(accept) do |_, _, s|
105 Actor.spawn_link(s) { |c| process_client(c) }
113 f.when(actor_exit) { nr -= 1 }
118 Actor.sleep 1 while G.tick || nr > 0
120 # ignore, let another worker process take it
123 def revactorize_listeners
127 l = ::Revactor::TCP.listen(s, nil)
128 [ l, T[:tcp_closed, ::Revactor::TCP::Socket],
129 T[:tcp_connection, l, ::Revactor::TCP::Socket] ]
131 l = ::Revactor::UNIX.listen(s)
132 [ l, T[:unix_closed, ::Revactor::UNIX::Socket ],
133 T[:unix_connection, l, ::Revactor::UNIX::Socket] ]
138 # Revactor Sockets do not implement readpartial, so we emulate just
139 # enough to avoid mucking with TeeInput internals. Fortunately
140 # this code is not heavily used so we can usually avoid the overhead
141 # of adding a userspace buffer.
142 class PartialSocket < Struct.new(:socket, :rbuf)
143 def initialize(socket)
144 # IO::Buffer is used internally by Rev which Revactor is based on
145 # so we'll always have it available
146 super(socket, IO::Buffer.new)
149 # Revactor socket reads always return an unspecified amount,
151 def readpartial(length, dst = "")
152 return dst if length == 0
153 # always check and return from the userspace buffer first
154 rbuf.size > 0 and return dst.replace(rbuf.read(length))
156 # read off the socket since there was nothing in rbuf
159 # we didn't read too much, good, just return it straight back
160 # to avoid needlessly wasting memory bandwidth
161 tmp.size <= length and return dst.replace(tmp)
163 # ugh, read returned too much, copy + reread to avoid slicing
164 rbuf << tmp[length, tmp.size]
165 dst.replace(tmp[0, length])
168 # just proxy any remaining methods TeeInput may use