tests: disable async_sinatra test for Ruby 1.9.2
[rainbows.git] / lib / rainbows / revactor.rb
blobb1f0d9b0d4e663e690a34de4a11cc453a380e6bc
1 # -*- encoding: binary -*-
2 require 'revactor'
3 Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
5 module Rainbows
7   # Enables use of the Actor model through
8   # {Revactor}[http://revactor.org] under Ruby 1.9.  It spawns one
9   # long-lived Actor for every listen socket in the process and spawns a
10   # new Actor for every client connection accept()-ed.
11   # +worker_connections+ will limit the number of client Actors we have
12   # running at any one time.
13   #
14   # Applications using this model are required to be reentrant, but do
15   # not have to worry about race conditions unless they use threads
16   # internally.  \Rainbows! does not spawn threads under this model.
17   # Multiple instances of the same app may run in the same address space
18   # sequentially (but at interleaved points).  Any network dependencies
19   # in the application using this model should be implemented using the
20   # \Revactor library as well, to take advantage of the networking
21   # concurrency features this model provides.
23   module Revactor
24     RD_ARGS = {}
26     include Base
28     # once a client is accepted, it is processed in its entirety here
29     # in 3 easy steps: read request, call app, write app response
30     def process_client(client)
31       io = client.instance_variable_get(:@_io)
32       io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
33       rd_args = [ nil ]
34       remote_addr = if ::Revactor::TCP::Socket === client
35         rd_args << RD_ARGS
36         client.remote_addr
37       else
38         Unicorn::HttpRequest::LOCALHOST
39       end
40       buf = client.read(*rd_args)
41       hp = HttpParser.new
42       env = {}
43       alive = true
45       begin
46         while ! hp.headers(env, buf)
47           buf << client.read(*rd_args)
48         end
50         env[Const::CLIENT_IO] = client
51         env[Const::RACK_INPUT] = 0 == hp.content_length ?
52                  HttpRequest::NULL_IO :
53                  TeeInput.new(PartialSocket.new(client), env, hp, buf)
54         env[Const::REMOTE_ADDR] = remote_addr
55         response = app.call(env.update(RACK_DEFAULTS))
57         if 100 == response.first.to_i
58           client.write(Const::EXPECT_100_RESPONSE)
59           env.delete(Const::HTTP_EXPECT)
60           response = app.call(env)
61         end
63         alive = hp.keepalive? && G.alive
64         out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
65         HttpResponse.write(client, response, out)
66       end while alive and hp.reset.nil? and env.clear
67     rescue ::Revactor::TCP::ReadError
68     rescue => e
69       Error.write(io, e)
70     ensure
71       client.close
72     end
74     # runs inside each forked worker, this sits around and waits
75     # for connections and doesn't die until the parent dies (or is
76     # given a INT, QUIT, or TERM signal)
77     def worker_loop(worker)
78       init_worker_process(worker)
79       RD_ARGS[:timeout] = G.kato if G.kato > 0
80       nr = 0
81       limit = worker_connections
82       actor_exit = Case[:exit, Actor, Object]
84       revactorize_listeners.each do |l, close, accept|
85         Actor.spawn(l, close, accept) do |l, close, accept|
86           Actor.current.trap_exit = true
87           l.controller = l.instance_variable_set(:@receiver, Actor.current)
88           begin
89             while nr >= limit
90               l.disable if l.enabled?
91               logger.info "busy: clients=#{nr} >= limit=#{limit}"
92               Actor.receive do |f|
93                 f.when(close) {}
94                 f.when(actor_exit) { nr -= 1 }
95                 f.after(0.01) {} # another listener could've gotten an exit
96               end
97             end
99             l.enable unless l.enabled?
100             Actor.receive do |f|
101               f.when(close) {}
102               f.when(actor_exit) { nr -= 1 }
103               f.when(accept) do |_, _, s|
104                 nr += 1
105                 Actor.spawn_link(s) { |c| process_client(c) }
106               end
107             end
108           rescue => e
109             Error.listen_loop(e)
110           end while G.alive
111           Actor.receive do |f|
112             f.when(close) {}
113             f.when(actor_exit) { nr -= 1 }
114           end while nr > 0
115         end
116       end
118       Actor.sleep 1 while G.tick || nr > 0
119       rescue Errno::EMFILE
120         # ignore, let another worker process take it
121     end
123     def revactorize_listeners
124       LISTENERS.map do |s|
125         case s
126         when TCPServer
127           l = ::Revactor::TCP.listen(s, nil)
128           [ l, T[:tcp_closed, ::Revactor::TCP::Socket],
129             T[:tcp_connection, l, ::Revactor::TCP::Socket] ]
130         when UNIXServer
131           l = ::Revactor::UNIX.listen(s)
132           [ l, T[:unix_closed, ::Revactor::UNIX::Socket ],
133             T[:unix_connection, l, ::Revactor::UNIX::Socket] ]
134         end
135       end
136     end
138     # Revactor Sockets do not implement readpartial, so we emulate just
139     # enough to avoid mucking with TeeInput internals.  Fortunately
140     # this code is not heavily used so we can usually avoid the overhead
141     # of adding a userspace buffer.
142     class PartialSocket < Struct.new(:socket, :rbuf)
143       def initialize(socket)
144         # IO::Buffer is used internally by Rev which Revactor is based on
145         # so we'll always have it available
146         super(socket, IO::Buffer.new)
147       end
149       # Revactor socket reads always return an unspecified amount,
150       # sometimes too much
151       def readpartial(length, dst = "")
152         return dst if length == 0
153         # always check and return from the userspace buffer first
154         rbuf.size > 0 and return dst.replace(rbuf.read(length))
156         # read off the socket since there was nothing in rbuf
157         tmp = socket.read
159         # we didn't read too much, good, just return it straight back
160         # to avoid needlessly wasting memory bandwidth
161         tmp.size <= length and return dst.replace(tmp)
163         # ugh, read returned too much, copy + reread to avoid slicing
164         rbuf << tmp[length, tmp.size]
165         dst.replace(tmp[0, length])
166       end
168       # just proxy any remaining methods TeeInput may use
169       def close
170         socket.close
171       end
172     end
174   end