Refactor and get exec + FD inheritance working
[unicorn.git] / lib / unicorn.rb
blob9c6aab73dc47abc7ea7f8c6b207937cda65e7202
1 require 'logger'
3 require 'unicorn/socket'
4 require 'unicorn/const'
5 require 'unicorn/http_request'
6 require 'unicorn/http_response'
8 # Unicorn module containing all of the classes (include C extensions) for running
9 # a Unicorn web server.  It contains a minimalist HTTP server with just enough
10 # functionality to service web application requests fast as possible.
11 module Unicorn
12   class << self
13     def run(app, options = {})
14       HttpServer.new(app, options).start.join
15     end
16   end
18   # This is the process manager of Unicorn. This manages worker
19   # processes which in turn handle the I/O and application process.
20   # Listener sockets are started in the master process and shared with
21   # forked worker children.
22   class HttpServer
23     attr_reader :logger
24     include Process
25     include ::Unicorn::SocketHelper
27     DEFAULT_START_CTX = {
28       :argv => ARGV.map { |arg| arg.dup },
29       :cwd => (ENV['PWD'] || Dir.pwd),
30       :zero => $0.dup,
31       :environ => {}.merge!(ENV),
32       :umask => File.umask,
33     }.freeze
35     DEFAULTS = {
36       :timeout => 60,
37       :listeners => %w(0.0.0.0:8080),
38       :logger => Logger.new(STDERR),
39       :nr_workers => 1,
40       :after_fork => lambda { |server, worker_nr|
41           server.logger.info("worker=#{worker_nr} spawned pid=#{$$}")
42         },
43       :before_fork => lambda { |server, worker_nr|
44           server.logger.info("worker=#{worker_nr} spawning...")
45         },
46     }
47     # Creates a working server on host:port (strange things happen if
48     # port isn't a Number).  Use HttpServer::run to start the server and
49     # HttpServer.workers.join to join the thread that's processing
50     # incoming requests on the socket.
51     def initialize(app, options = {})
52       (DEFAULTS.to_a + options.to_a).each do |key, value|
53         instance_variable_set("@#{key.to_s.downcase}", value)
54       end
56       @app = app
57       @mode = :idle
58       @master_pid = $$
59       @workers = Hash.new
60       @request = HttpRequest.new(logger) # shared between all worker processes
61       @start_ctx = DEFAULT_START_CTX.dup
62       @start_ctx.merge!(options[:start_ctx]) if options[:start_ctx]
63       @purgatory = [] # prevents objects in here from being GC-ed
64     end
66     # Runs the thing.  Returns self so you can run join on it
67     def start
68       BasicSocket.do_not_reverse_lookup = true
70       # inherit sockets from parents, they need to be plain Socket objects
71       # before they become UNIXServer or TCPServer
72       inherited = ENV['UNICORN_FD'].to_s.split(/,/).map do |fd|
73         io = Socket.for_fd(fd.to_i)
74         set_server_sockopt(io)
75         logger.info "inherited: #{io} fd=#{fd} addr=#{sock_name(io)}"
76         io
77       end
79       # avoid binding inherited sockets, probably not perfect for TCPSockets
80       # but it works for UNIXSockets
81       @listeners -= inherited.map { |io| sock_name(io) }
83       # try binding new listeners
84       @listeners.map! do |addr|
85         if sock = bind_listen(addr, 1024)
86           sock
87         elsif inherited.empty? || addr[0..0] == "/"
88           raise Errno::EADDRINUSE, "couldn't bind #{addr}"
89         else
90           logger.info "couldn't bind #{addr}, inherited?"
91           nil
92         end
93       end
94       @listeners += inherited
95       @listeners.compact!
96       @listeners.empty? and raise ArgumentError, 'No listener sockets'
98       # we start out with generic Socket objects that get cast to either
99       # TCPServer or UNIXServer objects; but since the Socket objects
100       # share the same OS-level file descriptor as the higher-level *Server
101       # objects; we need to prevent Socket objects from being garbage-collected
102       @purgatory += @listeners
103       @listeners.map! { |io| server_cast(io) }
104       @listeners.each do |io|
105         logger.info "#{io} listening on fd=#{io.fileno} addr=#{sock_name(io)}"
106       end
107       spawn_missing_workers
108       self
109     end
111     # monitors children and receives signals forever
112     # (or until a termination signal is sent)
113     def join
114       %w(QUIT INT TERM USR1 USR2 HUP).each { |sig| trap_deferred(sig) }
115       begin
116         loop do
117           reap_all_workers
118           case @mode
119           when :idle
120             kill_each_worker(0) # ensure they're running
121             spawn_missing_workers
122           when 'QUIT' # graceful shutdown
123             break
124           when 'TERM', 'INT' # immediate shutdown
125             stop(false)
126             break
127           when 'USR1' # user-defined (probably something like log reopening)
128             kill_each_worker('USR1')
129             @mode = :idle
130             trap_deferred('USR1')
131           when 'USR2' # exec binary, stay alive in case something went wrong
132             reexec
133             @mode = :idle
134             trap_deferred('USR2')
135           when 'HUP' # exec binary and exit
136             reexec
137             break
138           else
139             logger.error "master process in unknown mode: #{@mode}, resetting"
140             @mode = :idle
141           end
142           reap_all_workers
143           sleep 1
144         end
145       rescue Errno::EINTR
146         retry
147       rescue Object => e
148         logger.error "Unhandled master loop exception #{e.inspect}."
149         logger.error e.backtrace.join("\n")
150         sleep 1 rescue nil
151         retry
152       end
153       stop # gracefully shutdown all workers on our way out
154       logger.info "master pid=#{$$} exit"
155     end
157     # Terminates all workers, but does not exit master process
158     def stop(graceful = true)
159       kill_each_worker(graceful ? 'QUIT' : 'TERM')
160       timeleft = @timeout
161       step = 0.2
162       reap_all_workers
163       until @workers.empty?
164         sleep(step)
165         reap_all_workers
166         (timeleft -= step) > 0 and next
167         kill_each_worker('KILL')
168       end
169     ensure
170       @listeners.each { |sock| sock.close rescue nil }
171       @listeners.clear
172     end
174     private
176     # defer a signal for later processing
177     def trap_deferred(signal)
178       trap(signal) do |sig_nr|
179         trap(signal, 'IGNORE') # prevent double signalling
180         @mode = signal if Symbol === @mode
181       end
182     end
184     # reaps all unreaped workers
185     def reap_all_workers
186       begin
187         loop do
188           pid = waitpid(-1, WNOHANG) or break
189           worker_nr = @workers.delete(pid)
190           logger.info "reaped pid=#{pid} worker=#{worker_nr || 'unknown'} " \
191                       "status=#{$?.exitstatus}"
192         end
193       rescue Errno::ECHILD
194       end
195     end
197     # Forks, sets current environment, sets the umask, chdirs to the desired
198     # start directory, and execs the command line originally passed to us to
199     # start Unicorn.
200     # Returns the pid of the forked process
201     def spawn_start_ctx(check = nil)
202       fork do
203         ENV.replace(@start_ctx[:environ])
204         ENV['UNICORN_FD'] = @listeners.map { |sock| sock.fileno }.join(',')
205         File.umask(@start_ctx[:umask])
206         Dir.chdir(@start_ctx[:cwd])
207         cmd = [ @start_ctx[:zero] ] + @start_ctx[:argv]
208         cmd << 'check' if check
209         logger.info "executing #{cmd.inspect}"
210         exec *cmd
211       end
212     end
214     # ensures @start_ctx is reusable for re-execution
215     def check_reexec
216       pid = waitpid(spawn_start_ctx(:check))
217       $?.success? and return true
218       logger.error "exec check failed with #{$?.exitstatus}"
219     end
221     # reexecutes the @start_ctx with a new binary
222     def reexec
223       check_reexec or return false
224       pid = spawn_start_ctx
225       if waitpid(pid, WNOHANG)
226         logger.error "rexec pid=#{pid} died with #{$?.exitstatus}"
227       end
228     end
230     def spawn_missing_workers
231       return if @workers.size == @nr_workers
232       (0...@nr_workers).each do |worker_nr|
233         @workers.values.include?(worker_nr) and next
234         @before_fork.call(self, worker_nr)
235         pid = fork { worker_loop(worker_nr) }
236         @workers[pid] = worker_nr
237       end
238     end
240     # once a client is accepted, it is processed in its entirety here
241     # in 3 easy steps: read request, call app, write app response
242     def process_client(client, client_nr)
243       env = @request.read(client) or return
244       app_response = @app.call(env)
245       HttpResponse.write(client, app_response)
246     rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
247       client.closed? or client.close rescue nil
248     rescue Object => e
249       logger.error "Read error: #{e.inspect}"
250       logger.error e.backtrace.join("\n")
251     ensure
252       begin
253         client.closed? or client.close
254       rescue Object => e
255         logger.error "Client error: #{e.inspect}"
256         logger.error e.backtrace.join("\n")
257       end
258       @request.reset
259     end
261     # runs inside each forked worker, this sits around and waits
262     # for connections and doesn't die until the parent dies
263     def worker_loop(worker_nr)
264       # allow @after_fork to override these signals:
265       %w(USR1 USR2 HUP).each { |sig| trap(sig, 'IGNORE') }
266       @after_fork.call(self, worker_nr) if @after_fork
268       if defined?(Fcntl::FD_CLOEXEC)
269         @listeners.each { |s| s.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
270       end
271       nr_before = nr = 0
272       client = nil
273       alive = true
274       ready = @listeners
275       %w(TERM INT).each { |sig| trap(sig) { exit(0) } } # instant shutdown
276       trap('QUIT') do
277         alive = false
278         @listeners.each { |sock| sock.close rescue nil } # break IO.select
279       end
281       while alive && @master_pid == ppid
282         begin
283           nr_before = nr
284           ready.each do |sock|
285             begin
286               client = begin
287                 sock.accept_nonblock
288               rescue Errno::EAGAIN
289                 next
290               end
291               client.sync = true
292               client.nonblock = false
293               set_client_sockopt(client) if client.class == TCPSocket
294               nr += 1
295               process_client(client, nr)
296             rescue Errno::ECONNABORTED
297               # client closed the socket even before accept
298               if client && !client.closed?
299                 client.close rescue nil
300               end
301             end
302           end
304           # make the following bet: if we accepted clients this round,
305           # we're probably reasonably busy, so avoid calling select(2)
306           # and try to do a blind non-blocking accept(2) on everything
307           # before we sleep again in select
308           if nr != nr_before
309             ready = @listeners
310           else
311             begin
312               # timeout used so we can detect parent death:
313               ret = IO.select(@listeners, nil, nil, @timeout) or next
314               ready = ret[0]
315             rescue Errno::EBADF => e
316               exit(alive ? 1 : 0)
317             end
318           end
319         rescue SystemExit => e
320           exit(e.status)
321         rescue Object => e
322           if alive
323             logger.error "Unhandled listen loop exception #{e.inspect}."
324             logger.error e.backtrace.join("\n")
325           end
326         end
327       end
328     end
330     # delivers a signal to each worker
331     def kill_each_worker(signal)
332       @workers.keys.each do |pid|
333         begin
334           Process.kill(signal, pid)
335         rescue Errno::ESRCH
336           @workers.delete(pid)
337         end
338       end
339     end
341   end