Exceptions raised during renaming in rotating file handlers are now passed to handleE...
[python.git] / Lib / test / test_logging.py
blob62c0b08a47c17e5e97d0148387c5b922c6bee7f1
1 #!/usr/bin/env python
3 # Copyright 2001-2004 by Vinay Sajip. All Rights Reserved.
5 # Permission to use, copy, modify, and distribute this software and its
6 # documentation for any purpose and without fee is hereby granted,
7 # provided that the above copyright notice appear in all copies and that
8 # both that copyright notice and this permission notice appear in
9 # supporting documentation, and that the name of Vinay Sajip
10 # not be used in advertising or publicity pertaining to distribution
11 # of the software without specific, written prior permission.
12 # VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
13 # ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
14 # VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
15 # ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
16 # IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
17 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
19 # This file is part of the Python logging distribution. See
20 # http://www.red-dove.com/python_logging.html
22 """Test harness for the logging module. Run all tests.
24 Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved.
25 """
27 import select
28 import os, sys, string, struct, types, cPickle, cStringIO
29 import socket, threading, time
30 import logging, logging.handlers, logging.config
32 BANNER = "-- %-10s %-6s ---------------------------------------------------\n"
34 FINISH_UP = "Finish up, it's closing time. Messages should bear numbers 0 through 24."
36 #----------------------------------------------------------------------------
37 # Log receiver
38 #----------------------------------------------------------------------------
40 TIMEOUT = 10
42 from SocketServer import ThreadingTCPServer, StreamRequestHandler
class LogRecordStreamHandler(StreamRequestHandler):
    """
    Handler for a streaming logging request. It basically logs the record
    using whatever logging policy is configured locally.
    """

    def handle(self):
        """
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.

        Returns when the peer closes the connection, including when it
        closes part-way through a length prefix or a record body.
        """
        while 1:
            header = self._recv_exactly(4)
            if header is None:
                break
            slen = struct.unpack(">L", header)[0]
            payload = self._recv_exactly(slen)
            if payload is None:
                # Peer went away in the middle of a record; nothing to log.
                break
            obj = self.unPickle(payload)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def _recv_exactly(self, count):
        """
        Read exactly `count` bytes from the connection.

        Returns the data read, or None if the peer closed the connection
        before `count` bytes arrived. A single recv() may legitimately
        return fewer bytes than requested, so keep reading until done.
        """
        data = self.connection.recv(count)
        while len(data) < count:
            more = self.connection.recv(count - len(data))
            if not more:
                # An empty read means end of stream; looping again would
                # spin forever.
                return None
            data = data + more
        return data

    def unPickle(self, data):
        """Return the object stored in the pickle string `data`."""
        # NOTE(review): unpickling data read from a socket can execute
        # arbitrary code. This is only acceptable because the sender is the
        # test itself; do not reuse this receiver with untrusted peers.
        return cPickle.loads(data)

    def handleLogRecord(self, record):
        """
        Re-log `record` through the local "logrecv.tcp.<name>" logger,
        tagging the message with that logger name.
        """
        logname = "logrecv.tcp." + record.name
        #If the end-of-messages sentinel is seen, tell the server to terminate
        if record.msg == FINISH_UP:
            self.server.abort = 1
        record.msg = record.msg + " (via " + logname + ")"
        logger = logging.getLogger(logname)
        logger.handle(record)
83 # The server sets socketDataProcessed when it's done.
84 socketDataProcessed = threading.Event()
class LogRecordSocketReceiver(ThreadingTCPServer):
    """
    A simple-minded TCP socket-based logging receiver suitable for test
    purposes.
    """

    allow_reuse_address = 1

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        """
        Bind to (host, port) and serve each connection with `handler`.
        """
        ThreadingTCPServer.__init__(self, (host, port), handler)
        # Set to a true value (by a request handler or by the test driver)
        # to make serve_until_stopped() return.
        self.abort = 0
        # Seconds select() waits before the abort flag is re-checked.
        self.timeout = 1

    def serve_until_stopped(self):
        """
        Accept and dispatch connections until self.abort becomes true.

        On the way out - whether the loop ended normally or raised - the
        listening socket is closed and socketDataProcessed is set, so a
        thread waiting on that event is never left blocked.
        """
        try:
            abort = 0
            while not abort:
                rd, wr, ex = select.select([self.socket.fileno()],
                                           [], [],
                                           self.timeout)
                if rd:
                    self.handle_request()
                abort = self.abort
        finally:
            try:
                # release the listening socket
                self.server_close()
            finally:
                #notify the main thread that we're about to exit
                socketDataProcessed.set()

    def process_request(self, request, client_address):
        """Run finish_request for this connection in a new thread."""
        t = threading.Thread(target = self.finish_request,
                             args = (request, client_address))
        t.start()
def runTCP(tcpserver):
    """Thread target: run `tcpserver`'s serve_until_stopped() loop."""
    serve = tcpserver.serve_until_stopped
    serve()
122 #----------------------------------------------------------------------------
123 # Test 0
124 #----------------------------------------------------------------------------
# Number that the next call to nextmessage() will embed in its result.
msgcount = 0

def nextmessage():
    """Return "Message <n>" for the current counter, then advance it by one."""
    global msgcount
    current = msgcount
    msgcount = current + 1
    return "Message %d" % current
def test0():
    """
    Exercise level filtering across a small hierarchy of loggers.

    Each call logs the next numbered message from nextmessage(). The first
    group of calls is expected to produce output and the second group is
    not; finally FINISH_UP is logged at INFO through the "INF" logger.
    """
    # Loggers that get an explicit level, created in this order.
    explicit = {}
    for name, level in (("ERR", logging.ERROR),
                        ("INF", logging.INFO),
                        ("INF.ERR", logging.ERROR),
                        ("DEB", logging.DEBUG)):
        explicit[name] = logging.getLogger(name)
        explicit[name].setLevel(level)
    err = explicit["ERR"]
    inf = explicit["INF"]
    inf_err = explicit["INF.ERR"]
    deb = explicit["DEB"]

    # Loggers with no level of their own.
    inf_undef = logging.getLogger("INF.UNDEF")
    inf_err_undef = logging.getLogger("INF.ERR.UNDEF")
    undef = logging.getLogger("UNDEF")

    # The grandchild is requested before its parent, in that order.
    grandchild = logging.getLogger("INF.BADPARENT.UNDEF")
    child = logging.getLogger("INF.BADPARENT")

    # Each entry is (logger, calls). "log" means log(logging.FATAL, ...);
    # any other name is the logger method to call with the next message.
    should_log = (
        (err, ("log", "error")),
        (inf, ("log", "error", "warn", "info")),
        (inf_undef, ("log", "error", "warn", "info")),
        (inf_err, ("log", "error")),
        (inf_err_undef, ("log", "error")),
        (deb, ("log", "error", "warn", "info", "debug")),
        (undef, ("log", "error", "warn", "info")),
        (grandchild, ("log",)),
        (child, ("log",)),
    )
    should_not_log = (
        (err, ("warn", "info", "debug")),
        (inf, ("debug",)),
        (inf_undef, ("debug",)),
        (inf_err, ("warn", "info", "debug")),
        (inf_err_undef, ("warn", "info", "debug")),
    )

    for logger, calls in should_log + should_not_log:
        for call in calls:
            if call == "log":
                logger.log(logging.FATAL, nextmessage())
            else:
                getattr(logger, call)(nextmessage())

    inf.info(FINISH_UP)
202 #----------------------------------------------------------------------------
203 # Test 1
204 #----------------------------------------------------------------------------
207 # First, we define our levels. There can be as many as you want - the only
208 # limitations are that they should be integers, the lowest should be > 0 and
209 # larger values mean less information being logged. If you need specific
210 # level values which do not fit into these limitations, you can use a
211 # mapping dictionary to convert between your application levels and the
212 # logging system.
# Custom level values; per the notes above, larger values mean less
# information being logged.
SILENT      = 10
TACITURN    = 9
TERSE       = 8
EFFUSIVE    = 7
SOCIABLE    = 6
VERBOSE     = 5
TALKATIVE   = 4
GARRULOUS   = 3
CHATTERBOX  = 2
BORING      = 1

# Every custom level value, from BORING up to and including SILENT.
LEVEL_RANGE = range(BORING, SILENT + 1)

# Next, we define names for our levels. You don't need to do this - in which
# case the system will use "Level n" to denote the text for the level.

my_logging_levels = {
    SILENT      : 'Silent',
    TACITURN    : 'Taciturn',
    TERSE       : 'Terse',
    EFFUSIVE    : 'Effusive',
    SOCIABLE    : 'Sociable',
    VERBOSE     : 'Verbose',
    TALKATIVE   : 'Talkative',
    GARRULOUS   : 'Garrulous',
    CHATTERBOX  : 'Chatterbox',
    BORING      : 'Boring',
}
245 # Now, to demonstrate filtering: suppose for some perverse reason we only
246 # want to print out all except GARRULOUS messages. Let's create a filter for
247 # this purpose...
class SpecificLevelFilter(logging.Filter):
    """Filter that rejects records logged at exactly one given level."""

    def __init__(self, lvl):
        """Remember `lvl` as the level number whose records are rejected."""
        # Initialise the base class as well, so the attributes that
        # logging.Filter sets up exist on instances of this subclass.
        logging.Filter.__init__(self)
        self.level = lvl

    def filter(self, record):
        """Return true unless `record` was logged at the rejected level."""
        return self.level != record.levelno
class GarrulousFilter(SpecificLevelFilter):
    """SpecificLevelFilter preset with the GARRULOUS level."""

    def __init__(self):
        SpecificLevelFilter.__init__(self, lvl=GARRULOUS)
261 # Now, let's demonstrate filtering at the logger. This time, use a filter
262 # which excludes SOCIABLE and TACITURN messages. Note that GARRULOUS events
263 # are still excluded.
class VerySpecificFilter(logging.Filter):
    """Filter that rejects records logged at SOCIABLE or TACITURN."""

    def filter(self, record):
        """Return true unless `record` is at SOCIABLE or TACITURN."""
        rejected = [SOCIABLE, TACITURN]
        return record.levelno not in rejected
def message(s):
    """Write `s`, followed by a newline, to sys.stdout."""
    # Wrap `s` in a one-element tuple so that a tuple argument is formatted
    # as a single value instead of being unpacked by the % operator.
    sys.stdout.write("%s\n" % (s,))
272 SHOULD1 = "This should only be seen at the '%s' logging level (or lower)"
def test1():
    """
    Demonstrate custom levels and filtering on the root logger.

    Registers the names in my_logging_levels, then logs one event at every
    level in LEVEL_RANGE for each possible root logger level - four times:
    unfiltered, with the first root handler's level set to SOCIABLE, with a
    GarrulousFilter on that handler, and additionally with a
    VerySpecificFilter on the root logger. The filters are removed and the
    "DEBUG" level name is restored before returning.
    """
    # Now, tell the logging system to associate names with our levels.
    for value, label in my_logging_levels.items():
        logging.addLevelName(value, label)

    root = logging.getLogger("")
    handler = root.handlers[0]

    # Now, define a test function which logs an event at each of our levels.
    def doLog(log):
        for lvl in LEVEL_RANGE:
            log.log(lvl, SHOULD1, logging.getLevelName(lvl))

    # Set the logging level to each different value and call the utility
    # function to log events.
    # In the output, you should see that each time round the loop, the number
    # of logging events which are actually output decreases.
    def sweep():
        for threshold in LEVEL_RANGE:
            message("-- setting logging level to '%s' -----" %
                    logging.getLevelName(threshold))
            root.setLevel(threshold)
            doLog(root)

    sweep()

    # Now, we demonstrate level filtering at the handler level. Tell the
    # handler defined above to filter at level 'SOCIABLE', and repeat the
    # above loop. Compare the output from the two runs.
    handler.setLevel(SOCIABLE)
    message("-- Filtering at handler level to SOCIABLE --")
    sweep()

    handler.setLevel(0)    #turn off level filtering at the handler

    garrulous = GarrulousFilter()
    handler.addFilter(garrulous)
    message("-- Filtering using GARRULOUS filter --")
    sweep()

    specific = VerySpecificFilter()
    root.addFilter(specific)
    message("-- Filtering using specific filter for SOCIABLE, TACITURN --")
    sweep()

    root.removeFilter(specific)
    handler.removeFilter(garrulous)
    #Undo the one level which clashes...for regression tests
    logging.addLevelName(logging.DEBUG, "DEBUG")
339 #----------------------------------------------------------------------------
340 # Test 2
341 #----------------------------------------------------------------------------
343 MSG = "-- logging %d at INFO, messages should be seen every 10 events --"
def test2():
    """
    Exercise logging.handlers.MemoryHandler in front of the root handler.

    The first root handler is closed, removed and used as the target of a
    MemoryHandler built with a capacity of 10 and a flush level of WARNING.
    After logging through it, the MemoryHandler is closed and removed and
    the original handler is put back on the root logger.
    """
    root = logging.getLogger("")
    stream_handler = root.handlers[0]
    stream_handler.close()
    root.removeHandler(stream_handler)

    buffering = logging.handlers.MemoryHandler(10, logging.WARNING,
                                               stream_handler)
    root.setLevel(logging.DEBUG)
    root.addHandler(buffering)

    # (announcement, logging call, text to log)
    steps = (
        ("-- logging at DEBUG, nothing should be seen yet --",
         root.debug, "Debug message"),
        ("-- logging at INFO, nothing should be seen yet --",
         root.info, "Info message"),
        ("-- logging at WARNING, 3 messages should be seen --",
         root.warn, "Warn message"),
    )
    for announcement, emit, text in steps:
        message(announcement)
        emit(text)

    for index in range(102):
        message(MSG % index)
        root.info("Info index = %d", index)

    buffering.close()
    root.removeHandler(buffering)
    root.addHandler(stream_handler)
365 #----------------------------------------------------------------------------
366 # Test 3
367 #----------------------------------------------------------------------------
369 FILTER = "a.b"
def doLog3():
    """Log "Info 1" .. "Info 10" at INFO, each through a different logger."""
    logger_names = (
        "a",
        "a.b",
        "a.c",
        "a.b.c",
        "a.b.c.d",
        "a.bb.c",
        "b",
        "b.a",
        "c.a.b",
        "a.bb",
    )
    for position, logger_name in enumerate(logger_names):
        logging.getLogger(logger_name).info("Info %d" % (position + 1))
def test3():
    """
    Run doLog3() twice: once unfiltered, then with a logging.Filter built
    from FILTER attached to the first root handler. The filter is removed
    again afterwards.
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    first_handler = root_logger.handlers[0]

    message("Unfiltered...")
    doLog3()

    message("Filtered with '%s'..." % FILTER)
    name_filter = logging.Filter(FILTER)
    first_handler.addFilter(name_filter)
    doLog3()
    first_handler.removeFilter(name_filter)
395 #----------------------------------------------------------------------------
396 # Test Harness
397 #----------------------------------------------------------------------------
def banner(nm, typ):
    """Write the BANNER line filled in with `nm` and `typ`, then flush stdout."""
    out = sys.stdout
    out.write(BANNER % (nm, typ))
    out.flush()
def test_main_inner():
    """
    Run test0 .. test3 with output on sys.stdout.

    A stdout handler is installed on the root logger for the whole run.
    For test0 only, a SocketHandler is added as well, so that its events
    also travel to a LogRecordSocketReceiver running in a helper thread.
    What the receiver logs is buffered in memory and written out after the
    tests, between "logrecv output" banners.
    """
    rootLogger = logging.getLogger("")
    rootLogger.setLevel(logging.DEBUG)
    hdlr = logging.StreamHandler(sys.stdout)
    fmt = logging.Formatter(logging.BASIC_FORMAT)
    hdlr.setFormatter(fmt)
    rootLogger.addHandler(hdlr)

    #Set up a handler such that all events are sent via a socket to the log
    #receiver (logrecv).
    #The handler will only be added to the rootLogger for some of the tests
    shdlr = logging.handlers.SocketHandler('localhost',
                                   logging.handlers.DEFAULT_TCP_LOGGING_PORT)

    #Configure the logger for logrecv so events do not propagate beyond it.
    #The sockLogger output is buffered in memory until the end of the test,
    #and printed at the end.
    sockOut = cStringIO.StringIO()
    sockLogger = logging.getLogger("logrecv")
    sockLogger.setLevel(logging.DEBUG)
    sockhdlr = logging.StreamHandler(sockOut)
    sockhdlr.setFormatter(logging.Formatter(
                                   "%(name)s -> %(levelname)s: %(message)s"))
    sockLogger.addHandler(sockhdlr)
    sockLogger.propagate = 0

    #Set up servers
    threads = []
    tcpserver = LogRecordSocketReceiver()
    threads.append(threading.Thread(target=runTCP, args=(tcpserver,)))

    for thread in threads:
        thread.start()

    # Becomes true only once every test has run to completion.
    completed = False
    try:
        banner("log_test0", "begin")

        rootLogger.addHandler(shdlr)
        try:
            test0()
        finally:
            # Detach the socket handler even if test0 fails, so that later
            # logging does not keep going to the receiver.
            shdlr.close()
            rootLogger.removeHandler(shdlr)

        banner("log_test0", "end")

        banner("log_test1", "begin")
        test1()
        banner("log_test1", "end")

        banner("log_test2", "begin")
        test2()
        banner("log_test2", "end")

        banner("log_test3", "begin")
        test3()
        banner("log_test3", "end")

        completed = True
    finally:
        if not completed:
            # A test raised, so the end-of-messages sentinel may never reach
            # the receiver. Ask it to stop explicitly; otherwise the wait
            # below would block forever.
            tcpserver.abort = 1
        #wait for TCP receiver to terminate
        socketDataProcessed.wait()
        for thread in threads:
            thread.join()
        banner("logrecv output", "begin")
        sys.stdout.write(sockOut.getvalue())
        sockOut.close()
        sockLogger.removeHandler(sockhdlr)
        sockhdlr.close()
        banner("logrecv output", "end")
        sys.stdout.flush()
        try:
            hdlr.close()
        except Exception:
            # Closing the stdout handler is best-effort: a failure here must
            # not prevent it from being detached from the root logger.
            pass
        rootLogger.removeHandler(hdlr)
def test_main():
    """
    Run test_main_inner() with the locale switched to the platform default,
    restoring the previous locale (when it could be saved) and the root
    logger's previous effective level afterwards.
    """
    import locale

    # Set the locale to the platform-dependent default.  I have no idea
    # why the test does this, but in any case we save the current locale
    # first so we can restore it at the end.
    saved_locale = None
    try:
        saved_locale = locale.setlocale(locale.LC_ALL)
        locale.setlocale(locale.LC_ALL, '')
    except (ValueError, locale.Error):
        # this happens on a Solaris box which only supports "C" locale
        # or a Mac OS X box which supports very little locale stuff at all
        saved_locale = None

    # Save and restore the original root logger level across the tests.
    # Otherwise, e.g., if any test using cookielib runs after test_logging,
    # cookielib's debug-level logger tries to log messages, leading to
    # confusing:
    #    No handlers could be found for logger "cookielib"
    # output while the tests are running.
    root = logging.getLogger("")
    saved_level = root.getEffectiveLevel()
    try:
        test_main_inner()
    finally:
        if saved_locale is not None:
            locale.setlocale(locale.LC_ALL, saved_locale)
        root.setLevel(saved_level)
# When run as a script: write the test's name to stdout, then run the tests.
if __name__ == "__main__":
    sys.stdout.write("test_logging\n")
    test_main()