bump TCP_DEFER_ACCEPT default value
[rainbows.git] / lib / rainbows / revactor.rb
blobde423a39d984899557586752ee29f68d56954b71
1 # -*- encoding: binary -*-
2 require 'revactor'
3 Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
5 # Enables use of the Actor model through
6 # {Revactor}[http://revactor.org] under Ruby 1.9.  It spawns one
7 # long-lived Actor for every listen socket in the process and spawns a
8 # new Actor for every client connection accept()-ed.
9 # +worker_connections+ will limit the number of client Actors we have
10 # running at any one time.
12 # Applications using this model are required to be reentrant, but do
13 # not have to worry about race conditions unless they use threads
14 # internally.  \Rainbows! does not spawn threads under this model.
15 # Multiple instances of the same app may run in the same address space
16 # sequentially (but at interleaved points).  Any network dependencies
17 # in the application using this model should be implemented using the
18 # \Revactor library as well, to take advantage of the networking
19 # concurrency features this model provides.
20 module Rainbows::Revactor
22   # :stopdoc:
23   RD_ARGS = {}
25   include Rainbows::Base
26   LOCALHOST = Unicorn::HttpRequest::LOCALHOST
27   TCP = ::Revactor::TCP::Socket
29   # once a client is accepted, it is processed in its entirety here
30   # in 3 easy steps: read request, call app, write app response
31   def process_client(client)
32     io = client.instance_variable_get(:@_io)
33     io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
34     rd_args = [ nil ]
35     remote_addr = if TCP === client
36       rd_args << RD_ARGS
37       client.remote_addr
38     else
39       LOCALHOST
40     end
41     buf = client.read(*rd_args)
42     hp = HttpParser.new
43     env = {}
44     alive = true
46     begin
47       buf << client.read(*rd_args) until hp.headers(env, buf)
49       env[CLIENT_IO] = client
50       env[RACK_INPUT] = 0 == hp.content_length ?
51                NULL_IO : TeeInput.new(PartialSocket.new(client), env, hp, buf)
52       env[REMOTE_ADDR] = remote_addr
53       response = app.call(env.update(RACK_DEFAULTS))
55       if 100 == response[0].to_i
56         client.write(EXPECT_100_RESPONSE)
57         env.delete(HTTP_EXPECT)
58         response = app.call(env)
59       end
61       alive = hp.keepalive? && G.alive
62       out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
63       write_response(client, response, out)
64     end while alive and hp.reset.nil? and env.clear
65   rescue ::Revactor::TCP::ReadError
66   rescue => e
67     Rainbows::Error.write(io, e)
68   ensure
69     client.close
70   end
72   # runs inside each forked worker, this sits around and waits
73   # for connections and doesn't die until the parent dies (or is
74   # given a INT, QUIT, or TERM signal)
75   def worker_loop(worker)
76     init_worker_process(worker)
77     self.class.__send__(:alias_method, :write_body, :write_body_each)
78     RD_ARGS[:timeout] = G.kato if G.kato > 0
79     nr = 0
80     limit = worker_connections
81     actor_exit = Case[:exit, Actor, Object]
83     revactorize_listeners.each do |l, close, accept|
84       Actor.spawn(l, close, accept) do |l, close, accept|
85         Actor.current.trap_exit = true
86         l.controller = l.instance_variable_set(:@receiver, Actor.current)
87         begin
88           while nr >= limit
89             l.disable if l.enabled?
90             logger.info "busy: clients=#{nr} >= limit=#{limit}"
91             Actor.receive do |f|
92               f.when(close) {}
93               f.when(actor_exit) { nr -= 1 }
94               f.after(0.01) {} # another listener could've gotten an exit
95             end
96           end
98           l.enable unless l.enabled?
99           Actor.receive do |f|
100             f.when(close) {}
101             f.when(actor_exit) { nr -= 1 }
102             f.when(accept) do |_, _, s|
103               nr += 1
104               Actor.spawn_link(s) { |c| process_client(c) }
105             end
106           end
107         rescue => e
108           Rainbows::Error.listen_loop(e)
109         end while G.alive
110         Actor.receive do |f|
111           f.when(close) {}
112           f.when(actor_exit) { nr -= 1 }
113         end while nr > 0
114       end
115     end
117     Actor.sleep 1 while G.tick || nr > 0
118     rescue Errno::EMFILE
119       # ignore, let another worker process take it
120   end
122   def revactorize_listeners
123     LISTENERS.map do |s|
124       case s
125       when TCPServer
126         l = ::Revactor::TCP.listen(s, nil)
127         [ l, T[:tcp_closed, ::Revactor::TCP::Socket],
128           T[:tcp_connection, l, ::Revactor::TCP::Socket] ]
129       when UNIXServer
130         l = ::Revactor::UNIX.listen(s)
131         [ l, T[:unix_closed, ::Revactor::UNIX::Socket ],
132           T[:unix_connection, l, ::Revactor::UNIX::Socket] ]
133       end
134     end
135   end
137   # Revactor Sockets do not implement readpartial, so we emulate just
138   # enough to avoid mucking with TeeInput internals.  Fortunately
139   # this code is not heavily used so we can usually avoid the overhead
140   # of adding a userspace buffer.
141   class PartialSocket < Struct.new(:socket, :rbuf)
142     def initialize(socket)
143       # IO::Buffer is used internally by Rev which Revactor is based on
144       # so we'll always have it available
145       super(socket, IO::Buffer.new)
146     end
148     # Revactor socket reads always return an unspecified amount,
149     # sometimes too much
150     def readpartial(length, dst = "")
151       return dst.replace("") if length == 0
153       # always check and return from the userspace buffer first
154       rbuf.size > 0 and return dst.replace(rbuf.read(length))
156       # read off the socket since there was nothing in rbuf
157       tmp = socket.read
159       # we didn't read too much, good, just return it straight back
160       # to avoid needlessly wasting memory bandwidth
161       tmp.size <= length and return dst.replace(tmp)
163       # ugh, read returned too much
164       rbuf << tmp[length, tmp.size]
165       dst.replace(tmp[0, length])
166     end
168     # just proxy any remaining methods TeeInput may use
169     def close
170       socket.close
171     end
172   end
174   # :startdoc: