cleanup: consolidate write_nonblock error handling
[rainbows.git] / lib / rainbows / revactor.rb
blobb410bda0b34a1aa77137f6c63f20915731e57cef
1 # -*- encoding: binary -*-
2 require 'revactor'
3 Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
5 module Rainbows
7   # Enables use of the Actor model through
8   # {Revactor}[http://revactor.org] under Ruby 1.9.  It spawns one
9   # long-lived Actor for every listen socket in the process and spawns a
10   # new Actor for every client connection accept()-ed.
11   # +worker_connections+ will limit the number of client Actors we have
12   # running at any one time.
13   #
14   # Applications using this model are required to be reentrant, but do
15   # not have to worry about race conditions unless they use threads
16   # internally.  \Rainbows! does not spawn threads under this model.
17   # Multiple instances of the same app may run in the same address space
18   # sequentially (but at interleaved points).  Any network dependencies
19   # in the application using this model should be implemented using the
20   # \Revactor library as well, to take advantage of the networking
21   # concurrency features this model provides.
23   module Revactor
24     require 'rainbows/revactor/tee_input'
26     RD_ARGS = {}
28     include Base
30     # once a client is accepted, it is processed in its entirety here
31     # in 3 easy steps: read request, call app, write app response
32     def process_client(client)
33       defined?(Fcntl::FD_CLOEXEC) and
34         client.instance_eval { @_io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
35       rd_args = [ nil ]
36       remote_addr = if ::Revactor::TCP::Socket === client
37         rd_args << RD_ARGS
38         client.remote_addr
39       else
40         LOCALHOST
41       end
42       buf = client.read(*rd_args)
43       hp = HttpParser.new
44       env = {}
45       alive = true
47       begin
48         while ! hp.headers(env, buf)
49           buf << client.read(*rd_args)
50         end
52         env[Const::CLIENT_IO] = client
53         env[Const::RACK_INPUT] = 0 == hp.content_length ?
54                  HttpRequest::NULL_IO :
55                  Rainbows::Revactor::TeeInput.new(client, env, hp, buf)
56         env[Const::REMOTE_ADDR] = remote_addr
57         response = app.call(env.update(RACK_DEFAULTS))
59         if 100 == response.first.to_i
60           client.write(Const::EXPECT_100_RESPONSE)
61           env.delete(Const::HTTP_EXPECT)
62           response = app.call(env)
63         end
65         alive = hp.keepalive? && G.alive
66         out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
67         HttpResponse.write(client, response, out)
68       end while alive and hp.reset.nil? and env.clear
69     rescue ::Revactor::TCP::ReadError
70     rescue => e
71       Error.write(client.instance_eval { @_io }, e)
72     ensure
73       client.close
74     end
76     # runs inside each forked worker, this sits around and waits
77     # for connections and doesn't die until the parent dies (or is
78     # given a INT, QUIT, or TERM signal)
79     def worker_loop(worker)
80       init_worker_process(worker)
81       RD_ARGS[:timeout] = G.kato if G.kato > 0
82       nr = 0
83       limit = worker_connections
84       actor_exit = Case[:exit, Actor, Object]
86       revactorize_listeners.each do |l, close, accept|
87         Actor.spawn(l, close, accept) do |l, close, accept|
88           Actor.current.trap_exit = true
89           l.controller = l.instance_eval { @receiver = Actor.current }
90           begin
91             while nr >= limit
92               l.disable if l.enabled?
93               logger.info "busy: clients=#{nr} >= limit=#{limit}"
94               Actor.receive do |f|
95                 f.when(close) {}
96                 f.when(actor_exit) { nr -= 1 }
97                 f.after(0.01) {} # another listener could've gotten an exit
98               end
99             end
101             l.enable unless l.enabled?
102             Actor.receive do |f|
103               f.when(close) {}
104               f.when(actor_exit) { nr -= 1 }
105               f.when(accept) do |_, _, s|
106                 nr += 1
107                 Actor.spawn_link(s) { |c| process_client(c) }
108               end
109             end
110           rescue => e
111             Error.listen_loop(e)
112           end while G.alive
113           Actor.receive do |f|
114             f.when(close) {}
115             f.when(actor_exit) { nr -= 1 }
116           end while nr > 0
117         end
118       end
120       Actor.sleep 1 while G.tick || nr > 0
121       rescue Errno::EMFILE => e
122     end
124     def revactorize_listeners
125       LISTENERS.map do |s|
126         case s
127         when TCPServer
128           l = ::Revactor::TCP.listen(s, nil)
129           [ l, T[:tcp_closed, ::Revactor::TCP::Socket],
130             T[:tcp_connection, l, ::Revactor::TCP::Socket] ]
131         when UNIXServer
132           l = ::Revactor::UNIX.listen(s)
133           [ l, T[:unix_closed, ::Revactor::UNIX::Socket ],
134             T[:unix_connection, l, ::Revactor::UNIX::Socket] ]
135         end
136       end
137     end
139   end