up verstion to 0.5.3 (internal testing only)
[god.git] / lib / god.rb
blob89befe760e49f58aa6a024bfb149aa268a3e05c1
1 $:.unshift File.dirname(__FILE__)     # For use/testing when no gem is installed
3 # core
4 require 'stringio'
5 require 'logger'
7 # stdlib
8 require 'syslog'
10 # internal requires
11 require 'god/errors'
12 require 'god/logger'
13 require 'god/system/process'
14 require 'god/dependency_graph'
15 require 'god/timeline'
16 require 'god/configurable'
18 require 'god/task'
20 require 'god/behavior'
21 require 'god/behaviors/clean_pid_file'
22 require 'god/behaviors/notify_when_flapping'
24 require 'god/condition'
25 require 'god/conditions/process_running'
26 require 'god/conditions/process_exits'
27 require 'god/conditions/tries'
28 require 'god/conditions/memory_usage'
29 require 'god/conditions/cpu_usage'
30 require 'god/conditions/always'
31 require 'god/conditions/lambda'
32 require 'god/conditions/degrading_lambda'
33 require 'god/conditions/flapping'
34 require 'god/conditions/http_response_code'
35 require 'god/conditions/disk_usage'
36 require 'god/conditions/complex'
38 require 'god/contact'
39 require 'god/contacts/email'
41 require 'god/socket'
42 require 'god/timer'
43 require 'god/hub'
45 require 'god/metric'
46 require 'god/watch'
48 require 'god/trigger'
49 require 'god/event_handler'
50 require 'god/registry'
51 require 'god/process'
53 require 'god/sugar'
55 $:.unshift File.join(File.dirname(__FILE__), *%w[.. ext god])
57 # App wide logging system
58 LOG = God::Logger.new
59 LOG.datetime_format = "%Y-%m-%d %H:%M:%S "
61 def applog(watch, level, text)
62   LOG.log(watch, level, text)
63 end
65 # The $run global determines whether god should be started when the
66 # program would normally end. This should be set to true if when god
67 # should be started (e.g. `god -c <config file>`) and false otherwise
68 # (e.g. `god status`)
69 $run ||= nil
71 GOD_ROOT = File.expand_path(File.join(File.dirname(__FILE__), '..'))
73 # Ensure that Syslog is open
74 begin
75   Syslog.open('god')
76 rescue RuntimeError
77   Syslog.reopen('god')
78 end
80 # Return the binding of god's root level
81 def root_binding
82   binding
83 end
85 # Load the event handler system
86 God::EventHandler.load
88 module Kernel
89   alias_method :abort_orig, :abort
90   
91   def abort(text = nil)
92     $run = false
93     applog(nil, :error, text) if text
94     # text ? abort_orig(text) : exit(1)
95     exit(1)
96   end
97   
98   alias_method :exit_orig, :exit
99   
100   def exit(code = 0)
101     $run = false
102     exit_orig(code)
103   end
106 class Module
107   def safe_attr_accessor(*args)
108     args.each do |arg|
109       define_method((arg.to_s + "=").intern) do |other|
110         if !self.running && self.inited
111           abort "God.#{arg} must be set before any Tasks are defined"
112         end
113         
114         if self.running && self.inited
115           applog(nil, :warn, "God.#{arg} can't be set while god is running")
116           return
117         end
118         
119         instance_variable_set(('@' + arg.to_s).intern, other)
120       end
121       
122       define_method(arg) do
123         instance_variable_get(('@' + arg.to_s).intern)
124       end
125     end
126   end
129 module God
130   VERSION = '0.5.3'
131   
132   LOG_BUFFER_SIZE_DEFAULT = 1000
133   PID_FILE_DIRECTORY_DEFAULT = '/var/run/god'
134   DRB_PORT_DEFAULT = 17165
135   DRB_ALLOW_DEFAULT = ['127.0.0.1']
136   
137   class << self
138     # user configurable
139     safe_attr_accessor :pid,
140                        :host,
141                        :port,
142                        :allow,
143                        :log_buffer_size,
144                        :pid_file_directory
145     
146     # internal
147     attr_accessor :inited,
148                   :running,
149                   :pending_watches,
150                   :pending_watch_states,
151                   :server,
152                   :watches,
153                   :groups,
154                   :contacts,
155                   :contact_groups
156   end
157   
158   # initialize class instance variables
159   self.pid = nil
160   self.host = nil
161   self.port = nil
162   self.allow = nil
163   self.log_buffer_size = nil
164   self.pid_file_directory = nil
165   
166   def self.internal_init
167     # only do this once
168     return if self.inited
169     
170     # variable init
171     self.watches = {}
172     self.groups = {}
173     self.pending_watches = []
174     self.pending_watch_states = {}
175     self.contacts = {}
176     self.contact_groups = {}
177     
178     # set defaults
179     self.log_buffer_size ||= LOG_BUFFER_SIZE_DEFAULT
180     self.pid_file_directory ||= PID_FILE_DIRECTORY_DEFAULT
181     self.port ||= DRB_PORT_DEFAULT
182     self.allow ||= DRB_ALLOW_DEFAULT
183     LOG.level = Logger::INFO
184     
185     # init has been executed
186     self.inited = true
187     
188     # not yet running
189     self.running = false
190   end
191   
192   # Instantiate a new, empty Watch object and pass it to the mandatory
193   # block. The attributes of the watch will be set by the configuration
194   # file.
195   def self.watch(&block)
196     self.task(Watch, &block)
197   end
198   
199   # Instantiate a new, empty Task object and pass it to the mandatory
200   # block. The attributes of the task will be set by the configuration
201   # file.
202   def self.task(klass = Task)
203     self.internal_init
204     
205     t = klass.new
206     yield(t)
207     
208     # do the post-configuration
209     t.prepare
210     
211     # if running, completely remove the watch (if necessary) to
212     # prepare for the reload
213     existing_watch = self.watches[t.name]
214     if self.running && existing_watch
215       self.pending_watch_states[existing_watch.name] = existing_watch.state
216       self.unwatch(existing_watch)
217     end
218     
219     # ensure the new watch has a unique name
220     if self.watches[t.name] || self.groups[t.name]
221       abort "Task name '#{t.name}' already used for a Task or Group"
222     end
223     
224     # ensure watch is internally valid
225     t.valid? || abort("Task '#{t.name}' is not valid (see above)")
226     
227     # add to list of watches
228     self.watches[t.name] = t
229     
230     # add to pending watches
231     self.pending_watches << t
232     
233     # add to group if specified
234     if t.group
235       # ensure group name hasn't been used for a watch already
236       if self.watches[t.group]
237         abort "Group name '#{t.group}' already used for a Task"
238       end
239       
240       self.groups[t.group] ||= []
241       self.groups[t.group] << t
242     end
243     
244     # register watch
245     t.register!
246     
247     # log
248     if self.running && existing_watch
249       applog(t, :info, "#{t.name} Reloaded config")
250     elsif self.running
251       applog(t, :info, "#{t.name} Loaded config")
252     end
253   end
254   
255   def self.unwatch(watch)
256     # unmonitor
257     watch.unmonitor unless watch.state == :unmonitored
258     
259     # unregister
260     watch.unregister!
261     
262     # remove from watches
263     self.watches.delete(watch.name)
264     
265     # remove from groups
266     if watch.group
267       self.groups[watch.group].delete(watch)
268     end
269   end
270   
271   def self.contact(kind)
272     self.internal_init
273     
274     # create the condition
275     begin
276       c = Contact.generate(kind)
277     rescue NoSuchContactError => e
278       abort e.message
279     end
280     
281     # send to block so config can set attributes
282     yield(c) if block_given?
283     
284     # call prepare on the contact
285     c.prepare
286     
287     # remove existing contacts of same name
288     existing_contact = self.contacts[c.name]
289     if self.running && existing_contact
290       self.uncontact(existing_contact)
291     end
292     
293     # ensure the new contact has a unique name
294     if self.contacts[c.name] || self.contact_groups[c.name]
295       abort "Contact name '#{c.name}' already used for a Contact or Contact Group"
296     end
297     
298     # abort if the Contact is invalid, the Contact will have printed
299     # out its own error messages by now
300     unless Contact.valid?(c) && c.valid?
301       abort "Exiting on invalid contact"
302     end
303     
304     # add to list of contacts
305     self.contacts[c.name] = c
306     
307     # add to contact group if specified
308     if c.group
309       # ensure group name hasn't been used for a contact already
310       if self.contacts[c.group]
311         abort "Contact Group name '#{c.group}' already used for a Contact"
312       end
313       
314       self.contact_groups[c.group] ||= []
315       self.contact_groups[c.group] << c
316     end
317   end
318   
319   def self.uncontact(contact)
320     self.contacts.delete(contact.name)
321     if contact.group
322       self.contact_groups[contact.group].delete(contact)
323     end
324   end
325     
326   def self.control(name, command)
327     # get the list of watches
328     watches = Array(self.watches[name] || self.groups[name])
329     
330     jobs = []
331     
332     # do the command
333     case command
334       when "start", "monitor"
335         watches.each { |w| jobs << Thread.new { w.monitor if w.state != :up } }
336       when "restart"
337         watches.each { |w| jobs << Thread.new { w.move(:restart) } }
338       when "stop"
339         watches.each { |w| jobs << Thread.new { w.unmonitor.action(:stop) if w.state != :unmonitored } }
340       when "unmonitor"
341         watches.each { |w| jobs << Thread.new { w.unmonitor if w.state != :unmonitored } }
342       else
343         raise InvalidCommandError.new
344     end
345     
346     jobs.each { |j| j.join }
347     
348     watches.map { |x| x.name }
349   end
350   
351   def self.stop_all
352     self.watches.sort.each do |name, w|
353       Thread.new do
354         w.unmonitor if w.state != :unmonitored
355         w.action(:stop) if w.alive?
356       end
357     end
358     
359     10.times do
360       return true unless self.watches.map { |name, w| w.alive? }.any?
361       sleep 1
362     end
363     
364     return false
365   end
366   
367   # Force the termination of god.
368   #   * Clean up pid file if one exists
369   #   * Stop DRb service
370   #   * Hard exit using exit!
371   #
372   # Never returns because the process will no longer exist!
373   def self.terminate
374     FileUtils.rm_f(self.pid) if self.pid
375     self.server.stop if self.server
376     exit!(0)
377   end
378   
379   def self.status
380     info = {}
381     self.watches.map do |name, w|
382       info[name] = {:state => w.state}
383     end
384     info
385   end
386   
387   def self.running_log(watch_name, since)
388     unless self.watches[watch_name]
389       raise NoSuchWatchError.new
390     end
391     
392     LOG.watch_log_since(watch_name, since)
393   end
394   
395   def self.running_load(code, filename)
396     errors = ""
397     watches = []
398     
399     begin
400       LOG.start_capture
401       
402       eval(code, root_binding, filename)
403       self.pending_watches.each do |w|
404         if previous_state = self.pending_watch_states[w.name]
405           w.monitor unless previous_state == :unmonitored
406         else
407           w.monitor if w.autostart?
408         end
409       end
410       watches = self.pending_watches.dup
411       self.pending_watches.clear
412       self.pending_watch_states.clear
413     rescue Exception => e
414       # don't ever let running_load take down god
415       errors << LOG.finish_capture
416       
417       unless e.instance_of?(SystemExit)
418         errors << e.message << "\n"
419         errors << e.backtrace.join("\n")
420       end
421     end
422     
423     names = watches.map { |x| x.name }
424     [names, errors]
425   end
426   
427   def self.load(glob)
428     Dir[glob].each do |f|
429       Kernel.load f
430     end
431   end
432   
433   def self.setup
434     # Make pid directory
435     unless test(?d, self.pid_file_directory)
436       begin
437         FileUtils.mkdir_p(self.pid_file_directory)
438       rescue Errno::EACCES => e
439         abort "Failed to create pid file directory: #{e.message}"
440       end
441     end
442   end
443   
444   def self.validater
445     unless test(?w, self.pid_file_directory)
446       abort "The pid file directory (#{self.pid_file_directory}) is not writable by #{Etc.getlogin}"
447     end
448   end
449   
450   def self.start
451     self.internal_init
452     self.setup
453     self.validater
454     
455     # instantiate server
456     self.server = Socket.new(self.port)
457     
458     # start event handler system
459     EventHandler.start if EventHandler.loaded?
460     
461     # start the timer system
462     Timer.get
463     
464     # start monitoring any watches set to autostart
465     self.watches.values.each { |w| w.monitor if w.autostart? }
466     
467     # clear pending watches
468     self.pending_watches.clear
469     
470     # mark as running
471     self.running = true
472     
473     # join the timer thread so we don't exit
474     Timer.get.join
475   end
476   
477   def self.at_exit
478     self.start
479   end
482 at_exit do
483   God.at_exit if $run