Allow overriding :directory or via -C/--directory
[unicorn.git] / lib / unicorn.rb
blobaad4c3d7c709965d6ffa7db6eb9194952a3875dd
1 require 'logger'
3 require 'unicorn/socket'
4 require 'unicorn/const'
5 require 'unicorn/http_request'
6 require 'unicorn/http_response'
7 require 'unicorn/configurator'
9 # Unicorn module containing all of the classes (include C extensions) for running
10 # a Unicorn web server.  It contains a minimalist HTTP server with just enough
11 # functionality to service web application requests fast as possible.
12 module Unicorn
13   class << self
14     def run(app, options = {})
15       HttpServer.new(app, options).start.join
16     end
17   end
19   # This is the process manager of Unicorn. This manages worker
20   # processes which in turn handle the I/O and application process.
21   # Listener sockets are started in the master process and shared with
22   # forked worker children.
23   class HttpServer
24     attr_reader :logger
25     include Process
26     include ::Unicorn::SocketHelper
28     DEFAULT_START_CTX = {
29       :argv => ARGV.map { |arg| arg.dup },
30       :cwd => Dir.pwd,
31       :zero => $0.dup,
32       :environ => {}.merge!(ENV),
33       :umask => File.umask,
34     }.freeze
36     Worker = Struct.new(:nr, :tempfile) unless defined?(Worker)
37     class Worker
38       # worker objects may be compared to just plain numbers
39       def ==(other_nr)
40         self.nr == other_nr
41       end
42     end
44     # Creates a working server on host:port (strange things happen if
45     # port isn't a Number).  Use HttpServer::run to start the server and
46     # HttpServer.workers.join to join the thread that's processing
47     # incoming requests on the socket.
48     def initialize(app, options = {})
49       start_ctx = options.delete(:start_ctx)
50       @start_ctx = DEFAULT_START_CTX.dup
51       @start_ctx.merge!(start_ctx) if start_ctx
52       @app = app
53       @mode = :idle
54       @master_pid = $$
55       @workers = Hash.new
56       @io_purgatory = [] # prevents IO objects in here from being GC-ed
57       @request = @rd_sig = @wr_sig = nil
58       @reexec_pid = 0
59       @config = Configurator.new(options.merge(:use_defaults => true))
60       @config.commit!(self, :skip => [:listeners, :pid])
61       @listeners = []
62     end
64     # Runs the thing.  Returns self so you can run join on it
65     def start
66       BasicSocket.do_not_reverse_lookup = true
68       # inherit sockets from parents, they need to be plain Socket objects
69       # before they become UNIXServer or TCPServer
70       inherited = ENV['UNICORN_FD'].to_s.split(/,/).map do |fd|
71         io = Socket.for_fd(fd.to_i)
72         set_server_sockopt(io)
73         @io_purgatory << io
74         logger.info "inherited: #{io} fd=#{fd} addr=#{sock_name(io)}"
75         server_cast(io)
76       end
78       config_listeners = @config[:listeners].dup
79       @listeners.replace(inherited)
81       # we start out with generic Socket objects that get cast to either
82       # TCPServer or UNIXServer objects; but since the Socket objects
83       # share the same OS-level file descriptor as the higher-level *Server
84       # objects; we need to prevent Socket objects from being garbage-collected
85       config_listeners -= listener_names
86       config_listeners.each { |addr| listen(addr) }
87       listen(Const::DEFAULT_LISTENER) if @listeners.empty?
88       self.pid = @config[:pid]
89       spawn_missing_workers
90       self
91     end
93     # replaces current listener set with +listeners+.  This will
94     # close the socket if it will not exist in the new listener set
95     def listeners=(listeners)
96       cur_names = listener_names
97       set_names = listener_names(listeners)
98       dead_names = cur_names - set_names
100       @listeners.delete_if do |io|
101         if dead_names.include?(sock_name(io))
102           @io_purgatory.delete_if { |pio| pio.fileno == io.fileno }
103           destroy_safely(io)
104           true
105         else
106           false
107         end
108       end
110       (set_names - cur_names).each { |addr| listen(addr) }
111     end
113     # sets the path for the PID file of the master process
114     def pid=(path)
115       if path
116         if x = valid_pid?(path)
117           return path if @pid && path == @pid && x == $$
118           raise ArgumentError, "Already running on PID:#{x} " \
119                                "(or pid=#{path} is stale)"
120         end
121         File.open(path, 'wb') { |fp| fp.syswrite("#{$$}\n") }
122         at_exit { unlink_pid_safe(path) }
123       end
124       unlink_pid_safe(@pid) if @pid && @pid != path
125       @pid = path
126     end
128     # sets the path for running the master and worker process, useful for
129     # running and reexecuting from a symlinked path like Capistrano allows
130     def directory=(path)
131       Dir.chdir(path) if path
132       @directory = path
133     end
135     # add a given address to the +listeners+ set, idempotently
136     # Allows workers to add a private, per-process listener via the
137     # @after_fork hook.  Very useful for debugging and testing.
138     def listen(address)
139       return if String === address && listener_names.include?(address)
141       if io = bind_listen(address, @backlog)
142         if Socket == io.class
143           @io_purgatory << io
144           io = server_cast(io)
145         end
146         logger.info "#{io} listening on PID:#{$$} " \
147                     "fd=#{io.fileno} addr=#{sock_name(io)}"
148         @listeners << io
149       else
150         logger.error "adding listener failed addr=#{address} (in use)"
151         raise Errno::EADDRINUSE, address
152       end
153     end
155     # monitors children and receives signals forever
156     # (or until a termination signal is sent).  This handles signals
157     # one-at-a-time time and we'll happily drop signals in case somebody
158     # is signalling us too often.
159     def join
160       # this pipe is used to wake us up from select(2) in #join when signals
161       # are trapped.  See trap_deferred
162       @rd_sig, @wr_sig = IO.pipe unless (@rd_sig && @wr_sig)
163       @rd_sig.nonblock = @wr_sig.nonblock = true
165       reset_master
166       $0 = "unicorn master"
167       logger.info "master process ready" # test relies on this message
168       begin
169         loop do
170           reap_all_workers
171           case @mode
172           when :idle
173             murder_lazy_workers
174             spawn_missing_workers
175           when 'QUIT' # graceful shutdown
176             break
177           when 'TERM', 'INT' # immediate shutdown
178             stop(false)
179             break
180           when 'USR1' # user-defined (probably something like log reopening)
181             kill_each_worker('USR1')
182             reset_master
183           when 'USR2' # exec binary, stay alive in case something went wrong
184             reexec
185             reset_master
186           when 'HUP'
187             if @config.config_file
188               load_config!
189               reset_master
190               redo # immediate reaping since we may have QUIT workers
191             else # exec binary and exit if there's no config file
192               logger.info "config_file not present, reexecuting binary"
193               reexec
194               break
195             end
196           else
197             logger.error "master process in unknown mode: #{@mode}, resetting"
198             reset_master
199           end
200           reap_all_workers
202           ready = begin
203             IO.select([@rd_sig], nil, nil, 1) or next
204           rescue Errno::EINTR # next
205           end
206           ready[0] && ready[0][0] or next
207           begin # just consume the pipe when we're awakened, @mode is set
208             loop { @rd_sig.sysread(Const::CHUNK_SIZE) }
209           rescue Errno::EAGAIN, Errno::EINTR # next
210           end
211         end
212       rescue Errno::EINTR
213         retry
214       rescue Object => e
215         logger.error "Unhandled master loop exception #{e.inspect}."
216         logger.error e.backtrace.join("\n")
217         reset_master
218         retry
219       end
220       stop # gracefully shutdown all workers on our way out
221       logger.info "master PID:#{$$} join complete"
222     end
224     # Terminates all workers, but does not exit master process
225     def stop(graceful = true)
226       kill_each_worker(graceful ? 'QUIT' : 'TERM')
227       timeleft = @timeout
228       step = 0.2
229       reap_all_workers
230       until @workers.empty?
231         sleep(step)
232         reap_all_workers
233         (timeleft -= step) > 0 and next
234         kill_each_worker('KILL')
235       end
236     ensure
237       self.listeners = []
238     end
240     private
242     # list of signals we care about and trap in master.
243     TRAP_SIGS = %w(QUIT INT TERM USR1 USR2 HUP).map { |x| x.freeze }.freeze
245     # defer a signal for later processing in #join (master process)
246     def trap_deferred(signal)
247       trap(signal) do |sig_nr|
248         # we only handle/defer one signal at a time and ignore all others
249         # until we're ready again.  Queueing signals can lead to more bugs,
250         # and simplicity is the most important thing
251         TRAP_SIGS.each { |sig| trap(sig, 'IGNORE') }
252         if Symbol === @mode
253           @mode = signal
254           begin
255             @wr_sig.syswrite('.') # wakeup master process from IO.select
256           rescue Errno::EAGAIN
257           rescue Errno::EINTR
258             retry
259           end
260         end
261       end
262     end
265     def reset_master
266       @mode = :idle
267       TRAP_SIGS.each { |sig| trap_deferred(sig) }
268     end
270     # reaps all unreaped workers
271     def reap_all_workers
272       begin
273         loop do
274           pid = waitpid(-1, WNOHANG) or break
275           if @reexec_pid == pid
276             logger.error "reaped exec()-ed PID:#{pid} status=#{$?.exitstatus}"
277             @reexec_pid = 0
278             self.pid = @pid.chomp('.oldbin') if @pid
279           else
280             worker = @workers.delete(pid)
281             worker.tempfile.close rescue nil
282             logger.info "reaped PID:#{pid} " \
283                         "worker=#{worker.nr rescue 'unknown'} " \
284                         "status=#{$?.exitstatus}"
285           end
286         end
287       rescue Errno::ECHILD
288       end
289     end
291     # reexecutes the @start_ctx with a new binary
292     def reexec
293       if @reexec_pid > 0
294         begin
295           Process.kill(0, @reexec_pid)
296           logger.error "reexec-ed child already running PID:#{@reexec_pid}"
297           return
298         rescue Errno::ESRCH
299           @reexec_pid = 0
300         end
301       end
303       if @pid
304         old_pid = "#{@pid}.oldbin"
305         prev_pid = @pid.dup
306         begin
307           self.pid = old_pid  # clear the path for a new pid file
308         rescue ArgumentError
309           logger.error "old PID:#{valid_pid?(old_pid)} running with " \
310                        "existing pid=#{old_pid}, refusing rexec"
311           return
312         rescue Object => e
313           logger.error "error writing pid=#{old_pid} #{e.class} #{e.message}"
314           return
315         end
316       end
318       @reexec_pid = fork do
319         @rd_sig.close if @rd_sig
320         @wr_sig.close if @wr_sig
321         @workers.values.each { |other| other.tempfile.close rescue nil }
323         ENV.replace(@start_ctx[:environ])
324         ENV['UNICORN_FD'] = @listeners.map { |sock| sock.fileno }.join(',')
325         File.umask(@start_ctx[:umask])
326         Dir.chdir(@cwd || @start_ctx[:cwd])
327         cmd = [ @start_ctx[:zero] ] + @start_ctx[:argv]
328         logger.info "executing #{cmd.inspect} (in #{Dir.pwd})"
329         exec(*cmd)
330       end
331     end
333     # forcibly terminate all workers that haven't checked in in @timeout
334     # seconds.  The timeout is implemented using an unlinked tempfile
335     # shared between the parent process and each worker.  The worker
336     # runs File#chmod to modify the ctime of the tempfile.  If the ctime
337     # is stale for >@timeout seconds, then we'll kill the corresponding
338     # worker.
339     def murder_lazy_workers
340       now = Time.now
341       @workers.each_pair do |pid, worker|
342         (now - worker.tempfile.ctime) <= @timeout and next
343         logger.error "worker=#{worker.nr} PID:#{pid} is too old, killing"
344         kill_worker('KILL', pid) # take no prisoners for @timeout violations
345         worker.tempfile.close rescue nil
346       end
347     end
349     def spawn_missing_workers
350       return if @workers.size == @worker_processes
351       (0...@worker_processes).each do |worker_nr|
352         @workers.values.include?(worker_nr) and next
353         tempfile = Tempfile.new('') # as short as possible to save dir space
354         tempfile.unlink # don't allow other processes to find or see it
355         tempfile.sync = true
356         worker = Worker.new(worker_nr, tempfile)
357         @before_fork.call(self, worker.nr)
358         pid = fork { worker_loop(worker) }
359         @workers[pid] = worker
360       end
361     end
363     # once a client is accepted, it is processed in its entirety here
364     # in 3 easy steps: read request, call app, write app response
365     def process_client(client)
366       env = @request.read(client) or return
367       app_response = @app.call(env)
368       HttpResponse.write(client, app_response)
369     rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
370       client.closed? or client.close rescue nil
371     rescue Object => e
372       logger.error "Read error: #{e.inspect}"
373       logger.error e.backtrace.join("\n")
374     ensure
375       begin
376         client.closed? or client.close
377       rescue Object => e
378         logger.error "Client error: #{e.inspect}"
379         logger.error e.backtrace.join("\n")
380       end
381       @request.reset
382     end
384     # gets rid of stuff the worker has no business keeping track of
385     # to free some resources and drops all sig handlers.
386     # traps for USR1, USR2, and HUP may be set in the @after_fork Proc
387     # by the user.
388     def init_worker_process(worker)
389       TRAP_SIGS.each { |sig| trap(sig, 'IGNORE') }
390       trap('CHLD', 'DEFAULT')
391       $0 = "unicorn worker[#{worker.nr}]"
392       @rd_sig.close if @rd_sig
393       @wr_sig.close if @wr_sig
394       @workers.values.each { |other| other.tempfile.close rescue nil }
395       @workers.clear
396       @start_ctx.clear
397       @mode = @start_ctx = @workers = @rd_sig = @wr_sig = nil
398       @listeners.each { |sock| set_cloexec(sock) }
399       ENV.delete('UNICORN_FD')
400       @after_fork.call(self, worker.nr) if @after_fork
401       @request = HttpRequest.new(logger)
402     end
404     # runs inside each forked worker, this sits around and waits
405     # for connections and doesn't die until the parent dies (or is
406     # given a INT, QUIT, or TERM signal)
407     def worker_loop(worker)
408       init_worker_process(worker)
409       nr = 0
410       tempfile = worker.tempfile
411       alive = true
412       ready = @listeners
413       client = nil
414       %w(TERM INT).each { |sig| trap(sig) { exit(0) } } # instant shutdown
415       trap('QUIT') do
416         alive = false
417         @listeners.each { |sock| sock.close rescue nil } # break IO.select
418       end
420       while alive && @master_pid == ppid
421         # we're a goner in @timeout seconds anyways if tempfile.chmod
422         # breaks, so don't trap the exception.  Using fchmod() since
423         # futimes() is not available in base Ruby and I very strongly
424         # prefer temporary files to be unlinked for security,
425         # performance and reliability reasons, so utime is out.  No-op
426         # changes with chmod doesn't update ctime on all filesystems; so
427         # we increment our counter each and every time.
428         tempfile.chmod(nr += 1)
430         begin
431           accepted = false
432           ready.each do |sock|
433             begin
434               client = begin
435                 sock.accept_nonblock
436               rescue Errno::EAGAIN
437                 next
438               end
439               accepted = client.sync = true
440               client.nonblock = false
441               set_client_sockopt(client) if TCPSocket === client
442               process_client(client)
443             rescue Errno::ECONNABORTED
444               # client closed the socket even before accept
445               if client && !client.closed?
446                 client.close rescue nil
447               end
448             end
449             tempfile.chmod(nr += 1)
450           end
451           client = nil
453           # make the following bet: if we accepted clients this round,
454           # we're probably reasonably busy, so avoid calling select(2)
455           # and try to do a blind non-blocking accept(2) on everything
456           # before we sleep again in select
457           if accepted
458             ready = @listeners
459           else
460             begin
461               tempfile.chmod(nr += 1)
462               # timeout used so we can detect parent death:
463               ret = IO.select(@listeners, nil, nil, @timeout/2.0) or next
464               ready = ret[0]
465             rescue Errno::EBADF => e
466               exit(alive ? 1 : 0)
467             end
468           end
469         rescue SystemExit => e
470           exit(e.status)
471         rescue Object => e
472           if alive
473             logger.error "Unhandled listen loop exception #{e.inspect}."
474             logger.error e.backtrace.join("\n")
475           end
476         end
477       end
478     end
480     # delivers a signal to a worker and fails gracefully if the worker
481     # is no longer running.
482     def kill_worker(signal, pid)
483       begin
484         kill(signal, pid)
485       rescue Errno::ESRCH
486         worker = @workers.delete(pid) and worker.tempfile.close rescue nil
487       end
488     end
490     # delivers a signal to each worker
491     def kill_each_worker(signal)
492       @workers.keys.each { |pid| kill_worker(signal, pid) }
493     end
495     # unlinks a PID file at given +path+ if it contains the current PID
496     # useful as an at_exit handler.
497     def unlink_pid_safe(path)
498       (File.read(path).to_i == $$ and File.unlink(path)) rescue nil
499     end
501     # returns a PID if a given path contains a non-stale PID file,
502     # nil otherwise.
503     def valid_pid?(path)
504       if File.exist?(path) && (pid = File.read(path).to_i) > 1
505         begin
506           kill(0, pid)
507           return pid
508         rescue Errno::ESRCH
509         end
510       end
511       nil
512     end
514     def load_config!
515       begin
516         logger.info "reloading config_file=#{@config.config_file}"
517         @config.reload
518         @config.commit!(self)
519         kill_each_worker('QUIT')
520         logger.info "done reloading config_file=#{@config.config_file}"
521       rescue Object => e
522         logger.error "error reloading config_file=#{@config.config_file}: " \
523                      "#{e.class} #{e.message}"
524       end
525     end
527     # returns an array of string names for the given listener array
528     def listener_names(listeners = @listeners)
529       listeners.map { |io| sock_name(io) }
530     end
532   end