doc: misc cleanups and additions for RDoc
[rainbows.git] / lib / rainbows / revactor.rb
blobd99624342117445e7262a73e00b579db0bb7860c
1 # -*- encoding: binary -*-
2 require 'revactor'
3 require 'fcntl'
4 Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
6 # Enables use of the Actor model through
7 # {Revactor}[http://revactor.org] under Ruby 1.9.  It spawns one
8 # long-lived Actor for every listen socket in the process and spawns a
9 # new Actor for every client connection accept()-ed.
10 # +worker_connections+ will limit the number of client Actors we have
11 # running at any one time.
13 # Applications using this model are required to be reentrant, but do
14 # not have to worry about race conditions unless they use threads
15 # internally.  \Rainbows! does not spawn threads under this model.
16 # Multiple instances of the same app may run in the same address space
17 # sequentially (but at interleaved points).  Any network dependencies
18 # in the application using this model should be implemented using the
19 # \Revactor library as well, to take advantage of the networking
20 # concurrency features this model provides.
21 module Rainbows::Revactor
23   # :stopdoc:
24   RD_ARGS = {}
26   autoload :Proxy, 'rainbows/revactor/proxy'
27   autoload :TeeSocket, 'rainbows/revactor/tee_socket'
29   include Rainbows::Base
30   LOCALHOST = Kgio::LOCALHOST
31   TCP = Revactor::TCP::Socket
33   # once a client is accepted, it is processed in its entirety here
34   # in 3 easy steps: read request, call app, write app response
35   def process_client(client) # :nodoc:
36     io = client.instance_variable_get(:@_io)
37     io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
38     rd_args = [ nil ]
39     remote_addr = if TCP === client
40       rd_args << RD_ARGS
41       client.remote_addr
42     else
43       LOCALHOST
44     end
45     hp = Unicorn::HttpParser.new
46     buf = hp.buf
47     alive = false
49     begin
50       ts = nil
51       until env = hp.parse
52         buf << client.read(*rd_args)
53       end
55       env[CLIENT_IO] = client
56       env[RACK_INPUT] = 0 == hp.content_length ?
57                NULL_IO : IC.new(ts = TeeSocket.new(client), hp)
58       env[REMOTE_ADDR] = remote_addr
59       status, headers, body = app.call(env.update(RACK_DEFAULTS))
61       if 100 == status.to_i
62         client.write(EXPECT_100_RESPONSE)
63         env.delete(HTTP_EXPECT)
64         status, headers, body = app.call(env)
65       end
67       if hp.headers?
68         headers = HH.new(headers)
69         range = make_range!(env, status, headers) and status = range.shift
70         alive = hp.next? && G.alive && G.kato > 0
71         headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
72         client.write(response_header(status, headers))
73         alive && ts and buf << ts.leftover
74       end
75       write_body(client, body, range)
76     end while alive
77   rescue Revactor::TCP::ReadError
78   rescue => e
79     Rainbows::Error.write(io, e)
80   ensure
81     client.close
82   end
84   # runs inside each forked worker, this sits around and waits
85   # for connections and doesn't die until the parent dies (or is
86   # given a INT, QUIT, or TERM signal)
87   def worker_loop(worker) #:nodoc:
88     init_worker_process(worker)
89     require 'rainbows/revactor/body'
90     self.class.__send__(:include, Rainbows::Revactor::Body)
91     self.class.const_set(:IC, Unicorn::HttpRequest.input_class)
92     RD_ARGS[:timeout] = G.kato if G.kato > 0
93     nr = 0
94     limit = worker_connections
95     actor_exit = Case[:exit, Actor, Object]
97     revactorize_listeners.each do |l, close, accept|
98       Actor.spawn(l, close, accept) do |l, close, accept|
99         Actor.current.trap_exit = true
100         l.controller = l.instance_variable_set(:@receiver, Actor.current)
101         begin
102           while nr >= limit
103             l.disable if l.enabled?
104             logger.info "busy: clients=#{nr} >= limit=#{limit}"
105             Actor.receive do |f|
106               f.when(close) {}
107               f.when(actor_exit) { nr -= 1 }
108               f.after(0.01) {} # another listener could've gotten an exit
109             end
110           end
112           l.enable unless l.enabled?
113           Actor.receive do |f|
114             f.when(close) {}
115             f.when(actor_exit) { nr -= 1 }
116             f.when(accept) do |_, _, s|
117               nr += 1
118               Actor.spawn_link(s) { |c| process_client(c) }
119             end
120           end
121         rescue => e
122           Rainbows::Error.listen_loop(e)
123         end while G.alive
124         Actor.receive do |f|
125           f.when(close) {}
126           f.when(actor_exit) { nr -= 1 }
127         end while nr > 0
128       end
129     end
131     Actor.sleep 1 while G.tick || nr > 0
132     rescue Errno::EMFILE
133       # ignore, let another worker process take it
134   end
136   def revactorize_listeners
137     LISTENERS.map do |s|
138       case s
139       when TCPServer
140         l = Revactor::TCP.listen(s, nil)
141         [ l, T[:tcp_closed, Revactor::TCP::Socket],
142           T[:tcp_connection, l, Revactor::TCP::Socket] ]
143       when UNIXServer
144         l = Revactor::UNIX.listen(s)
145         [ l, T[:unix_closed, Revactor::UNIX::Socket ],
146           T[:unix_connection, l, Revactor::UNIX::Socket] ]
147       end
148     end
149   end
150   # :startdoc: