Add better error reporting for MemoryErrors caused by str->float conversions.
[python.git] / Lib / test / test_logging.py
blobe06a97d0deff8a9a147cf82cde28498e9c2a1070
#!/usr/bin/env python
#
# Copyright 2001-2010 by Vinay Sajip. All Rights Reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted,
# provided that the above copyright notice appear in all copies and that
# both that copyright notice and this permission notice appear in
# supporting documentation, and that the name of Vinay Sajip
# not be used in advertising or publicity pertaining to distribution
# of the software without specific, written prior permission.
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""Test harness for the logging module. Run all tests.

Copyright (C) 2001-2010 Vinay Sajip. All Rights Reserved.
"""
import logging
import logging.handlers
import logging.config

import codecs
import copy
import cPickle
import cStringIO
import gc
import os
import re
import select
import socket
from SocketServer import ThreadingTCPServer, StreamRequestHandler
import string
import struct
import sys
import tempfile
from test.test_support import captured_stdout, run_with_locale, run_unittest
import textwrap
import threading
import time
import types
import unittest
import warnings
import weakref

class BaseTest(unittest.TestCase):

    """Base class for logging tests.

    Each test gets a StreamHandler on the root logger that writes to an
    in-memory stream (self.stream); the logging module's global registries
    are snapshotted in setUp() and restored in tearDown().
    """

    # Format applied to the root handler, and the pattern used by
    # assert_log_lines() to pull (name, levelname, message) back out.
    log_format = "%(name)s -> %(levelname)s: %(message)s"
    expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
    # Counter behind next_message().
    message_num = 0

    def setUp(self):
        """Setup the default logging stream to an internal StringIO instance,
        so that we can examine log output as we want."""
        logger_dict = logging.getLogger().manager.loggerDict
        # Snapshot the module-level registries under the logging lock so
        # that tearDown() can put them back.
        logging._acquireLock()
        try:
            self.saved_handlers = logging._handlers.copy()
            self.saved_handler_list = logging._handlerList[:]
            self.saved_loggers = logger_dict.copy()
            self.saved_level_names = logging._levelNames.copy()
        finally:
            logging._releaseLock()

        self.root_logger = logging.getLogger("")
        self.original_logging_level = self.root_logger.getEffectiveLevel()

        self.stream = cStringIO.StringIO()
        self.root_logger.setLevel(logging.DEBUG)
        self.root_hdlr = logging.StreamHandler(self.stream)
        self.root_formatter = logging.Formatter(self.log_format)
        self.root_hdlr.setFormatter(self.root_formatter)
        self.root_logger.addHandler(self.root_hdlr)

    def tearDown(self):
        """Remove our logging stream, and restore the original logging
        level."""
        self.stream.close()
        self.root_logger.removeHandler(self.root_hdlr)
        self.root_logger.setLevel(self.original_logging_level)
        logging._acquireLock()
        try:
            # Restore the registries in place: other modules may hold
            # references to these very objects.
            logging._levelNames.clear()
            logging._levelNames.update(self.saved_level_names)
            logging._handlers.clear()
            logging._handlers.update(self.saved_handlers)
            logging._handlerList[:] = self.saved_handler_list
            loggerDict = logging.getLogger().manager.loggerDict
            loggerDict.clear()
            loggerDict.update(self.saved_loggers)
        finally:
            logging._releaseLock()

    def assert_log_lines(self, expected_values, stream=None):
        """Match the collected log lines against the regular expression
        self.expected_log_pat, and compare the extracted group values to
        the expected_values list of tuples.

        If `stream` is not given (or is falsy), self.stream is examined.
        """
        stream = stream or self.stream
        pat = re.compile(self.expected_log_pat)
        try:
            stream.reset()
            actual_lines = stream.readlines()
        except AttributeError:
            # StringIO.StringIO lacks a reset() method.
            actual_lines = stream.getvalue().splitlines()
        self.assertEqual(len(actual_lines), len(expected_values))
        for actual, expected in zip(actual_lines, expected_values):
            match = pat.search(actual)
            if not match:
                self.fail("Log line does not match expected pattern:\n" +
                          actual)
            self.assertEqual(tuple(match.groups()), expected)
        # Anything still readable at this point was not consumed above.
        s = stream.read()
        if s:
            self.fail("Remaining output at end of log stream:\n" + s)

    def next_message(self):
        """Generate a message consisting solely of an auto-incrementing
        integer."""
        self.message_num += 1
        return "%d" % self.message_num
class BuiltinLevelsTest(BaseTest):
    """Test builtin levels and their inheritance."""

    def test_flat(self):
        # Logging levels in a flat logger namespace.
        m = self.next_message

        ERR = logging.getLogger("ERR")
        ERR.setLevel(logging.ERROR)
        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        DEB = logging.getLogger("DEB")
        DEB.setLevel(logging.DEBUG)

        # These should log.
        ERR.log(logging.CRITICAL, m())
        ERR.error(m())

        INF.log(logging.CRITICAL, m())
        INF.error(m())
        INF.warn(m())
        INF.info(m())

        DEB.log(logging.CRITICAL, m())
        DEB.error(m())
        DEB.warn(m())
        DEB.info(m())
        DEB.debug(m())

        # These should not log.
        ERR.warn(m())
        ERR.info(m())
        ERR.debug(m())

        INF.debug(m())

        self.assert_log_lines([
            ('ERR', 'CRITICAL', '1'),
            ('ERR', 'ERROR', '2'),
            ('INF', 'CRITICAL', '3'),
            ('INF', 'ERROR', '4'),
            ('INF', 'WARNING', '5'),
            ('INF', 'INFO', '6'),
            ('DEB', 'CRITICAL', '7'),
            ('DEB', 'ERROR', '8'),
            ('DEB', 'WARNING', '9'),
            ('DEB', 'INFO', '10'),
            ('DEB', 'DEBUG', '11'),
        ])

    def test_nested_explicit(self):
        # Logging levels in a nested namespace, all explicitly set.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)

        # These should log.
        INF_ERR.log(logging.CRITICAL, m())
        INF_ERR.error(m())

        # These should not log.
        INF_ERR.warn(m())
        INF_ERR.info(m())
        INF_ERR.debug(m())

        self.assert_log_lines([
            ('INF.ERR', 'CRITICAL', '1'),
            ('INF.ERR', 'ERROR', '2'),
        ])

    def test_nested_inherited(self):
        # Logging levels in a nested namespace, inherited from parent loggers.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)
        INF_UNDEF = logging.getLogger("INF.UNDEF")
        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
        # Created but never logged to: only its existence in the logger
        # hierarchy is exercised here.
        UNDEF = logging.getLogger("UNDEF")

        # These should log.
        INF_UNDEF.log(logging.CRITICAL, m())
        INF_UNDEF.error(m())
        INF_UNDEF.warn(m())
        INF_UNDEF.info(m())
        INF_ERR_UNDEF.log(logging.CRITICAL, m())
        INF_ERR_UNDEF.error(m())

        # These should not log.
        INF_UNDEF.debug(m())
        INF_ERR_UNDEF.warn(m())
        INF_ERR_UNDEF.info(m())
        INF_ERR_UNDEF.debug(m())

        self.assert_log_lines([
            ('INF.UNDEF', 'CRITICAL', '1'),
            ('INF.UNDEF', 'ERROR', '2'),
            ('INF.UNDEF', 'WARNING', '3'),
            ('INF.UNDEF', 'INFO', '4'),
            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
            ('INF.ERR.UNDEF', 'ERROR', '6'),
        ])

    def test_nested_with_virtual_parent(self):
        # Logging levels when some parent does not exist yet.
        m = self.next_message

        INF = logging.getLogger("INF")
        # The grandchild is requested before its direct parent on purpose.
        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
        CHILD = logging.getLogger("INF.BADPARENT")
        INF.setLevel(logging.INFO)

        # These should log.
        GRANDCHILD.log(logging.FATAL, m())
        GRANDCHILD.info(m())
        CHILD.log(logging.FATAL, m())
        CHILD.info(m())

        # These should not log.
        GRANDCHILD.debug(m())
        CHILD.debug(m())

        self.assert_log_lines([
            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
            ('INF.BADPARENT', 'CRITICAL', '3'),
            ('INF.BADPARENT', 'INFO', '4'),
        ])

class BasicFilterTest(BaseTest):

    """Test the bundled Filter class."""

    def test_filter(self):
        # Only messages satisfying the specified criteria pass through the
        # filter.
        filter_ = logging.Filter("spam.eggs")
        handler = self.root_logger.handlers[0]
        try:
            handler.addFilter(filter_)
            spam = logging.getLogger("spam")
            spam_eggs = logging.getLogger("spam.eggs")
            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
            spam_bakedbeans = logging.getLogger("spam.bakedbeans")

            spam.info(self.next_message())
            spam_eggs.info(self.next_message())  # Good.
            spam_eggs_fish.info(self.next_message())  # Good.
            spam_bakedbeans.info(self.next_message())

            self.assert_log_lines([
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ])
        finally:
            # Always detach the filter so a failure here cannot leave it
            # attached to the handler.
            handler.removeFilter(filter_)

#
#   First, we define our levels. There can be as many as you want - the only
#     limitations are that they should be integers, the lowest should be > 0 and
#   larger values mean less information being logged. If you need specific
#   level values which do not fit into these limitations, you can use a
#   mapping dictionary to convert between your application levels and the
#   logging system.
#
SILENT      = 120
TACITURN    = 119
TERSE       = 118
EFFUSIVE    = 117
SOCIABLE    = 116
VERBOSE     = 115
TALKATIVE   = 114
GARRULOUS   = 113
CHATTERBOX  = 112
BORING      = 111

# Every custom level, lowest (BORING) to highest (SILENT) inclusive.
LEVEL_RANGE = range(BORING, SILENT + 1)

#
#   Next, we define names for our levels. You don't need to do this - in which
#   case the system will use "Level n" to denote the text for the level.
#
my_logging_levels = {
    SILENT      : 'Silent',
    TACITURN    : 'Taciturn',
    TERSE       : 'Terse',
    EFFUSIVE    : 'Effusive',
    SOCIABLE    : 'Sociable',
    VERBOSE     : 'Verbose',
    TALKATIVE   : 'Talkative',
    GARRULOUS   : 'Garrulous',
    CHATTERBOX  : 'Chatterbox',
    BORING      : 'Boring',
}

class GarrulousFilter(logging.Filter):

    """A filter which blocks garrulous messages."""

    def filter(self, record):
        """Return True unless the record's level number is GARRULOUS."""
        return record.levelno != GARRULOUS

class VerySpecificFilter(logging.Filter):

    """A filter which blocks sociable and taciturn messages."""

    def filter(self, record):
        """Return True unless the record's level is SOCIABLE or TACITURN."""
        return record.levelno not in [SOCIABLE, TACITURN]

class CustomLevelsAndFiltersTest(BaseTest):

    """Test various filtering possibilities with custom logging levels."""

    # Skip the logger name group.
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"

    def setUp(self):
        """Register the custom level names on top of the base fixture."""
        BaseTest.setUp(self)
        for k, v in my_logging_levels.items():
            logging.addLevelName(k, v)

    def log_at_all_levels(self, logger):
        """Log one auto-numbered message on `logger` at every level in
        LEVEL_RANGE, lowest level first."""
        for lvl in LEVEL_RANGE:
            logger.log(lvl, self.next_message())

    def test_logger_filter(self):
        # Filter at logger level.
        self.root_logger.setLevel(VERBOSE)
        # Levels >= 'Verbose' are good.
        self.log_at_all_levels(self.root_logger)
        self.assert_log_lines([
            ('Verbose', '5'),
            ('Sociable', '6'),
            ('Effusive', '7'),
            ('Terse', '8'),
            ('Taciturn', '9'),
            ('Silent', '10'),
        ])

    def test_handler_filter(self):
        # Filter at handler level.
        self.root_logger.handlers[0].setLevel(SOCIABLE)
        try:
            # Levels >= 'Sociable' are good.
            self.log_at_all_levels(self.root_logger)
            self.assert_log_lines([
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ])
        finally:
            self.root_logger.handlers[0].setLevel(logging.NOTSET)

    def test_specific_filters(self):
        # Set a specific filter object on the handler, and then add another
        # filter object on the logger itself.
        handler = self.root_logger.handlers[0]
        # Stays None until the second filter is installed, so the cleanup
        # below knows whether there is anything to remove.
        specific_filter = None
        garr = GarrulousFilter()
        handler.addFilter(garr)
        try:
            self.log_at_all_levels(self.root_logger)
            first_lines = [
                # Notice how 'Garrulous' is missing
                ('Boring', '1'),
                ('Chatterbox', '2'),
                ('Talkative', '4'),
                ('Verbose', '5'),
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(first_lines)

            specific_filter = VerySpecificFilter()
            self.root_logger.addFilter(specific_filter)
            self.log_at_all_levels(self.root_logger)
            self.assert_log_lines(first_lines + [
                # Not only 'Garrulous' is still missing, but also 'Sociable'
                # and 'Taciturn'
                ('Boring', '11'),
                ('Chatterbox', '12'),
                ('Talkative', '14'),
                ('Verbose', '15'),
                ('Effusive', '17'),
                ('Terse', '18'),
                ('Silent', '20'),
            ])
        finally:
            if specific_filter:
                self.root_logger.removeFilter(specific_filter)
            handler.removeFilter(garr)

class MemoryHandlerTest(BaseTest):

    """Tests for the MemoryHandler."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"

    def setUp(self):
        """Attach a MemoryHandler (capacity 10, flush level WARNING) that
        targets the root handler to a non-propagating 'mem' logger."""
        BaseTest.setUp(self)
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
                                                        self.root_hdlr)
        self.mem_logger = logging.getLogger('mem')
        # Records must reach the root handler only via the memory handler.
        self.mem_logger.propagate = 0
        self.mem_logger.addHandler(self.mem_hdlr)

    def tearDown(self):
        """Close the memory handler, then run the base teardown."""
        self.mem_hdlr.close()
        BaseTest.tearDown(self)

    def test_flush(self):
        # The memory handler flushes to its target handler based on specific
        # criteria (message count and message level).
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.info(self.next_message())
        self.assert_log_lines([])
        # This will flush because the level is >= logging.WARNING
        self.mem_logger.warn(self.next_message())
        lines = [
            ('DEBUG', '1'),
            ('INFO', '2'),
            ('WARNING', '3'),
        ]
        self.assert_log_lines(lines)
        # n is the number of the first message in each batch of ten.
        for n in (4, 14):
            for i in range(9):
                self.mem_logger.debug(self.next_message())
            self.assert_log_lines(lines)
            # This will flush because it's the 10th message since the last
            # flush.
            self.mem_logger.debug(self.next_message())
            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
            self.assert_log_lines(lines)

        self.mem_logger.debug(self.next_message())
        self.assert_log_lines(lines)

class ExceptionFormatter(logging.Formatter):
    """A special exception formatter."""
    def formatException(self, ei):
        """Return a one-line summary naming the exception class.

        `ei` is indexed like an exc_info tuple; only ei[0], the exception
        class, is used.
        """
        return "Got a [%s]" % ei[0].__name__

class ConfigFileTest(BaseTest):

    """Reading logging config from a .ini-style config file."""

    expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"

    # config0 is a standard configuration.
    config0 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config1 adds a little to the standard configuration.
    config1 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config2 has a subtle configuration error that should be reported
    config2 = config1.replace("sys.stdout", "sys.stbout")

    # config3 has a less subtle configuration error
    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")

    # config4 specifies a custom formatter class to be loaded
    config4 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=NOTSET
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    class=""" + __name__ + """.ExceptionFormatter
    format=%(levelname)s:%(name)s:%(message)s
    datefmt=
    """

    # config5 specifies a custom handler class to be loaded
    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')

    # config6 uses ', ' delimiters in the handlers and formatters sections
    config6 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1, hand2

    [formatters]
    keys=form1, form2

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [handler_hand2]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stderr,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=

    [formatter_form2]
    format=%(message)s
    datefmt=
    """

    def apply_config(self, conf):
        """Write `conf` (dedented) to a temporary .ini file and load it
        with logging.config.fileConfig().

        The temporary file is removed whether or not loading succeeds;
        any exception raised by fileConfig() propagates to the caller.
        """
        # mkstemp() creates the file atomically, so `fn` is always bound
        # by the time the cleanup below can run.
        fd, fn = tempfile.mkstemp(".ini")
        try:
            f = os.fdopen(fd, "w")
            try:
                f.write(textwrap.dedent(conf))
            finally:
                f.close()
            logging.config.fileConfig(fn)
        finally:
            os.remove(fn)

    def test_config0_ok(self):
        # A simple config file which overrides the default settings.
        with captured_stdout() as output:
            self.apply_config(self.config0)
            logger = logging.getLogger()
            # Won't output anything
            logger.info(self.next_message())
            # Outputs a message
            logger.error(self.next_message())
            self.assert_log_lines([
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config1_ok(self, config=config1):
        # A config file defining a sub-parser as well.
        with captured_stdout() as output:
            self.apply_config(config)
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config2_failure(self):
        # A config file whose handler args name a non-existent stream
        # (sys.stbout) must be rejected.
        self.assertRaises(StandardError, self.apply_config, self.config2)

    def test_config3_failure(self):
        # A config file whose handler refers to an undefined formatter
        # must be rejected.
        self.assertRaises(StandardError, self.apply_config, self.config3)

    def test_config4_ok(self):
        # A config file specifying a custom formatter class.
        with captured_stdout() as output:
            self.apply_config(self.config4)
            try:
                raise RuntimeError()
            except RuntimeError:
                logging.exception("just testing")
            sys.stdout.seek(0)
            self.assertEqual(output.getvalue(),
                "ERROR:root:just testing\nGot a [RuntimeError]\n")
            # Original logger output is empty
            self.assert_log_lines([])

    def test_config5_ok(self):
        # Same expectations as config1, with a fully qualified handler class.
        self.test_config1_ok(config=self.config5)

    def test_config6_ok(self):
        # Same expectations as config1, with ', ' delimited key lists.
        self.test_config1_ok(config=self.config6)

class LogRecordStreamHandler(StreamRequestHandler):

    """Handler for a streaming logging request. It saves the log message in the
    TCP server's 'log_output' attribute."""

    # Sentinel message text that asks the server to stop.
    TCP_LOG_END = "!!!END!!!"

    def handle(self):
        """Handle multiple requests - each expected to be of 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.

        Returns when the connection yields fewer than 4 header bytes, or
        when it is closed part-way through a record.
        """
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack(">L", chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                more = self.connection.recv(slen - len(chunk))
                if not more:
                    # Peer closed mid-record: recv() would keep returning
                    # an empty string, so stop instead of spinning forever.
                    return
                chunk = chunk + more
            obj = self.unpickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handle_log_record(record)

    def unpickle(self, data):
        """Return the object pickled in `data`."""
        # NOTE(review): unpickling network data is unsafe in general; this
        # receiver is only meant for the test's own local connection.
        return cPickle.loads(data)

    def handle_log_record(self, record):
        """Append record.msg to the server's log_output, or set the
        server's abort flag if the message contains TCP_LOG_END."""
        # If the end-of-messages sentinel is seen, tell the server to
        # terminate.
        if self.TCP_LOG_END in record.msg:
            self.server.abort = 1
            return
        self.server.log_output += record.msg + "\n"

class LogRecordSocketReceiver(ThreadingTCPServer):

    """A simple-minded TCP socket-based logging receiver suitable for test
    purposes."""

    allow_reuse_address = 1
    # Text accumulated by the request handlers, one message per line.
    log_output = ""

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        """Bind to (host, port) and serve requests with `handler`."""
        ThreadingTCPServer.__init__(self, (host, port), handler)
        # Set to a true value to make serve_until_stopped() return.
        self.abort = False
        # Seconds each select() call waits before abort is re-checked.
        self.timeout = 0.1
        # Set once serve_until_stopped() has left its loop.
        self.finished = threading.Event()

    def serve_until_stopped(self):
        """Handle incoming requests until self.abort becomes true, then
        set self.finished and close the listening socket."""
        while not self.abort:
            rd, wr, ex = select.select([self.socket.fileno()], [], [],
                                       self.timeout)
            if rd:
                self.handle_request()
        # Notify the main thread that we're about to exit
        self.finished.set()
        # close the listen socket
        self.server_close()

class SocketHandlerTest(BaseTest):

    """Test for SocketHandler objects."""

    def setUp(self):
        """Set up a TCP server to receive log messages, and a SocketHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Port 0 requests an ephemeral port; the bound port is read back
        # from the listening socket.
        self.tcpserver = LogRecordSocketReceiver(port=0)
        self.port = self.tcpserver.socket.getsockname()[1]
        self.threads = [
            threading.Thread(target=self.tcpserver.serve_until_stopped)]
        for thread in self.threads:
            thread.start()

        self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
        self.sock_hdlr.setFormatter(self.root_formatter)
        # Replace the stream handler installed by BaseTest.setUp() with
        # the socket handler.
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sock_hdlr)

    def tearDown(self):
        """Shutdown the TCP server."""
        try:
            self.tcpserver.abort = True
            del self.tcpserver
            self.root_logger.removeHandler(self.sock_hdlr)
            self.sock_hdlr.close()
            for thread in self.threads:
                thread.join(2.0)
        finally:
            BaseTest.tearDown(self)

    def get_output(self):
        """Get the log output as received by the TCP server."""
        # Signal the TCP receiver and wait for it to terminate.
        self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
        self.tcpserver.finished.wait(2.0)
        return self.tcpserver.log_output

    def test_output(self):
        # The log message sent to the SocketHandler is properly received.
        logger = logging.getLogger("tcp")
        logger.error("spam")
        logger.debug("eggs")
        self.assertEqual(self.get_output(), "spam\neggs\n")

class MemoryTest(BaseTest):

    """Test memory persistence of logger objects."""

    def setUp(self):
        """Create a dict to remember potentially destroyed objects."""
        BaseTest.setUp(self)
        self._survivors = {}

    def _watch_for_survival(self, *args):
        """Watch the given objects for survival, by creating weakrefs to
        them."""
        for obj in args:
            # repr() is captured now so the object can still be named in
            # a failure message after it is gone.
            key = id(obj), repr(obj)
            self._survivors[key] = weakref.ref(obj)

    def _assertTruesurvival(self):
        """Assert that all objects watched for survival have survived."""
        # Trigger cycle breaking.
        gc.collect()
        dead = []
        for (id_, repr_), ref in self._survivors.items():
            if ref() is None:
                dead.append(repr_)
        if dead:
            self.fail("%d objects should have survived "
                "but have been destroyed: %s" % (len(dead), ", ".join(dead)))

    def test_persistent_loggers(self):
        # Logger objects are persistent and retain their configuration, even
        # if visible references are destroyed.
        self.root_logger.setLevel(logging.INFO)
        foo = logging.getLogger("foo")
        self._watch_for_survival(foo)
        foo.setLevel(logging.DEBUG)
        self.root_logger.debug(self.next_message())
        foo.debug(self.next_message())
        self.assert_log_lines([
            ('foo', 'DEBUG', '2'),
        ])
        del foo
        # foo has survived.
        self._assertTruesurvival()
        # foo has retained its settings.
        bar = logging.getLogger("foo")
        bar.debug(self.next_message())
        self.assert_log_lines([
            ('foo', 'DEBUG', '2'),
            ('foo', 'DEBUG', '3'),
        ])

class EncodingTest(BaseTest):

    """Test how handlers write non-ASCII data."""

    def test_encoding_plain_file(self):
        # In Python 2.x, a plain file object is treated as having no encoding.
        log = logging.getLogger("test")
        # mkstemp() creates the file atomically; only the name is needed
        # here, so the descriptor is closed straight away.
        fd, fn = tempfile.mkstemp(".log")
        os.close(fd)
        # the non-ascii data we write to the log.
        data = "foo\x80"
        try:
            handler = logging.FileHandler(fn)
            log.addHandler(handler)
            try:
                # write non-ascii data to the log.
                log.warning(data)
            finally:
                log.removeHandler(handler)
                handler.close()
            # check we wrote exactly those bytes, ignoring trailing \n etc
            f = open(fn)
            try:
                self.assertEqual(f.read().rstrip(), data)
            finally:
                f.close()
        finally:
            if os.path.isfile(fn):
                os.remove(fn)

    def test_encoding_cyrillic_unicode(self):
        log = logging.getLogger("test")
        # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
        message = u'\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
        # Ensure it's written in a Cyrillic encoding
        writer_class = codecs.getwriter('cp1251')
        writer_class.encoding = 'cp1251'
        stream = cStringIO.StringIO()
        writer = writer_class(stream, 'strict')
        handler = logging.StreamHandler(writer)
        log.addHandler(handler)
        try:
            log.warning(message)
        finally:
            log.removeHandler(handler)
            handler.close()
        # check we wrote exactly those bytes, ignoring trailing \n etc
        s = stream.getvalue()
        # Compare against what the data should be when encoded in CP-1251
        self.assertEqual(s, '\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')

class WarningsTest(BaseTest):

    """Test the redirection of warnings to the logging system."""

    def test_warnings(self):
        with warnings.catch_warnings():
            logging.captureWarnings(True)
            try:
                warnings.filterwarnings("always", category=UserWarning)
                stream = cStringIO.StringIO()
                h = logging.StreamHandler(stream)
                logger = logging.getLogger("py.warnings")
                logger.addHandler(h)
                warnings.warn("I'm warning you...")
                logger.removeHandler(h)
                s = stream.getvalue()
                h.close()
                self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)

                # See if an explicit file uses the original implementation
                stream = cStringIO.StringIO()
                warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
                                     stream, "Dummy line")
                s = stream.getvalue()
                stream.close()
                self.assertEqual(s,
                        "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")
            finally:
                # Always switch warning capture back off.
                logging.captureWarnings(False)

# Set the locale to the platform-dependent default.  I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.
@run_with_locale('LC_ALL', '')
def test_main():
    """Run every test case class defined in this module."""
    run_unittest(BuiltinLevelsTest, BasicFilterTest,
                 CustomLevelsAndFiltersTest, MemoryHandlerTest,
                 ConfigFileTest, SocketHandlerTest, MemoryTest,
                 EncodingTest, WarningsTest)

# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()