Merge branch 'master' of ssh://lausser,shinken@shinken.git.sourceforge.net/gitroot...
[shinken.git] / shinken / daemons / receiverdaemon.py
blob38408dc35e5591905ad2002dbc56f0f2988ac2cc
1 #!/usr/bin/env python
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
6 # Hartmut Goebel, h.goebel@goebel-consult.de
8 #This file is part of Shinken.
10 #Shinken is free software: you can redistribute it and/or modify
11 #it under the terms of the GNU Affero General Public License as published by
12 #the Free Software Foundation, either version 3 of the License, or
13 #(at your option) any later version.
15 #Shinken is distributed in the hope that it will be useful,
16 #but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 #GNU Affero General Public License for more details.
20 #You should have received a copy of the GNU Affero General Public License
21 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
23 import os
24 import sys
25 import time
26 import traceback
28 from multiprocessing import active_children
29 from Queue import Empty
31 from shinken.satellite import BaseSatellite, IForArbiter
33 from shinken.property import PathProp, IntegerProp
34 from shinken.util import sort_by_ids
35 from shinken.log import logger
37 import shinken.pyro_wrapper as pyro
38 from shinken.pyro_wrapper import Pyro
40 from shinken.external_command import ExternalCommand
43 # Our main APP class
44 class Receiver(BaseSatellite):
46 properties = BaseSatellite.properties.copy()
47 properties.update({
48 'pidfile': PathProp(default='/usr/local/shinken/var/receiverd.pid'),
49 'port': IntegerProp(default='7773'),
50 'local_log': PathProp(default='/usr/local/shinken/var/receiverd.log'),
def __init__(self, config_file, is_daemon, do_replace, debug, debug_file):
    """Build a receiver daemon and reset its runtime state.

    All arguments are forwarded unchanged to the BaseSatellite
    constructor, preceded by the daemon kind 'receiver'.
    """
    super(Receiver, self).__init__(
        'receiver', config_file, is_daemon, do_replace, debug, debug_file)

    # Links to the other daemons, all empty at start-up
    self.arbiters = {}
    self.pollers = {}
    self.reactionners = {}

    # Modules are loaded only once; setup_new_conf() sets this to True
    self.have_modules = False

    # External commands given by modules, kept here until the arbiter
    # takes them
    self.external_commands = []

    # Broks to manage
    self.broks = []
    # Broks raised during this turn that still have to go into self.broks
    self.broks_internal_raised = []
78 # Schedulers have some queues. We can simplify call by adding
79 # elements into the proper queue just by looking at their type
80 # Brok -> self.broks
81 # TODO : better tag ID?
82 # External commands -> self.external_commands
def add(self, elt):
    """Route an element to the matching internal queue.

    The class attribute ``my_type`` of the element selects the queue:

    * 'brok': the brok is tagged with instance_id 0 and appended to
      self.broks_internal_raised.
    * 'externalcommand': the command is appended to
      self.external_commands.

    Elements of any other type are ignored. Returns None.
    """
    cls_type = elt.__class__.my_type
    if cls_type == 'brok':
        # For brok, we TAG brok with our instance_id
        # TODO : better tag ID?
        elt.data['instance_id'] = 0
        self.broks_internal_raised.append(elt)
    elif cls_type == 'externalcommand':
        # Show the queued command itself: the class-level
        # ExternalCommand.__dict__ says nothing about this element.
        # Fall back to the element when it carries no __dict__.
        print("Adding in queue an external command %s"
              % (getattr(elt, '__dict__', elt),))
        self.external_commands.append(elt)
95 # # Get the right links table for the given kind. If unknown, return None
96 # def get_links_from_type(self, type):
97 # t = {'scheduler' : self.schedulers, 'arbiter' : self.arbiters, \
98 # 'poller' : self.pollers, 'reactionner' : self.reactionners}
99 # if type in t :
100 # return t[type]
101 # return None
104 # Call by arbiter to get our external commands
def get_external_commands(self):
    """Hand over the pending external commands and reset the queue.

    Called by the arbiter. Returns the list accumulated so far;
    self.external_commands is rebound to a fresh empty list.
    """
    pending, self.external_commands = self.external_commands, []
    return pending
111 # Get a brok. Our role is to pass it on to the modules.
112 # THEY MUST NOT CHANGE the data of b !!!
113 # REF: doc/receiver-modules.png (4-5)
def manage_brok(self, b):
    """Give brok *b* to every internal module instance.

    Modules must not change the data of b. A module whose
    manage_brok() raises is reported through the logger and then
    removed from the modules manager.
    REF: doc/receiver-modules.png (4-5)
    """
    failed = []
    for inst in self.modules_manager.get_internal_instances():
        try:
            inst.manage_brok(b)
        except Exception as exp:
            print(exp.__dict__)
            logger.log("[%s] Warning : The mod %s raise an exception: %s, I kill it" % (self.name, inst.get_name(), str(exp)))
            logger.log("[%s] Exception type : %s" % (self.name, type(exp)))
            logger.log("Back trace of this kill: %s" % (traceback.format_exc()))
            failed.append(inst)
    # Drop the modules that raised (the list may be empty)
    self.modules_manager.clear_instances(failed)
130 # Get 'objects' from external modules.
131 # For now nobody uses it, but it can be useful
132 # for a module like livestatus to raise external
133 # commands, for example
def get_objects_from_from_queues(self):
    """Drain the 'from' queues of the external modules.

    Every object found in a queue returned by
    modules_manager.get_external_from_queues() is passed to self.add().
    Each queue is read without blocking until it raises Empty.
    """
    for from_q in self.modules_manager.get_external_from_queues():
        while True:
            try:
                self.add(from_q.get(block=False))
            except Empty:
                # Nothing left in this queue, go on with the next one
                break
145 # modules can have process, and they can die
def check_and_del_zombie_modules(self):
    """Clean up after module processes that may have died.

    Modules can run in their own process, and those processes can die.
    """
    # Called only for its side effect: active_children() joins the
    # children that have already finished. The returned list is not used.
    active_children()
    self.modules_manager.check_alive_instances()
def do_stop(self):
    """Stop the child processes, then run the base class do_stop().

    Each active child is sent terminate() and joined with a timeout of
    1 second before control goes to the parent class.
    """
    for child in active_children():
        child.terminate()
        child.join(1)
    super(Receiver, self).do_stop()
def setup_new_conf(self):
    """Apply the configuration waiting in self.new_conf.

    The configuration is moved from self.new_conf (reset to None) to
    self.cur_conf. Its 'global' section provides:

    * 'receiver_name' (optional): our name, 'Unnamed receiver' if absent;
    * 'modules': stored in self.modules, only the first time a
      configuration is applied;
    * 'use_timezone': when different from 'NOTSET', exported as the TZ
      environment variable and activated with time.tzset().

    Raises KeyError if 'global', 'modules' (on the first call) or
    'use_timezone' is missing.
    """
    conf = self.new_conf
    self.new_conf = None
    self.cur_conf = conf
    global_conf = conf['global']

    # Got our name from the globals
    name = global_conf.get('receiver_name', 'Unnamed receiver')
    self.name = name
    self.log.load_obj(self, name)

    print("[%s] Sending us configuration %s" % (self.name, conf))

    # Modules are taken from the first configuration only
    if not self.have_modules:
        self.modules = mods = global_conf['modules']
        self.have_modules = True
        logger.log("[%s] We received modules %s " % (self.name, mods))

    # Set the timezone given by the arbiter
    use_timezone = global_conf['use_timezone']
    if use_timezone != 'NOTSET':
        # The message needs a second %s: with a single placeholder and
        # two arguments the formatting raised TypeError.
        logger.log("[%s] Setting our timezone to %s" % (self.name, use_timezone))
        os.environ['TZ'] = use_timezone
        time.tzset()
def do_loop_turn(self):
    """Run one turn of the receiver main loop.

    Steps, in order:

    1. clean up dead module processes;
    2. poll (timeout 0.0) for a configuration pushed by the arbiter and
       apply it if self.new_conf is set;
    3. collect the objects raised by the external modules;
    4. if there is no brok to manage, wait up to 1.0 second for a new
       configuration.
    """
    print(".")
    # Begin to clean modules
    self.check_and_del_zombie_modules()

    # Check whether the arbiter speaks to us through the pyro daemon;
    # when it pushes a new conf, apply it.
    self.watch_for_new_conf(0.0)
    if self.new_conf:
        self.setup_new_conf()

    # NOTE: the brok handling steps are disabled in the receiver:
    # integrating internally raised broks, getting new broks from
    # schedulers/pollers/reactionners, sorting them by id, pushing them
    # to the external module queues (REF: doc/receiver-modules.png (3))
    # and calling manage_brok() on each one for at most 1s per turn
    # (REF: doc/receiver-modules.png (4-5)).

    # Maybe external modules raised 'objects', we should get them
    self.get_objects_from_from_queues()

    # Maybe we have nothing to do, so we wait a little
    if not self.broks:
        self.watch_for_new_conf(1.0)
259 # Main function, will loop forever
def main(self):
    """Entry point of the daemon: initialise, then loop forever.

    Loads the configuration file, logs the header lines, starts the
    daemon, registers self.interface with the pyro daemon under the name
    "ForArbiter" and waits for the initial configuration. If no
    configuration is available after that wait, the method returns.
    Otherwise the configuration is applied, the modules are loaded and
    the main loop is started.
    """
    self.load_config_file()

    for header_line in self.get_header():
        self.log.log(header_line)

    workdir = os.path.abspath(self.workdir)
    logger.log("[Receiver] Using working directory : %s" % workdir)

    self.do_daemon_init_and_start()

    self.uri2 = self.pyro_daemon.register(self.interface, "ForArbiter")
    print("The Arbtier uri it at %s" % (self.uri2,))

    # We wait for initial conf
    self.wait_for_initial_conf()
    if self.new_conf:
        self.setup_new_conf()

        # Do the modules part, we have our modules in self.modules
        # REF: doc/receiver-modules.png (1)
        self.modules_manager.set_modules(self.modules)
        self.do_load_modules()

        # Now the main loop
        self.do_mainloop()