start using kgio library
[rainbows.git] / lib / rainbows / revactor.rb
blobeae76739366048d08ccf92f7d68dc6ed23c6b8af
1 # -*- encoding: binary -*-
2 require 'revactor'
3 require 'fcntl'
4 Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
6 # Enables use of the Actor model through
7 # {Revactor}[http://revactor.org] under Ruby 1.9.  It spawns one
8 # long-lived Actor for every listen socket in the process and spawns a
9 # new Actor for every client connection accept()-ed.
10 # +worker_connections+ will limit the number of client Actors we have
11 # running at any one time.
13 # Applications using this model are required to be reentrant, but do
14 # not have to worry about race conditions unless they use threads
15 # internally.  \Rainbows! does not spawn threads under this model.
16 # Multiple instances of the same app may run in the same address space
17 # sequentially (but at interleaved points).  Any network dependencies
18 # in the application using this model should be implemented using the
19 # \Revactor library as well, to take advantage of the networking
20 # concurrency features this model provides.
21 module Rainbows::Revactor
23   # :stopdoc:
24   RD_ARGS = {}
26   autoload :Proxy, 'rainbows/revactor/proxy'
28   include Rainbows::Base
29   LOCALHOST = Unicorn::HttpRequest::LOCALHOST
30   TCP = ::Revactor::TCP::Socket
32   # once a client is accepted, it is processed in its entirety here
33   # in 3 easy steps: read request, call app, write app response
34   def process_client(client) # :nodoc:
35     io = client.instance_variable_get(:@_io)
36     io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
37     rd_args = [ nil ]
38     remote_addr = if TCP === client
39       rd_args << RD_ARGS
40       client.remote_addr
41     else
42       LOCALHOST
43     end
44     buf = client.read(*rd_args)
45     hp = HttpParser.new
46     env = {}
48     begin
49       buf << client.read(*rd_args) until hp.headers(env, buf)
51       env[CLIENT_IO] = client
52       env[RACK_INPUT] = 0 == hp.content_length ?
53                NULL_IO : TeeInput.new(PartialSocket.new(client), env, hp, buf)
54       env[REMOTE_ADDR] = remote_addr
55       status, headers, body = app.call(env.update(RACK_DEFAULTS))
57       if 100 == status.to_i
58         client.write(EXPECT_100_RESPONSE)
59         env.delete(HTTP_EXPECT)
60         status, headers, body = app.call(env)
61       end
63       if hp.headers?
64         headers = HH.new(headers)
65         range = make_range!(env, status, headers) and status = range.shift
66         env = false unless hp.keepalive? && G.alive && G.kato > 0
67         headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE
68         client.write(response_header(status, headers))
69       end
70       write_body(client, body, range)
71     end while env && env.clear && hp.reset.nil?
72   rescue ::Revactor::TCP::ReadError
73   rescue => e
74     Rainbows::Error.write(io, e)
75   ensure
76     client.close
77   end
79   # runs inside each forked worker, this sits around and waits
80   # for connections and doesn't die until the parent dies (or is
81   # given a INT, QUIT, or TERM signal)
82   def worker_loop(worker) #:nodoc:
83     init_worker_process(worker)
84     require 'rainbows/revactor/body'
85     self.class.__send__(:include, Rainbows::Revactor::Body)
86     RD_ARGS[:timeout] = G.kato if G.kato > 0
87     nr = 0
88     limit = worker_connections
89     actor_exit = Case[:exit, Actor, Object]
91     revactorize_listeners.each do |l, close, accept|
92       Actor.spawn(l, close, accept) do |l, close, accept|
93         Actor.current.trap_exit = true
94         l.controller = l.instance_variable_set(:@receiver, Actor.current)
95         begin
96           while nr >= limit
97             l.disable if l.enabled?
98             logger.info "busy: clients=#{nr} >= limit=#{limit}"
99             Actor.receive do |f|
100               f.when(close) {}
101               f.when(actor_exit) { nr -= 1 }
102               f.after(0.01) {} # another listener could've gotten an exit
103             end
104           end
106           l.enable unless l.enabled?
107           Actor.receive do |f|
108             f.when(close) {}
109             f.when(actor_exit) { nr -= 1 }
110             f.when(accept) do |_, _, s|
111               nr += 1
112               Actor.spawn_link(s) { |c| process_client(c) }
113             end
114           end
115         rescue => e
116           Rainbows::Error.listen_loop(e)
117         end while G.alive
118         Actor.receive do |f|
119           f.when(close) {}
120           f.when(actor_exit) { nr -= 1 }
121         end while nr > 0
122       end
123     end
125     Actor.sleep 1 while G.tick || nr > 0
126     rescue Errno::EMFILE
127       # ignore, let another worker process take it
128   end
130   def revactorize_listeners
131     LISTENERS.map do |s|
132       case s
133       when TCPServer
134         l = ::Revactor::TCP.listen(s, nil)
135         [ l, T[:tcp_closed, ::Revactor::TCP::Socket],
136           T[:tcp_connection, l, ::Revactor::TCP::Socket] ]
137       when UNIXServer
138         l = ::Revactor::UNIX.listen(s)
139         [ l, T[:unix_closed, ::Revactor::UNIX::Socket ],
140           T[:unix_connection, l, ::Revactor::UNIX::Socket] ]
141       end
142     end
143   end
145   # Revactor Sockets do not implement readpartial, so we emulate just
146   # enough to avoid mucking with TeeInput internals.  Fortunately
147   # this code is not heavily used so we can usually avoid the overhead
148   # of adding a userspace buffer.
149   class PartialSocket < Struct.new(:socket, :rbuf)
150     def initialize(socket)
151       # IO::Buffer is used internally by Rev which Revactor is based on
152       # so we'll always have it available
153       super(socket, IO::Buffer.new)
154     end
156     # Revactor socket reads always return an unspecified amount,
157     # sometimes too much
158     def readpartial(length, dst = "")
159       return dst.replace("") if length == 0
161       # always check and return from the userspace buffer first
162       rbuf.size > 0 and return dst.replace(rbuf.read(length))
164       # read off the socket since there was nothing in rbuf
165       tmp = socket.read
167       # we didn't read too much, good, just return it straight back
168       # to avoid needlessly wasting memory bandwidth
169       tmp.size <= length and return dst.replace(tmp)
171       # ugh, read returned too much
172       rbuf << tmp[length, tmp.size]
173       dst.replace(tmp[0, length])
174     end
176     # just proxy any remaining methods TeeInput may use
177     def close
178       socket.close
179     end
180   end
182   # :startdoc: