From 5a458470cf8fbbb2329db163325e365b72e4f43d Mon Sep 17 00:00:00 2001 From: tom Date: Fri, 21 Dec 2007 15:10:30 -0800 Subject: [PATCH] tighten up Hub and add comments --- lib/god/conditions/process_exits.rb | 3 +- lib/god/hub.rb | 243 ++++++++++++++++++++++-------------- 2 files changed, 152 insertions(+), 94 deletions(-) diff --git a/lib/god/conditions/process_exits.rb b/lib/god/conditions/process_exits.rb index de3a265..be955e2 100644 --- a/lib/god/conditions/process_exits.rb +++ b/lib/god/conditions/process_exits.rb @@ -38,7 +38,8 @@ module God begin EventHandler.register(pid, :proc_exit) do |extra| - self.info = "process #{pid} exited #{extra.inspect}" + formatted_extra = extra.size > 0 ? " #{extra.inspect}" : "" + self.info = "process #{pid} exited#{formatted_extra}" Hub.trigger(self) end diff --git a/lib/god/hub.rb b/lib/god/hub.rb index 6b8eb7a..13ec325 100644 --- a/lib/god/hub.rb +++ b/lib/god/hub.rb @@ -5,93 +5,123 @@ module God # directory to hold conditions and their corresponding metric # {condition => metric} attr_accessor :directory + + # mutex to keep the directory consistent + attr_accessor :mutex end self.directory = {} + self.mutex = Monitor.new + # Attach the condition to the hub and schedule/register it + # +condition+ is the Condition to attach + # +metric+ is the Metric to which the condition belongs + # + # Returns nothing def self.attach(condition, metric) - self.directory[condition] = metric - condition.reset - - case condition - when PollCondition - Timer.get.schedule(condition, 0) - when EventCondition, TriggerCondition - condition.register + self.mutex.synchronize do + self.directory[condition] = metric + condition.reset + + case condition + when PollCondition + Timer.get.schedule(condition, 0) + when EventCondition, TriggerCondition + condition.register + end end end + # Detach the condition from the hub and unschedule/deregister it + # +condition+ is the Condition to detach + # + # Returns nothing def self.detach(condition) - self.directory.delete(condition) - - case condition - when PollCondition - Timer.get.unschedule(condition) - when EventCondition, TriggerCondition - condition.deregister + self.mutex.synchronize do + self.directory.delete(condition) + + case condition + when PollCondition + Timer.get.unschedule(condition) + when EventCondition, TriggerCondition + condition.deregister + end end end + # Trigger evaluation of the condition + # +condition+ is the Condition to evaluate + # + # Returns nothing def self.trigger(condition) - case condition - when PollCondition - self.handle_poll(condition) - when EventCondition, TriggerCondition - self.handle_event(condition) + self.mutex.synchronize do + case condition + when PollCondition + self.handle_poll(condition) + when EventCondition, TriggerCondition + self.handle_event(condition) + end end end + # private + + # Asynchronously evaluate and handle the given poll condition. Handles logging + # notifications, and moving to the new state if necessary + # +condition+ is the Condition to handle + # + # Returns nothing def self.handle_poll(condition) + metric = self.directory[condition] + + # it's possible that the timer will trigger an event before it can be cleared + # by an exiting metric, in which case it should be ignored + return if metric.nil? + Thread.new do begin - metric = self.directory[condition] + watch = metric.watch - # it's possible that the timer will trigger an event before it can be cleared - # by an exiting metric, in which case it should be ignored - unless metric.nil? - watch = metric.watch + watch.mutex.synchronize do + # run the test + result = condition.test - watch.mutex.synchronize do - # run the test - result = condition.test - - # log - messages = self.log(watch, metric, condition, result) - - # notify - if condition.notify && self.trigger?(metric, result) - self.notify(condition, messages.last) - end - - # after-condition - condition.after - - # get the destination - dest = - if result && condition.transition - # condition override - condition.transition - else - # regular - metric.destination && metric.destination[result] - end - - # transition or reschedule - if dest - # transition - begin - watch.move(dest) - rescue EventRegistrationFailedError - msg = watch.name + ' Event registration failed, moving back to previous state' - applog(watch, :info, msg) - - dest = watch.state - retry - end - else - # reschedule - Timer.get.schedule(condition) + # log + messages = self.log(watch, metric, condition, result) + + # notify + if condition.notify && self.trigger?(metric, result) + self.notify(condition, messages.last) + end + + # after-condition + condition.after + + # get the destination + dest = + if result && condition.transition + # condition override + condition.transition + else + # regular + metric.destination && metric.destination[result] + end + + # transition or reschedule + if dest + # transition + begin + watch.move(dest) + rescue EventRegistrationFailedError + msg = watch.name + ' Event registration failed, moving back to previous state' + applog(watch, :info, msg) + + dest = watch.state + retry end + else + # reschedule + Timer.get.schedule(condition) end end rescue Exception => e @@ -102,36 +132,43 @@ module God end end + # Asynchronously evaluate and handle the given event condition. Handles logging + # notifications, and moving to the new state if necessary + # +condition+ is the Condition to handle + # + # Returns nothing def self.handle_event(condition) + metric = self.directory[condition] + + # it's possible that the timer will trigger an event before it can be cleared + # by an exiting metric, in which case it should be ignored + return if metric.nil? + Thread.new do begin - metric = self.directory[condition] + watch = metric.watch - unless metric.nil? - watch = metric.watch + watch.mutex.synchronize do + # log + messages = self.log(watch, metric, condition, true) - watch.mutex.synchronize do - # log - messages = self.log(watch, metric, condition, true) - - # notify - if condition.notify && self.trigger?(metric, true) - self.notify(condition, messages.last) - end - - # get the destination - dest = - if condition.transition - # condition override - condition.transition - else - # regular - metric.destination && metric.destination[true] - end - - if dest - watch.move(dest) - end + # notify + if condition.notify && self.trigger?(metric, true) + self.notify(condition, messages.last) + end + + # get the destination + dest = + if condition.transition + # condition override + condition.transition + else + # regular + metric.destination && metric.destination[true] + end + + if dest + watch.move(dest) end end rescue Exception => e @@ -142,12 +179,22 @@ module God end end - # helpers - + # Determine whether a trigger happened + # +metric+ is the Metric + # +result+ is the result from the condition's test + # + # Returns Boolean def self.trigger?(metric, result) - (metric.destination && metric.destination.keys.size == 2) || result == true + metric.destination && metric.destination[result] end + # Log info about the condition and return the list of messages logged + # +watch+ is the Watch + # +metric+ is the Metric + # +condition+ is the Condition + # +result+ is the Boolean result of the condition test evaluation + # + # Returns String[] def self.log(watch, metric, condition, result) status = if self.trigger?(metric, result) @@ -176,6 +223,11 @@ module God messages end + # Format the destination specification for use in debug logging + # +metric+ is the Metric + # +condition+ is the Condition + # + # Returns String def self.dest_desc(metric, condition) if condition.transition {true => condition.transition}.inspect @@ -188,6 +240,11 @@ module God end end + # Notify all recipeients of the given condition with the specified message + # +condition+ is the Condition + # +message+ is the String message to send + # + # Returns nothing def self.notify(condition, message) spec = Contact.normalize(condition.notify) unmatched = [] -- 2.11.4.GIT