symlink-aware start_ctx[:cwd]
[unicorn.git] / lib / unicorn.rb
blob90fd29dea903b8232931616ac1865a4840cd2934
1 require 'logger'
3 require 'unicorn/socket'
4 require 'unicorn/const'
5 require 'unicorn/http_request'
6 require 'unicorn/http_response'
7 require 'unicorn/configurator'
9 # Unicorn module containing all of the classes (include C extensions) for running
10 # a Unicorn web server.  It contains a minimalist HTTP server with just enough
11 # functionality to service web application requests fast as possible.
12 module Unicorn
13   class << self
14     def run(app, options = {})
15       HttpServer.new(app, options).start.join
16     end
17   end
19   # This is the process manager of Unicorn. This manages worker
20   # processes which in turn handle the I/O and application process.
21   # Listener sockets are started in the master process and shared with
22   # forked worker children.
23   class HttpServer
24     attr_reader :logger
25     include Process
26     include ::Unicorn::SocketHelper
28     DEFAULT_START_CTX = {
29       :argv => ARGV.map { |arg| arg.dup },
30       # don't rely on Dir.pwd here since it's not symlink-aware, and
31       # symlink dirs are the default with Capistrano...
32       :cwd => `/bin/sh -c pwd`.chomp("\n"),
33       :zero => $0.dup,
34       :environ => {}.merge!(ENV),
35       :umask => File.umask,
36     }.freeze
38     Worker = Struct.new(:nr, :tempfile) unless defined?(Worker)
39     class Worker
40       # worker objects may be compared to just plain numbers
41       def ==(other_nr)
42         self.nr == other_nr
43       end
44     end
46     # Creates a working server on host:port (strange things happen if
47     # port isn't a Number).  Use HttpServer::run to start the server and
48     # HttpServer.workers.join to join the thread that's processing
49     # incoming requests on the socket.
50     def initialize(app, options = {})
51       start_ctx = options.delete(:start_ctx)
52       @start_ctx = DEFAULT_START_CTX.dup
53       @start_ctx.merge!(start_ctx) if start_ctx
54       @app = app
55       @mode = :idle
56       @master_pid = $$
57       @workers = Hash.new
58       @io_purgatory = [] # prevents IO objects in here from being GC-ed
59       @request = @rd_sig = @wr_sig = nil
60       @reexec_pid = 0
61       @config = Configurator.new(options.merge(:use_defaults => true))
62       @config.commit!(self, :skip => [:listeners, :pid])
63       @listeners = []
64     end
66     # Runs the thing.  Returns self so you can run join on it
67     def start
68       BasicSocket.do_not_reverse_lookup = true
70       # inherit sockets from parents, they need to be plain Socket objects
71       # before they become UNIXServer or TCPServer
72       inherited = ENV['UNICORN_FD'].to_s.split(/,/).map do |fd|
73         io = Socket.for_fd(fd.to_i)
74         set_server_sockopt(io)
75         @io_purgatory << io
76         logger.info "inherited: #{io} fd=#{fd} addr=#{sock_name(io)}"
77         server_cast(io)
78       end
80       config_listeners = @config[:listeners].dup
81       @listeners.replace(inherited)
83       # we start out with generic Socket objects that get cast to either
84       # TCPServer or UNIXServer objects; but since the Socket objects
85       # share the same OS-level file descriptor as the higher-level *Server
86       # objects; we need to prevent Socket objects from being garbage-collected
87       config_listeners -= listener_names
88       config_listeners.each { |addr| listen(addr) }
89       listen(Const::DEFAULT_LISTENER) if @listeners.empty?
90       self.pid = @config[:pid]
91       build_app!
92       spawn_missing_workers
93       self
94     end
96     # replaces current listener set with +listeners+.  This will
97     # close the socket if it will not exist in the new listener set
98     def listeners=(listeners)
99       cur_names = listener_names
100       set_names = listener_names(listeners)
101       dead_names = cur_names - set_names
103       @listeners.delete_if do |io|
104         if dead_names.include?(sock_name(io))
105           @io_purgatory.delete_if { |pio| pio.fileno == io.fileno }
106           destroy_safely(io)
107           true
108         else
109           false
110         end
111       end
113       (set_names - cur_names).each { |addr| listen(addr) }
114     end
116     # sets the path for the PID file of the master process
117     def pid=(path)
118       if path
119         if x = valid_pid?(path)
120           return path if @pid && path == @pid && x == $$
121           raise ArgumentError, "Already running on PID:#{x} " \
122                                "(or pid=#{path} is stale)"
123         end
124         File.open(path, 'wb') { |fp| fp.syswrite("#{$$}\n") }
125       end
126       unlink_pid_safe(@pid) if @pid && @pid != path
127       @pid = path
128     end
130     # sets the path for running the master and worker process, useful for
131     # running and reexecuting from a symlinked path like Capistrano allows
132     def directory=(path)
133       Dir.chdir(path) if path
134       @directory = path
135     end
137     # add a given address to the +listeners+ set, idempotently
138     # Allows workers to add a private, per-process listener via the
139     # @after_fork hook.  Very useful for debugging and testing.
140     def listen(address)
141       return if String === address && listener_names.include?(address)
143       if io = bind_listen(address, @backlog)
144         if Socket == io.class
145           @io_purgatory << io
146           io = server_cast(io)
147         end
148         logger.info "#{io} listening on PID:#{$$} " \
149                     "fd=#{io.fileno} addr=#{sock_name(io)}"
150         @listeners << io
151       else
152         logger.error "adding listener failed addr=#{address} (in use)"
153         raise Errno::EADDRINUSE, address
154       end
155     end
157     # monitors children and receives signals forever
158     # (or until a termination signal is sent).  This handles signals
159     # one-at-a-time time and we'll happily drop signals in case somebody
160     # is signalling us too often.
161     def join
162       # this pipe is used to wake us up from select(2) in #join when signals
163       # are trapped.  See trap_deferred
164       @rd_sig, @wr_sig = IO.pipe unless (@rd_sig && @wr_sig)
165       @rd_sig.nonblock = @wr_sig.nonblock = true
167       reset_master
168       $0 = "unicorn master"
169       logger.info "master process ready" # test relies on this message
170       begin
171         loop do
172           reap_all_workers
173           case @mode
174           when :idle
175             murder_lazy_workers
176             spawn_missing_workers
177           when 'QUIT' # graceful shutdown
178             break
179           when 'TERM', 'INT' # immediate shutdown
180             stop(false)
181             break
182           when 'USR1' # user-defined (probably something like log reopening)
183             kill_each_worker('USR1')
184             reset_master
185           when 'USR2' # exec binary, stay alive in case something went wrong
186             reexec
187             reset_master
188           when 'HUP'
189             if @config.config_file
190               load_config!
191               reset_master
192               redo # immediate reaping since we may have QUIT workers
193             else # exec binary and exit if there's no config file
194               logger.info "config_file not present, reexecuting binary"
195               reexec
196               break
197             end
198           else
199             logger.error "master process in unknown mode: #{@mode}, resetting"
200             reset_master
201           end
202           reap_all_workers
204           ready = begin
205             IO.select([@rd_sig], nil, nil, 1) or next
206           rescue Errno::EINTR # next
207           end
208           ready[0] && ready[0][0] or next
209           begin # just consume the pipe when we're awakened, @mode is set
210             loop { @rd_sig.sysread(Const::CHUNK_SIZE) }
211           rescue Errno::EAGAIN, Errno::EINTR # next
212           end
213         end
214       rescue Errno::EINTR
215         retry
216       rescue Object => e
217         logger.error "Unhandled master loop exception #{e.inspect}."
218         logger.error e.backtrace.join("\n")
219         reset_master
220         retry
221       end
222       stop # gracefully shutdown all workers on our way out
223       logger.info "master PID:#{$$} join complete"
224       unlink_pid_safe(@pid) if @pid
225     end
227     # Terminates all workers, but does not exit master process
228     def stop(graceful = true)
229       kill_each_worker(graceful ? 'QUIT' : 'TERM')
230       timeleft = @timeout
231       step = 0.2
232       reap_all_workers
233       until @workers.empty?
234         sleep(step)
235         reap_all_workers
236         (timeleft -= step) > 0 and next
237         kill_each_worker('KILL')
238       end
239     ensure
240       self.listeners = []
241     end
243     private
245     # list of signals we care about and trap in master.
246     TRAP_SIGS = %w(QUIT INT TERM USR1 USR2 HUP).map { |x| x.freeze }.freeze
248     # defer a signal for later processing in #join (master process)
249     def trap_deferred(signal)
250       trap(signal) do |sig_nr|
251         # we only handle/defer one signal at a time and ignore all others
252         # until we're ready again.  Queueing signals can lead to more bugs,
253         # and simplicity is the most important thing
254         TRAP_SIGS.each { |sig| trap(sig, 'IGNORE') }
255         if Symbol === @mode
256           @mode = signal
257           begin
258             @wr_sig.syswrite('.') # wakeup master process from IO.select
259           rescue Errno::EAGAIN
260           rescue Errno::EINTR
261             retry
262           end
263         end
264       end
265     end
268     def reset_master
269       @mode = :idle
270       TRAP_SIGS.each { |sig| trap_deferred(sig) }
271     end
273     # reaps all unreaped workers
274     def reap_all_workers
275       begin
276         loop do
277           pid = waitpid(-1, WNOHANG) or break
278           if @reexec_pid == pid
279             logger.error "reaped exec()-ed PID:#{pid} status=#{$?.exitstatus}"
280             @reexec_pid = 0
281             self.pid = @pid.chomp('.oldbin') if @pid
282           else
283             worker = @workers.delete(pid)
284             worker.tempfile.close rescue nil
285             logger.info "reaped PID:#{pid} " \
286                         "worker=#{worker.nr rescue 'unknown'} " \
287                         "status=#{$?.exitstatus}"
288           end
289         end
290       rescue Errno::ECHILD
291       end
292     end
294     # reexecutes the @start_ctx with a new binary
295     def reexec
296       if @reexec_pid > 0
297         begin
298           Process.kill(0, @reexec_pid)
299           logger.error "reexec-ed child already running PID:#{@reexec_pid}"
300           return
301         rescue Errno::ESRCH
302           @reexec_pid = 0
303         end
304       end
306       if @pid
307         old_pid = "#{@pid}.oldbin"
308         prev_pid = @pid.dup
309         begin
310           self.pid = old_pid  # clear the path for a new pid file
311         rescue ArgumentError
312           logger.error "old PID:#{valid_pid?(old_pid)} running with " \
313                        "existing pid=#{old_pid}, refusing rexec"
314           return
315         rescue Object => e
316           logger.error "error writing pid=#{old_pid} #{e.class} #{e.message}"
317           return
318         end
319       end
321       @reexec_pid = fork do
322         @rd_sig.close if @rd_sig
323         @wr_sig.close if @wr_sig
324         @workers.values.each { |other| other.tempfile.close rescue nil }
326         ENV.replace(@start_ctx[:environ])
327         ENV['UNICORN_FD'] = @listeners.map { |sock| sock.fileno }.join(',')
328         File.umask(@start_ctx[:umask])
329         Dir.chdir(@directory || @start_ctx[:cwd])
330         cmd = [ @start_ctx[:zero] ] + @start_ctx[:argv]
331         logger.info "executing #{cmd.inspect} (in #{Dir.pwd})"
332         exec(*cmd)
333       end
334     end
336     # forcibly terminate all workers that haven't checked in in @timeout
337     # seconds.  The timeout is implemented using an unlinked tempfile
338     # shared between the parent process and each worker.  The worker
339     # runs File#chmod to modify the ctime of the tempfile.  If the ctime
340     # is stale for >@timeout seconds, then we'll kill the corresponding
341     # worker.
342     def murder_lazy_workers
343       now = Time.now
344       @workers.each_pair do |pid, worker|
345         (now - worker.tempfile.ctime) <= @timeout and next
346         logger.error "worker=#{worker.nr} PID:#{pid} is too old, killing"
347         kill_worker('KILL', pid) # take no prisoners for @timeout violations
348         worker.tempfile.close rescue nil
349       end
350     end
352     def spawn_missing_workers
353       return if @workers.size == @worker_processes
354       (0...@worker_processes).each do |worker_nr|
355         @workers.values.include?(worker_nr) and next
356         tempfile = Tempfile.new('') # as short as possible to save dir space
357         tempfile.unlink # don't allow other processes to find or see it
358         tempfile.sync = true
359         worker = Worker.new(worker_nr, tempfile)
360         @before_fork.call(self, worker.nr)
361         pid = fork { worker_loop(worker) }
362         @workers[pid] = worker
363       end
364     end
366     # once a client is accepted, it is processed in its entirety here
367     # in 3 easy steps: read request, call app, write app response
368     def process_client(client)
369       env = @request.read(client) or return
370       app_response = @app.call(env)
371       HttpResponse.write(client, app_response)
372     rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
373       client.closed? or client.close rescue nil
374     rescue Object => e
375       logger.error "Read error: #{e.inspect}"
376       logger.error e.backtrace.join("\n")
377     ensure
378       begin
379         client.closed? or client.close
380       rescue Object => e
381         logger.error "Client error: #{e.inspect}"
382         logger.error e.backtrace.join("\n")
383       end
384       @request.reset
385     end
387     # gets rid of stuff the worker has no business keeping track of
388     # to free some resources and drops all sig handlers.
389     # traps for USR1, USR2, and HUP may be set in the @after_fork Proc
390     # by the user.
391     def init_worker_process(worker)
392       TRAP_SIGS.each { |sig| trap(sig, 'IGNORE') }
393       trap('CHLD', 'DEFAULT')
394       $0 = "unicorn worker[#{worker.nr}]"
395       @rd_sig.close if @rd_sig
396       @wr_sig.close if @wr_sig
397       @workers.values.each { |other| other.tempfile.close rescue nil }
398       @workers.clear
399       @start_ctx.clear
400       @mode = @start_ctx = @workers = @rd_sig = @wr_sig = nil
401       @listeners.each { |sock| set_cloexec(sock) }
402       ENV.delete('UNICORN_FD')
403       @after_fork.call(self, worker.nr) if @after_fork
404       @request = HttpRequest.new(logger)
405     end
407     # runs inside each forked worker, this sits around and waits
408     # for connections and doesn't die until the parent dies (or is
409     # given a INT, QUIT, or TERM signal)
410     def worker_loop(worker)
411       init_worker_process(worker)
412       nr = 0
413       tempfile = worker.tempfile
414       alive = true
415       ready = @listeners
416       client = nil
417       %w(TERM INT).each { |sig| trap(sig) { exit(0) } } # instant shutdown
418       trap('QUIT') do
419         alive = false
420         @listeners.each { |sock| sock.close rescue nil } # break IO.select
421       end
423       while alive && @master_pid == ppid
424         # we're a goner in @timeout seconds anyways if tempfile.chmod
425         # breaks, so don't trap the exception.  Using fchmod() since
426         # futimes() is not available in base Ruby and I very strongly
427         # prefer temporary files to be unlinked for security,
428         # performance and reliability reasons, so utime is out.  No-op
429         # changes with chmod doesn't update ctime on all filesystems; so
430         # we increment our counter each and every time.
431         tempfile.chmod(nr += 1)
433         begin
434           accepted = false
435           ready.each do |sock|
436             begin
437               client = begin
438                 sock.accept_nonblock
439               rescue Errno::EAGAIN
440                 next
441               end
442               accepted = client.sync = true
443               client.nonblock = false
444               set_client_sockopt(client) if TCPSocket === client
445               process_client(client)
446             rescue Errno::ECONNABORTED
447               # client closed the socket even before accept
448               if client && !client.closed?
449                 client.close rescue nil
450               end
451             end
452             tempfile.chmod(nr += 1)
453           end
454           client = nil
456           # make the following bet: if we accepted clients this round,
457           # we're probably reasonably busy, so avoid calling select(2)
458           # and try to do a blind non-blocking accept(2) on everything
459           # before we sleep again in select
460           if accepted
461             ready = @listeners
462           else
463             begin
464               tempfile.chmod(nr += 1)
465               # timeout used so we can detect parent death:
466               ret = IO.select(@listeners, nil, nil, @timeout/2.0) or next
467               ready = ret[0]
468             rescue Errno::EBADF => e
469               exit(alive ? 1 : 0)
470             end
471           end
472         rescue SystemExit => e
473           exit(e.status)
474         rescue Object => e
475           if alive
476             logger.error "Unhandled listen loop exception #{e.inspect}."
477             logger.error e.backtrace.join("\n")
478           end
479         end
480       end
481     end
483     # delivers a signal to a worker and fails gracefully if the worker
484     # is no longer running.
485     def kill_worker(signal, pid)
486       begin
487         kill(signal, pid)
488       rescue Errno::ESRCH
489         worker = @workers.delete(pid) and worker.tempfile.close rescue nil
490       end
491     end
493     # delivers a signal to each worker
494     def kill_each_worker(signal)
495       @workers.keys.each { |pid| kill_worker(signal, pid) }
496     end
498     # unlinks a PID file at given +path+ if it contains the current PID
499     # useful as an at_exit handler.
500     def unlink_pid_safe(path)
501       (File.read(path).to_i == $$ and File.unlink(path)) rescue nil
502     end
504     # returns a PID if a given path contains a non-stale PID file,
505     # nil otherwise.
506     def valid_pid?(path)
507       if File.exist?(path) && (pid = File.read(path).to_i) > 1
508         begin
509           kill(0, pid)
510           return pid
511         rescue Errno::ESRCH
512         end
513       end
514       nil
515     end
517     def load_config!
518       begin
519         logger.info "reloading config_file=#{@config.config_file}"
520         @config.reload
521         @config.commit!(self)
522         kill_each_worker('QUIT')
523         logger.info "done reloading config_file=#{@config.config_file}"
524       rescue Object => e
525         logger.error "error reloading config_file=#{@config.config_file}: " \
526                      "#{e.class} #{e.message}"
527       end
528     end
530     # returns an array of string names for the given listener array
531     def listener_names(listeners = @listeners)
532       listeners.map { |io| sock_name(io) }
533     end
535     def build_app!
536       @app = @app.call if @app.respond_to?(:arity) && @app.arity == 0
537     end
539   end