#Copyright (C) 2009-2010 :
#    Gabes Jean, naparuba@gmail.com
#    Gerhard Lausser, Gerhard.Lausser@consol.de
#    Gregory Starck, g.starck@gmail.com
#    Hartmut Goebel, h.goebel@goebel-consult.de
#
#This file is part of Shinken.
#
#Shinken is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#
#Shinken is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with Shinken. If not, see <http://www.gnu.org/licenses/>.
import os
import time
import traceback
from multiprocessing import active_children
from Queue import Empty

from shinken.satellite import BaseSatellite, IForArbiter
from shinken.property import PathProp, IntegerProp
from shinken.util import sort_by_ids
from shinken.log import logger
import shinken.pyro_wrapper as pyro
from shinken.pyro_wrapper import Pyro
from shinken.external_command import ExternalCommand
44 class Receiver(BaseSatellite
):
# Daemon properties: start from a copy of the generic satellite properties
# (so BaseSatellite.properties itself is left untouched) and override the
# receiver-specific defaults.
properties = BaseSatellite.properties.copy()
properties.update({
    'pidfile': PathProp(default='/usr/local/shinken/var/receiverd.pid'),
    'port': IntegerProp(default='7773'),
    'local_log': PathProp(default='/usr/local/shinken/var/receiverd.log'),
})
def __init__(self, config_file, is_daemon, do_replace, debug, debug_file):
    """Build the receiver daemon and initialise its (empty) working state.

    All arguments are forwarded unchanged to the ``BaseSatellite``
    constructor, together with the daemon name ``'receiver'``.
    """
    super(Receiver, self).__init__('receiver', config_file, is_daemon,
                                   do_replace, debug, debug_file)

    # Our pollers and reactionners
    self.pollers = {}
    self.reactionners = {}

    # Modules are loaded only once
    self.have_modules = False

    # Queue of external commands given by the modules; it will be
    # taken by the arbiter to be processed
    self.external_commands = []

    # Broks to manage
    self.broks = []
    # Broks raised this turn and that need to be put in self.broks
    self.broks_internal_raised = []
def add(self, elt):
    """Put *elt* into the proper queue, chosen from its class ``my_type``.

    Schedulers have some queues; calls are simplified by dispatching an
    element just by looking at its type:

    * ``'brok'``            -> ``self.broks_internal_raised`` (the brok is
      first tagged with ``instance_id`` 0)
    * ``'externalcommand'`` -> ``self.external_commands``

    Elements of any other type are ignored.
    """
    # TODO : better tag ID?
    cls_type = elt.__class__.my_type
    if cls_type == 'brok':
        # For brok, we TAG brok with our instance_id
        elt.data['instance_id'] = 0
        self.broks_internal_raised.append(elt)
    elif cls_type == 'externalcommand':
        # Single pre-formatted argument: prints the same text as the
        # two-argument print statement did.
        print("Adding in queue an external command %s"
              % (ExternalCommand.__dict__,))
        self.external_commands.append(elt)
# # Get the right table of links for the given kind. If unknown, return None
# def get_links_from_type(self, type):
#     t = {'scheduler' : self.schedulers, 'arbiter' : self.arbiters, \
#          'poller' : self.pollers, 'reactionner' : self.reactionners}
def get_external_commands(self):
    """Return the queued external commands and empty the queue.

    Called by the arbiter to get our external commands. The list that is
    returned is the one that was filled so far; ``self.external_commands``
    is rebound to a fresh empty list so a command is handed out only once.
    """
    res = self.external_commands
    self.external_commands = []
    return res
def manage_brok(self, b):
    """Give the brok *b* to every internal module instance.

    The modules MUST NOT change the data of b!
    REF: doc/receiver-modules.png (4-5)

    A module whose ``manage_brok`` raises is logged (message, exception
    type and traceback) and then removed through
    ``modules_manager.clear_instances``; the other modules still get
    the brok.
    """
    to_del = []
    # Call all modules if they catch the call
    for mod in self.modules_manager.get_internal_instances():
        try:
            mod.manage_brok(b)
        except Exception as exp:
            logger.log("[%s] Warning : The mod %s raise an exception: %s, I kill it" % (self.name, mod.get_name(), str(exp)))
            logger.log("[%s] Exception type : %s" % (self.name, type(exp)))
            logger.log("Back trace of this kill: %s" % (traceback.format_exc()))
            to_del.append(mod)
    # Now remove the modules that raised an exception
    self.modules_manager.clear_instances(to_del)
def get_objects_from_from_queues(self):
    """Collect the 'objects' raised by the external modules.

    For now nobody uses it, but it can be useful for a module like
    livestatus to raise external commands, for example.

    Every queue returned by ``modules_manager.get_external_from_queues()``
    is drained without blocking, and each object found is dispatched
    through ``self.add``.
    """
    for queue in self.modules_manager.get_external_from_queues():
        while True:
            try:
                obj = queue.get(block=False)
            except Empty:
                # Nothing left in this queue, go to the next one
                break
            self.add(obj)
def check_and_del_zombie_modules(self):
    """Clean up after module processes, which can die at any time."""
    # The returned list is not needed: active_children() is called only
    # because it joins the child processes that have already finished.
    active_children()
    manager = self.modules_manager
    manager.check_alive_instances()
def do_stop(self):
    """Stop the child processes still alive, then run the generic stop.

    Each live child is asked to terminate and is given at most one
    second to exit before ``BaseSatellite.do_stop`` is called.
    """
    for child in active_children():
        child.terminate()
        child.join(1)
    super(Receiver, self).do_stop()
def setup_new_conf(self):
    """Apply the configuration stored in ``self.new_conf``.

    The pending configuration is consumed (``self.new_conf`` is reset to
    None), then:

    * our name is read from ``conf['global']['receiver_name']``, falling
      back to ``'Unnamed receiver'``;
    * the module list is taken from ``conf['global']['modules']``, only
      the first time (modules are loaded once);
    * the timezone given by the arbiter is applied unless it is
      ``'NOTSET'``.
    """
    conf = self.new_conf
    self.new_conf = None
    global_conf = conf['global']

    # Get our name from the globals
    name = global_conf.get('receiver_name', 'Unnamed receiver')
    self.name = name
    self.log.load_obj(self, name)

    print("[%s] Sending us configuration %s" % (self.name, conf))

    if not self.have_modules:
        self.modules = mods = global_conf['modules']
        self.have_modules = True
        logger.log("[%s] We received modules %s " % (self.name, mods))

    # Set the timezone given by the arbiter
    use_timezone = global_conf['use_timezone']
    if use_timezone != 'NOTSET':
        logger.log("[%s] Setting our timezone to %s" % (self.name, use_timezone))
        os.environ['TZ'] = use_timezone
        # Changing TZ only takes effect once tzset() is called; the
        # function is not available on every platform.
        if hasattr(time, 'tzset'):
            time.tzset()
def do_loop_turn(self):
    """Run one turn of the receiver main loop.

    Cleans dead modules, checks whether the arbiter pushed a new
    configuration (and applies it only if one is pending), collects
    the objects raised by the external modules, and finally waits up to
    one second for a new configuration when there is no brok to manage.
    """
    # Begin to clean modules
    self.check_and_del_zombie_modules()

    # Now we check if the arbiter speaks to us in the pyro_daemon.
    # If so, we listen to it.
    # When it pushes us a conf, we reinit the connections
    self.watch_for_new_conf(0.0)
    if self.new_conf:
        self.setup_new_conf()

    # NOTE: the brok handling below is disabled (commented out).
    # # Maybe the last loop we raised some broks internally
    # # we should interger them in broks
    # self.interger_internal_broks()

    # # And from schedulers
    # self.get_new_broks(type='scheduler')
    # # And for other satellites
    # self.get_new_broks(type='poller')
    # self.get_new_broks(type='reactionner')

    # # Sort the brok list by id
    # self.broks.sort(sort_by_ids)

    # # and for external queues
    # # REF: doc/receiver-modules.png (3)
    # for b in self.broks:
    #     # if b.type != 'log':
    #     #     print "Receiver : put brok id : %d" % b.id
    #     for q in self.modules_manager.get_external_to_queues():

    # # We must had new broks at the end of the list, so we reverse the list
    # self.broks.reverse()

    # while len(self.broks) != 0:
    #     # Do not 'manage' more than 1s, we must get new broks
    #     if now - start > 1:
    #     b = self.broks.pop()
    #     # Ok, we can get the brok, and doing something with it
    #     # REF: doc/receiver-modules.png (4-5)
    #     self.manage_brok(b)
    #     nb_broks = len(self.broks)
    #     # Ok we manage brok, but we still want to listen to arbiter
    #     self.watch_for_new_conf(0.0)
    #     # if we got new broks here from arbiter, we should breack the loop
    #     # because such broks will not be managed by the
    #     # external modules before this loop (we pop them!)
    #     if len(self.broks) != nb_broks:

    # Maybe external modules raised 'objects'
    self.get_objects_from_from_queues()

    # Maybe we do not have something to do, so we wait a little
    if not self.broks:
        self.watch_for_new_conf(1.0)
259 # Main function, will loop forever
262 self
.load_config_file()
264 for line
in self
.get_header():
267 logger
.log("[Receiver] Using working directory : %s" % os
.path
.abspath(self
.workdir
))
269 self
.do_daemon_init_and_start()
271 self
.uri2
= self
.pyro_daemon
.register(self
.interface
, "ForArbiter")
272 print "The Arbtier uri it at", self
.uri2
274 # We wait for initial conf
275 self
.wait_for_initial_conf()
276 if not self
.new_conf
:
279 self
.setup_new_conf()
281 self
.modules_manager
.set_modules(self
.modules
)
282 self
.do_load_modules()
284 # Do the modules part, we have our modules in self.modules
285 # REF: doc/receiver-modules.png (1)