Switch to autoload to defer requires
[unicorn.git] / lib / unicorn.rb
blobcb36fc8fe2db2bcc688566dca5fcb72989d8e2a1
1 require 'fcntl'
2 require 'unicorn/socket_helper'
3 autoload :Rack, 'rack'
5 # Unicorn module containing all of the classes (include C extensions) for running
6 # a Unicorn web server.  It contains a minimalist HTTP server with just enough
7 # functionality to service web application requests fast as possible.
8 module Unicorn
9   autoload :Const, 'unicorn/const'
10   autoload :HttpRequest, 'unicorn/http_request'
11   autoload :HttpResponse, 'unicorn/http_response'
12   autoload :Configurator, 'unicorn/configurator'
13   autoload :Util, 'unicorn/util'
15   class << self
16     def run(app, options = {})
17       HttpServer.new(app, options).start.join
18     end
19   end
21   # This is the process manager of Unicorn. This manages worker
22   # processes which in turn handle the I/O and application process.
23   # Listener sockets are started in the master process and shared with
24   # forked worker children.
25   class HttpServer
26     attr_reader :logger
27     include ::Unicorn::SocketHelper
29     # prevents IO objects in here from being GC-ed
30     IO_PURGATORY = []
32     # all bound listener sockets
33     LISTENERS = []
35     # This hash maps PIDs to Workers
36     WORKERS = {}
38     # See: http://cr.yp.to/docs/selfpipe.html
39     SELF_PIPE = []
41     # signal queue used for self-piping
42     SIG_QUEUE = []
44     # We populate this at startup so we can figure out how to reexecute
45     # and upgrade the currently running instance of Unicorn
46     START_CTX = {
47       :argv => ARGV.map { |arg| arg.dup },
48       # don't rely on Dir.pwd here since it's not symlink-aware, and
49       # symlink dirs are the default with Capistrano...
50       :cwd => `/bin/sh -c pwd`.chomp("\n"),
51       :zero => $0.dup,
52     }
54     Worker = Struct.new(:nr, :tempfile) unless defined?(Worker)
55     class Worker
56       # worker objects may be compared to just plain numbers
57       def ==(other_nr)
58         self.nr == other_nr
59       end
60     end
62     # Creates a working server on host:port (strange things happen if
63     # port isn't a Number).  Use HttpServer::run to start the server and
64     # HttpServer.run.join to join the thread that's processing
65     # incoming requests on the socket.
66     def initialize(app, options = {})
67       @app = app
68       @pid = nil
69       @reexec_pid = 0
70       @init_listeners = options[:listeners] ? options[:listeners].dup : []
71       @config = Configurator.new(options.merge(:use_defaults => true))
72       @listener_opts = {}
73       @config.commit!(self, :skip => [:listeners, :pid])
74       @request = HttpRequest.new(@logger)
75     end
77     # Runs the thing.  Returns self so you can run join on it
78     def start
79       BasicSocket.do_not_reverse_lookup = true
81       # inherit sockets from parents, they need to be plain Socket objects
82       # before they become UNIXServer or TCPServer
83       inherited = ENV['UNICORN_FD'].to_s.split(/,/).map do |fd|
84         io = Socket.for_fd(fd.to_i)
85         set_server_sockopt(io, @listener_opts[sock_name(io)])
86         IO_PURGATORY << io
87         logger.info "inherited addr=#{sock_name(io)} fd=#{fd}"
88         server_cast(io)
89       end
91       config_listeners = @config[:listeners].dup
92       LISTENERS.replace(inherited)
94       # we start out with generic Socket objects that get cast to either
95       # TCPServer or UNIXServer objects; but since the Socket objects
96       # share the same OS-level file descriptor as the higher-level *Server
97       # objects; we need to prevent Socket objects from being garbage-collected
98       config_listeners -= listener_names
99       if config_listeners.empty? && LISTENERS.empty?
100         config_listeners << Unicorn::Const::DEFAULT_LISTEN
101       end
102       config_listeners.each { |addr| listen(addr) }
103       raise ArgumentError, "no listeners" if LISTENERS.empty?
104       self.pid = @config[:pid]
105       build_app! if @preload_app
106       maintain_worker_count
107       self
108     end
110     # replaces current listener set with +listeners+.  This will
111     # close the socket if it will not exist in the new listener set
112     def listeners=(listeners)
113       cur_names, dead_names = [], []
114       listener_names.each do |name|
115         if "/" == name[0..0]
116           # mark unlinked sockets as dead so we can rebind them
117           (File.socket?(name) ? cur_names : dead_names) << name
118         else
119           cur_names << name
120         end
121       end
122       set_names = listener_names(listeners)
123       dead_names += cur_names - set_names
124       dead_names.uniq!
126       LISTENERS.delete_if do |io|
127         if dead_names.include?(sock_name(io))
128           IO_PURGATORY.delete_if do |pio|
129             pio.fileno == io.fileno && (pio.close rescue nil).nil? # true
130           end
131           (io.close rescue nil).nil? # true
132         else
133           set_server_sockopt(io, @listener_opts[sock_name(io)])
134           false
135         end
136       end
138       (set_names - cur_names).each { |addr| listen(addr) }
139     end
141     def stdout_path=(path); redirect_io($stdout, path); end
142     def stderr_path=(path); redirect_io($stderr, path); end
144     # sets the path for the PID file of the master process
145     def pid=(path)
146       if path
147         if x = valid_pid?(path)
148           return path if @pid && path == @pid && x == $$
149           raise ArgumentError, "Already running on PID:#{x} " \
150                                "(or pid=#{path} is stale)"
151         end
152       end
153       unlink_pid_safe(@pid) if @pid
154       File.open(path, 'wb') { |fp| fp.syswrite("#$$\n") } if path
155       @pid = path
156     end
158     # add a given address to the +listeners+ set, idempotently
159     # Allows workers to add a private, per-process listener via the
160     # @after_fork hook.  Very useful for debugging and testing.
161     def listen(address, opt = {}.merge(@listener_opts[address] || {}))
162       return if String === address && listener_names.include?(address)
164       if io = bind_listen(address, opt)
165         unless TCPServer === io || UNIXServer === io
166           IO_PURGATORY << io
167           io = server_cast(io)
168         end
169         logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}"
170         LISTENERS << io
171       else
172         logger.error "adding listener failed addr=#{address} (in use)"
173         raise Errno::EADDRINUSE, address
174       end
175     end
177     # monitors children and receives signals forever
178     # (or until a termination signal is sent).  This handles signals
179     # one-at-a-time time and we'll happily drop signals in case somebody
180     # is signalling us too often.
181     def join
182       # this pipe is used to wake us up from select(2) in #join when signals
183       # are trapped.  See trap_deferred
184       init_self_pipe!
185       respawn = true
187       QUEUE_SIGS.each { |sig| trap_deferred(sig) }
188       trap(:CHLD) { |sig_nr| awaken_master }
189       proc_name 'master'
190       logger.info "master process ready" # test_exec.rb relies on this message
191       begin
192         loop do
193           reap_all_workers
194           case SIG_QUEUE.shift
195           when nil
196             murder_lazy_workers
197             maintain_worker_count if respawn
198             master_sleep
199           when :QUIT # graceful shutdown
200             break
201           when :TERM, :INT # immediate shutdown
202             stop(false)
203             break
204           when :USR1 # rotate logs
205             logger.info "master reopening logs..."
206             Unicorn::Util.reopen_logs
207             logger.info "master done reopening logs"
208             kill_each_worker(:USR1)
209           when :USR2 # exec binary, stay alive in case something went wrong
210             reexec
211           when :WINCH
212             if Process.ppid == 1 || Process.getpgrp != $$
213               respawn = false
214               logger.info "gracefully stopping all workers"
215               kill_each_worker(:QUIT)
216             else
217               logger.info "SIGWINCH ignored because we're not daemonized"
218             end
219           when :TTIN
220             @worker_processes += 1
221           when :TTOU
222             @worker_processes -= 1 if @worker_processes > 0
223           when :HUP
224             respawn = true
225             if @config.config_file
226               load_config!
227               redo # immediate reaping since we may have QUIT workers
228             else # exec binary and exit if there's no config file
229               logger.info "config_file not present, reexecuting binary"
230               reexec
231               break
232             end
233           end
234         end
235       rescue Errno::EINTR
236         retry
237       rescue Object => e
238         logger.error "Unhandled master loop exception #{e.inspect}."
239         logger.error e.backtrace.join("\n")
240         retry
241       end
242       stop # gracefully shutdown all workers on our way out
243       logger.info "master complete"
244       unlink_pid_safe(@pid) if @pid
245     end
247     # Terminates all workers, but does not exit master process
248     def stop(graceful = true)
249       kill_each_worker(graceful ? :QUIT : :TERM)
250       timeleft = @timeout
251       step = 0.2
252       reap_all_workers
253       until WORKERS.empty?
254         sleep(step)
255         reap_all_workers
256         (timeleft -= step) > 0 and next
257         kill_each_worker(:KILL)
258       end
259     ensure
260       self.listeners = []
261     end
263     private
265     # list of signals we care about and trap in master.
266     QUEUE_SIGS = [ :WINCH, :QUIT, :INT, :TERM, :USR1, :USR2, :HUP,
267                    :TTIN, :TTOU ].freeze
269     # defer a signal for later processing in #join (master process)
270     def trap_deferred(signal)
271       trap(signal) do |sig_nr|
272         if SIG_QUEUE.size < 5
273           SIG_QUEUE << signal
274           awaken_master
275         else
276           logger.error "ignoring SIG#{signal}, queue=#{SIG_QUEUE.inspect}"
277         end
278       end
279     end
281     # wait for a signal hander to wake us up and then consume the pipe
282     # Wake up every second anyways to run murder_lazy_workers
283     def master_sleep
284       begin
285         ready = IO.select([SELF_PIPE.first], nil, nil, 1) or return
286         ready.first && ready.first.first or return
287         loop { SELF_PIPE.first.read_nonblock(Const::CHUNK_SIZE) }
288       rescue Errno::EAGAIN, Errno::EINTR
289       end
290     end
292     def awaken_master
293       begin
294         SELF_PIPE.last.write_nonblock('.') # wakeup master process from select
295       rescue Errno::EAGAIN, Errno::EINTR
296         # pipe is full, master should wake up anyways
297         retry
298       end
299     end
301     # reaps all unreaped workers
302     def reap_all_workers
303       begin
304         loop do
305           pid, status = Process.waitpid2(-1, Process::WNOHANG)
306           pid or break
307           if @reexec_pid == pid
308             logger.error "reaped #{status.inspect} exec()-ed"
309             @reexec_pid = 0
310             self.pid = @pid.chomp('.oldbin') if @pid
311             proc_name 'master'
312           else
313             worker = WORKERS.delete(pid)
314             worker.tempfile.close rescue nil
315             logger.info "reaped #{status.inspect} " \
316                         "worker=#{worker.nr rescue 'unknown'}"
317           end
318         end
319       rescue Errno::ECHILD
320       end
321     end
323     # reexecutes the START_CTX with a new binary
324     def reexec
325       if @reexec_pid > 0
326         begin
327           Process.kill(0, @reexec_pid)
328           logger.error "reexec-ed child already running PID:#{@reexec_pid}"
329           return
330         rescue Errno::ESRCH
331           @reexec_pid = 0
332         end
333       end
335       if @pid
336         old_pid = "#{@pid}.oldbin"
337         prev_pid = @pid.dup
338         begin
339           self.pid = old_pid  # clear the path for a new pid file
340         rescue ArgumentError
341           logger.error "old PID:#{valid_pid?(old_pid)} running with " \
342                        "existing pid=#{old_pid}, refusing rexec"
343           return
344         rescue Object => e
345           logger.error "error writing pid=#{old_pid} #{e.class} #{e.message}"
346           return
347         end
348       end
350       @reexec_pid = fork do
351         listener_fds = LISTENERS.map { |sock| sock.fileno }
352         ENV['UNICORN_FD'] = listener_fds.join(',')
353         Dir.chdir(START_CTX[:cwd])
354         cmd = [ START_CTX[:zero] ] + START_CTX[:argv]
356         # avoid leaking FDs we don't know about, but let before_exec
357         # unset FD_CLOEXEC, if anything else in the app eventually
358         # relies on FD inheritence.
359         (3..1024).each do |io|
360           next if listener_fds.include?(io)
361           io = IO.for_fd(io) rescue nil
362           io or next
363           IO_PURGATORY << io
364           io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
365         end
366         logger.info "executing #{cmd.inspect} (in #{Dir.pwd})"
367         @before_exec.call(self)
368         exec(*cmd)
369       end
370       proc_name 'master (old)'
371     end
373     # forcibly terminate all workers that haven't checked in in @timeout
374     # seconds.  The timeout is implemented using an unlinked tempfile
375     # shared between the parent process and each worker.  The worker
376     # runs File#chmod to modify the ctime of the tempfile.  If the ctime
377     # is stale for >@timeout seconds, then we'll kill the corresponding
378     # worker.
379     def murder_lazy_workers
380       WORKERS.each_pair do |pid, worker|
381         stat = worker.tempfile.stat
382         stat.mode == 0100000 and next
383         Time.now - stat.ctime <= @timeout and next
384         logger.error "worker=#{worker.nr} PID:#{pid} is too old, killing"
385         kill_worker(:KILL, pid) # take no prisoners for @timeout violations
386         worker.tempfile.close rescue nil
387       end
388     end
390     def spawn_missing_workers
391       (0...@worker_processes).each do |worker_nr|
392         WORKERS.values.include?(worker_nr) and next
393         begin
394           Dir.chdir(START_CTX[:cwd])
395         rescue Errno::ENOENT => err
396           logger.fatal "#{err.inspect} (#{START_CTX[:cwd]})"
397           SIG_QUEUE << :QUIT # forcibly emulate SIGQUIT
398           return
399         end
400         tempfile = Tempfile.new(nil) # as short as possible to save dir space
401         tempfile.unlink # don't allow other processes to find or see it
402         worker = Worker.new(worker_nr, tempfile)
403         @before_fork.call(self, worker)
404         pid = fork { worker_loop(worker) }
405         WORKERS[pid] = worker
406       end
407     end
409     def maintain_worker_count
410       (off = WORKERS.size - @worker_processes) == 0 and return
411       off < 0 and return spawn_missing_workers
412       WORKERS.each_pair { |pid,w|
413         w.nr >= @worker_processes and kill_worker(:QUIT, pid) rescue nil
414       }
415     end
417     # once a client is accepted, it is processed in its entirety here
418     # in 3 easy steps: read request, call app, write app response
419     def process_client(client)
420       HttpResponse.write(client, @app.call(@request.read(client)))
421     # if we get any error, try to write something back to the client
422     # assuming we haven't closed the socket, but don't get hung up
423     # if the socket is already closed or broken.  We'll always ensure
424     # the socket is closed at the end of this function
425     rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
426       client.write_nonblock(Const::ERROR_500_RESPONSE) rescue nil
427       client.close rescue nil
428     rescue HttpParserError # try to tell the client they're bad
429       client.write_nonblock(Const::ERROR_400_RESPONSE) rescue nil
430       client.close rescue nil
431     rescue Object => e
432       client.write_nonblock(Const::ERROR_500_RESPONSE) rescue nil
433       client.close rescue nil
434       logger.error "Read error: #{e.inspect}"
435       logger.error e.backtrace.join("\n")
436     end
438     # gets rid of stuff the worker has no business keeping track of
439     # to free some resources and drops all sig handlers.
440     # traps for USR1, USR2, and HUP may be set in the @after_fork Proc
441     # by the user.
442     def init_worker_process(worker)
443       QUEUE_SIGS.each { |sig| trap(sig, 'IGNORE') }
444       trap(:CHLD, 'DEFAULT')
445       SIG_QUEUE.clear
446       proc_name "worker[#{worker.nr}]"
447       START_CTX.clear
448       init_self_pipe!
449       WORKERS.values.each { |other| other.tempfile.close! rescue nil }
450       WORKERS.clear
451       LISTENERS.each { |sock| sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
452       worker.tempfile.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
453       @after_fork.call(self, worker) # can drop perms
454       @timeout /= 2.0 # halve it for select()
455       build_app! unless @config[:preload_app]
456     end
458     def reopen_worker_logs(worker_nr)
459       @logger.info "worker=#{worker_nr} reopening logs..."
460       Unicorn::Util.reopen_logs
461       @logger.info "worker=#{worker_nr} done reopening logs"
462       init_self_pipe!
463     end
465     # runs inside each forked worker, this sits around and waits
466     # for connections and doesn't die until the parent dies (or is
467     # given a INT, QUIT, or TERM signal)
468     def worker_loop(worker)
469       master_pid = Process.ppid # slightly racy, but less memory usage
470       init_worker_process(worker)
471       nr = 0 # this becomes negative if we need to reopen logs
472       alive = worker.tempfile # tempfile is our lifeline to the master process
473       ready = LISTENERS
474       t = ti = 0
476       # closing anything we IO.select on will raise EBADF
477       trap(:USR1) { nr = -65536; SELF_PIPE.first.close rescue nil }
478       trap(:QUIT) { alive = nil; LISTENERS.each { |s| s.close rescue nil } }
479       [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
480       @logger.info "worker=#{worker.nr} ready"
482       begin
483         nr < 0 and reopen_worker_logs(worker.nr)
484         nr = 0
486         # we're a goner in @timeout seconds anyways if alive.chmod
487         # breaks, so don't trap the exception.  Using fchmod() since
488         # futimes() is not available in base Ruby and I very strongly
489         # prefer temporary files to be unlinked for security,
490         # performance and reliability reasons, so utime is out.  No-op
491         # changes with chmod doesn't update ctime on all filesystems; so
492         # we change our counter each and every time (after process_client
493         # and before IO.select).
494         t == (ti = Time.now.to_i) or alive.chmod(t = ti)
496         ready.each do |sock|
497           begin
498             process_client(sock.accept_nonblock)
499             nr += 1
500             t == (ti = Time.now.to_i) or alive.chmod(t = ti)
501           rescue Errno::EAGAIN, Errno::ECONNABORTED
502           end
503           break if nr < 0
504         end
506         # make the following bet: if we accepted clients this round,
507         # we're probably reasonably busy, so avoid calling select()
508         # and do a speculative accept_nonblock on every listener
509         # before we sleep again in select().
510         redo unless nr == 0 # (nr < 0) => reopen logs
512         master_pid == Process.ppid or return
513         alive.chmod(t = 0)
514         begin
515           # timeout used so we can detect parent death:
516           ret = IO.select(LISTENERS, nil, SELF_PIPE, @timeout) or redo
517           ready = ret.first
518         rescue Errno::EINTR
519           ready = LISTENERS
520         rescue Errno::EBADF
521           nr < 0 or return
522         end
523       rescue Object => e
524         if alive
525           logger.error "Unhandled listen loop exception #{e.inspect}."
526           logger.error e.backtrace.join("\n")
527         end
528       end while alive
529     end
531     # delivers a signal to a worker and fails gracefully if the worker
532     # is no longer running.
533     def kill_worker(signal, pid)
534       begin
535         Process.kill(signal, pid)
536       rescue Errno::ESRCH
537         worker = WORKERS.delete(pid) and worker.tempfile.close rescue nil
538       end
539     end
541     # delivers a signal to each worker
542     def kill_each_worker(signal)
543       WORKERS.keys.each { |pid| kill_worker(signal, pid) }
544     end
546     # unlinks a PID file at given +path+ if it contains the current PID
547     # useful as an at_exit handler.
548     def unlink_pid_safe(path)
549       (File.read(path).to_i == $$ and File.unlink(path)) rescue nil
550     end
552     # returns a PID if a given path contains a non-stale PID file,
553     # nil otherwise.
554     def valid_pid?(path)
555       if File.exist?(path) && (pid = File.read(path).to_i) > 1
556         begin
557           Process.kill(0, pid)
558           return pid
559         rescue Errno::ESRCH
560         end
561       end
562       nil
563     end
565     def load_config!
566       begin
567         logger.info "reloading config_file=#{@config.config_file}"
568         @config[:listeners].replace(@init_listeners)
569         @config.reload
570         @config.commit!(self)
571         kill_each_worker(:QUIT)
572         Unicorn::Util.reopen_logs
573         logger.info "done reloading config_file=#{@config.config_file}"
574       rescue Object => e
575         logger.error "error reloading config_file=#{@config.config_file}: " \
576                      "#{e.class} #{e.message}"
577       end
578     end
580     # returns an array of string names for the given listener array
581     def listener_names(listeners = LISTENERS)
582       listeners.map { |io| sock_name(io) }
583     end
585     def build_app!
586       if @app.respond_to?(:arity) && @app.arity == 0
587         if defined?(Gem) && Gem.respond_to?(:refresh)
588           logger.info "Refreshing Gem list"
589           Gem.refresh
590         end
591         @app = @app.call
592       end
593     end
595     def proc_name(tag)
596       $0 = ([ File.basename(START_CTX[:zero]), tag ] +
597               START_CTX[:argv]).join(' ')
598     end
600     def redirect_io(io, path)
601       File.open(path, 'a') { |fp| io.reopen(fp) } if path
602       io.sync = true
603     end
605     def init_self_pipe!
606       SELF_PIPE.each { |io| io.close rescue nil }
607       SELF_PIPE.replace(IO.pipe)
608       SELF_PIPE.each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
609     end
611   end