3 # Copyright 2001-2004 by Vinay Sajip. All Rights Reserved.
5 # Permission to use, copy, modify, and distribute this software and its
6 # documentation for any purpose and without fee is hereby granted,
7 # provided that the above copyright notice appear in all copies and that
8 # both that copyright notice and this permission notice appear in
9 # supporting documentation, and that the name of Vinay Sajip
10 # not be used in advertising or publicity pertaining to distribution
11 # of the software without specific, written prior permission.
12 # VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
13 # ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
14 # VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
15 # ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
16 # IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
17 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
19 """Test harness for the logging module. Run all tests.
21 Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved.
25 import logging
.handlers
36 from SocketServer
import ThreadingTCPServer
, StreamRequestHandler
41 from test
.test_support
import captured_stdout
, run_with_locale
, run_unittest
50 class BaseTest(unittest
.TestCase
):
52 """Base class for logging tests."""
54 log_format
= "%(name)s -> %(levelname)s: %(message)s"
55 expected_log_pat
= r
"^([\w.]+) -> ([\w]+): ([\d]+)$"
59 """Setup the default logging stream to an internal StringIO instance,
60 so that we can examine log output as we want."""
61 logger_dict
= logging
.getLogger().manager
.loggerDict
62 logging
._acquireLock
()
64 self
.saved_handlers
= logging
._handlers
.copy()
65 self
.saved_handler_list
= logging
._handlerList
[:]
66 self
.saved_loggers
= logger_dict
.copy()
67 self
.saved_level_names
= logging
._levelNames
.copy()
69 logging
._releaseLock
()
71 self
.root_logger
= logging
.getLogger("")
72 self
.original_logging_level
= self
.root_logger
.getEffectiveLevel()
74 self
.stream
= cStringIO
.StringIO()
75 self
.root_logger
.setLevel(logging
.DEBUG
)
76 self
.root_hdlr
= logging
.StreamHandler(self
.stream
)
77 self
.root_formatter
= logging
.Formatter(self
.log_format
)
78 self
.root_hdlr
.setFormatter(self
.root_formatter
)
79 self
.root_logger
.addHandler(self
.root_hdlr
)
82 """Remove our logging stream, and restore the original logging
85 self
.root_logger
.removeHandler(self
.root_hdlr
)
86 self
.root_logger
.setLevel(self
.original_logging_level
)
87 logging
._acquireLock
()
89 logging
._levelNames
.clear()
90 logging
._levelNames
.update(self
.saved_level_names
)
91 logging
._handlers
.clear()
92 logging
._handlers
.update(self
.saved_handlers
)
93 logging
._handlerList
[:] = self
.saved_handler_list
94 loggerDict
= logging
.getLogger().manager
.loggerDict
96 loggerDict
.update(self
.saved_loggers
)
98 logging
._releaseLock
()
100 def assert_log_lines(self
, expected_values
, stream
=None):
101 """Match the collected log lines against the regular expression
102 self.expected_log_pat, and compare the extracted group values to
103 the expected_values list of tuples."""
104 stream
= stream
or self
.stream
105 pat
= re
.compile(self
.expected_log_pat
)
108 actual_lines
= stream
.readlines()
109 except AttributeError:
110 # StringIO.StringIO lacks a reset() method.
111 actual_lines
= stream
.getvalue().splitlines()
112 self
.assertEquals(len(actual_lines
), len(expected_values
))
113 for actual
, expected
in zip(actual_lines
, expected_values
):
114 match
= pat
.search(actual
)
116 self
.fail("Log line does not match expected pattern:\n" +
118 self
.assertEquals(tuple(match
.groups()), expected
)
121 self
.fail("Remaining output at end of log stream:\n" + s
)
def next_message(self):
    """Return the next auto-incremented message number as a string.

    Each call raises ``self.message_num`` by one and returns the new
    value formatted as a decimal string.
    """
    bumped = self.message_num + 1
    self.message_num = bumped
    return "%d" % bumped
130 class BuiltinLevelsTest(BaseTest
):
131 """Test builtin levels and their inheritance."""
134 #Logging levels in a flat logger namespace.
135 m
= self
.next_message
137 ERR
= logging
.getLogger("ERR")
138 ERR
.setLevel(logging
.ERROR
)
139 INF
= logging
.getLogger("INF")
140 INF
.setLevel(logging
.INFO
)
141 DEB
= logging
.getLogger("DEB")
142 DEB
.setLevel(logging
.DEBUG
)
145 ERR
.log(logging
.CRITICAL
, m())
148 INF
.log(logging
.CRITICAL
, m())
153 DEB
.log(logging
.CRITICAL
, m())
159 # These should not log.
166 self
.assert_log_lines([
167 ('ERR', 'CRITICAL', '1'),
168 ('ERR', 'ERROR', '2'),
169 ('INF', 'CRITICAL', '3'),
170 ('INF', 'ERROR', '4'),
171 ('INF', 'WARNING', '5'),
172 ('INF', 'INFO', '6'),
173 ('DEB', 'CRITICAL', '7'),
174 ('DEB', 'ERROR', '8'),
175 ('DEB', 'WARNING', '9'),
176 ('DEB', 'INFO', '10'),
177 ('DEB', 'DEBUG', '11'),
180 def test_nested_explicit(self
):
181 # Logging levels in a nested namespace, all explicitly set.
182 m
= self
.next_message
184 INF
= logging
.getLogger("INF")
185 INF
.setLevel(logging
.INFO
)
186 INF_ERR
= logging
.getLogger("INF.ERR")
187 INF_ERR
.setLevel(logging
.ERROR
)
190 INF_ERR
.log(logging
.CRITICAL
, m())
193 # These should not log.
198 self
.assert_log_lines([
199 ('INF.ERR', 'CRITICAL', '1'),
200 ('INF.ERR', 'ERROR', '2'),
203 def test_nested_inherited(self
):
204 #Logging levels in a nested namespace, inherited from parent loggers.
205 m
= self
.next_message
207 INF
= logging
.getLogger("INF")
208 INF
.setLevel(logging
.INFO
)
209 INF_ERR
= logging
.getLogger("INF.ERR")
210 INF_ERR
.setLevel(logging
.ERROR
)
211 INF_UNDEF
= logging
.getLogger("INF.UNDEF")
212 INF_ERR_UNDEF
= logging
.getLogger("INF.ERR.UNDEF")
213 UNDEF
= logging
.getLogger("UNDEF")
216 INF_UNDEF
.log(logging
.CRITICAL
, m())
220 INF_ERR_UNDEF
.log(logging
.CRITICAL
, m())
221 INF_ERR_UNDEF
.error(m())
223 # These should not log.
225 INF_ERR_UNDEF
.warn(m())
226 INF_ERR_UNDEF
.info(m())
227 INF_ERR_UNDEF
.debug(m())
229 self
.assert_log_lines([
230 ('INF.UNDEF', 'CRITICAL', '1'),
231 ('INF.UNDEF', 'ERROR', '2'),
232 ('INF.UNDEF', 'WARNING', '3'),
233 ('INF.UNDEF', 'INFO', '4'),
234 ('INF.ERR.UNDEF', 'CRITICAL', '5'),
235 ('INF.ERR.UNDEF', 'ERROR', '6'),
238 def test_nested_with_virtual_parent(self
):
239 # Logging levels when some parent does not exist yet.
240 m
= self
.next_message
242 INF
= logging
.getLogger("INF")
243 GRANDCHILD
= logging
.getLogger("INF.BADPARENT.UNDEF")
244 CHILD
= logging
.getLogger("INF.BADPARENT")
245 INF
.setLevel(logging
.INFO
)
248 GRANDCHILD
.log(logging
.FATAL
, m())
250 CHILD
.log(logging
.FATAL
, m())
253 # These should not log.
254 GRANDCHILD
.debug(m())
257 self
.assert_log_lines([
258 ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
259 ('INF.BADPARENT.UNDEF', 'INFO', '2'),
260 ('INF.BADPARENT', 'CRITICAL', '3'),
261 ('INF.BADPARENT', 'INFO', '4'),
265 class BasicFilterTest(BaseTest
):
267 """Test the bundled Filter class."""
269 def test_filter(self
):
270 # Only messages satisfying the specified criteria pass through the
272 filter_
= logging
.Filter("spam.eggs")
273 handler
= self
.root_logger
.handlers
[0]
275 handler
.addFilter(filter_
)
276 spam
= logging
.getLogger("spam")
277 spam_eggs
= logging
.getLogger("spam.eggs")
278 spam_eggs_fish
= logging
.getLogger("spam.eggs.fish")
279 spam_bakedbeans
= logging
.getLogger("spam.bakedbeans")
281 spam
.info(self
.next_message())
282 spam_eggs
.info(self
.next_message()) # Good.
283 spam_eggs_fish
.info(self
.next_message()) # Good.
284 spam_bakedbeans
.info(self
.next_message())
286 self
.assert_log_lines([
287 ('spam.eggs', 'INFO', '2'),
288 ('spam.eggs.fish', 'INFO', '3'),
291 handler
.removeFilter(filter_
)
295 # First, we define our levels. There can be as many as you want - the only
296 # limitations are that they should be integers, the lowest should be > 0 and
297 # larger values mean less information being logged. If you need specific
298 # level values which do not fit into these limitations, you can use a
299 # mapping dictionary to convert between your application levels and the
313 LEVEL_RANGE
= range(BORING
, SILENT
+ 1)
316 # Next, we define names for our levels. You don't need to do this - in which
317 # case the system will use "Level n" to denote the text for the level.
319 my_logging_levels
= {
321 TACITURN
: 'Taciturn',
323 EFFUSIVE
: 'Effusive',
324 SOCIABLE
: 'Sociable',
326 TALKATIVE
: 'Talkative',
327 GARRULOUS
: 'Garrulous',
328 CHATTERBOX
: 'Chatterbox',
class GarrulousFilter(logging.Filter):

    """A filter that rejects records logged at the GARRULOUS level."""

    def filter(self, record):
        # Every record passes except one whose level number is GARRULOUS.
        is_garrulous = record.levelno == GARRULOUS
        return not is_garrulous
class VerySpecificFilter(logging.Filter):

    """A filter that rejects records logged at SOCIABLE or TACITURN."""

    def filter(self, record):
        # Records at either of the two blocked levels are dropped; all
        # other records are let through.
        blocked_levels = [SOCIABLE, TACITURN]
        if record.levelno in blocked_levels:
            return False
        return True
347 class CustomLevelsAndFiltersTest(BaseTest
):
349 """Test various filtering possibilities with custom logging levels."""
351 # Skip the logger name group.
352 expected_log_pat
= r
"^[\w.]+ -> ([\w]+): ([\d]+)$"
356 for k
, v
in my_logging_levels
.items():
357 logging
.addLevelName(k
, v
)
def log_at_all_levels(self, logger):
    # Emit one freshly numbered message on `logger` for each level in
    # LEVEL_RANGE, in the order LEVEL_RANGE yields them.
    emit = logger.log
    for level in LEVEL_RANGE:
        emit(level, self.next_message())
363 def test_logger_filter(self
):
364 # Filter at logger level.
365 self
.root_logger
.setLevel(VERBOSE
)
366 # Levels >= 'Verbose' are good.
367 self
.log_at_all_levels(self
.root_logger
)
368 self
.assert_log_lines([
377 def test_handler_filter(self
):
378 # Filter at handler level.
379 self
.root_logger
.handlers
[0].setLevel(SOCIABLE
)
381 # Levels >= 'Sociable' are good.
382 self
.log_at_all_levels(self
.root_logger
)
383 self
.assert_log_lines([
391 self
.root_logger
.handlers
[0].setLevel(logging
.NOTSET
)
393 def test_specific_filters(self
):
394 # Set a specific filter object on the handler, and then add another
395 # filter object on the logger itself.
396 handler
= self
.root_logger
.handlers
[0]
397 specific_filter
= None
398 garr
= GarrulousFilter()
399 handler
.addFilter(garr
)
401 self
.log_at_all_levels(self
.root_logger
)
403 # Notice how 'Garrulous' is missing
414 self
.assert_log_lines(first_lines
)
416 specific_filter
= VerySpecificFilter()
417 self
.root_logger
.addFilter(specific_filter
)
418 self
.log_at_all_levels(self
.root_logger
)
419 self
.assert_log_lines(first_lines
+ [
420 # Not only 'Garrulous' is still missing, but also 'Sociable'
423 ('Chatterbox', '12'),
432 self
.root_logger
.removeFilter(specific_filter
)
433 handler
.removeFilter(garr
)
436 class MemoryHandlerTest(BaseTest
):
438 """Tests for the MemoryHandler."""
440 # Do not bother with a logger name group.
441 expected_log_pat
= r
"^[\w.]+ -> ([\w]+): ([\d]+)$"
445 self
.mem_hdlr
= logging
.handlers
.MemoryHandler(10, logging
.WARNING
,
447 self
.mem_logger
= logging
.getLogger('mem')
448 self
.mem_logger
.propagate
= 0
449 self
.mem_logger
.addHandler(self
.mem_hdlr
)
452 self
.mem_hdlr
.close()
453 BaseTest
.tearDown(self
)
455 def test_flush(self
):
456 # The memory handler flushes to its target handler based on specific
457 # criteria (message count and message level).
458 self
.mem_logger
.debug(self
.next_message())
459 self
.assert_log_lines([])
460 self
.mem_logger
.info(self
.next_message())
461 self
.assert_log_lines([])
462 # This will flush because the level is >= logging.WARNING
463 self
.mem_logger
.warn(self
.next_message())
469 self
.assert_log_lines(lines
)
472 self
.mem_logger
.debug(self
.next_message())
473 self
.assert_log_lines(lines
)
474 # This will flush because it's the 10th message since the last
476 self
.mem_logger
.debug(self
.next_message())
477 lines
= lines
+ [('DEBUG', str(i
)) for i
in range(n
, n
+ 10)]
478 self
.assert_log_lines(lines
)
480 self
.mem_logger
.debug(self
.next_message())
481 self
.assert_log_lines(lines
)
class ExceptionFormatter(logging.Formatter):
    """A formatter that reduces exception info to the exception class name."""

    def formatException(self, ei):
        # Only the first item of `ei` is used; its __name__ is reported.
        # NOTE(review): presumably `ei` is a sys.exc_info()-style tuple whose
        # first item is the exception class -- confirm against callers.
        exc_class = ei[0]
        return "Got a [%s]" % exc_class.__name__
490 class ConfigFileTest(BaseTest
):
492 """Reading logging config from a .ini-style config file."""
494 expected_log_pat
= r
"^([\w]+) \+\+ ([\w]+)$"
496 # config0 is a standard configuration.
518 format=%(levelname)s ++ %(message)s
522 # config1 adds a little to the standard configuration.
541 qualname=compiler.parser
550 format=%(levelname)s ++ %(message)s
554 # config2 has a subtle configuration error that should be reported
555 config2
= config1
.replace("sys.stdout", "sys.stbout")
557 # config3 has a less subtle configuration error
558 config3
= config1
.replace("formatter=form1", "formatter=misspelled_name")
560 # config4 specifies a custom formatter class to be loaded
582 class=""" + __name__
+ """.ExceptionFormatter
583 format=%(levelname)s:%(name)s:%(message)s
587 # config5 specifies a custom handler class to be loaded
588 config5
= config1
.replace('class=StreamHandler', 'class=logging.StreamHandler')
590 # config6 uses ', ' delimiters in the handlers and formatters sections
609 qualname=compiler.parser
624 format=%(levelname)s ++ %(message)s
632 def apply_config(self
, conf
):
634 fn
= tempfile
.mktemp(".ini")
636 f
.write(textwrap
.dedent(conf
))
638 logging
.config
.fileConfig(fn
)
642 def test_config0_ok(self
):
643 # A simple config file which overrides the default settings.
644 with
captured_stdout() as output
:
645 self
.apply_config(self
.config0
)
646 logger
= logging
.getLogger()
647 # Won't output anything
648 logger
.info(self
.next_message())
650 logger
.error(self
.next_message())
651 self
.assert_log_lines([
654 # Original logger output is empty.
655 self
.assert_log_lines([])
657 def test_config1_ok(self
, config
=config1
):
658 # A config file defining a sub-parser as well.
659 with
captured_stdout() as output
:
660 self
.apply_config(config
)
661 logger
= logging
.getLogger("compiler.parser")
662 # Both will output a message
663 logger
.info(self
.next_message())
664 logger
.error(self
.next_message())
665 self
.assert_log_lines([
669 # Original logger output is empty.
670 self
.assert_log_lines([])
def test_config2_failure(self):
    # config2 is config1 with "sys.stdout" misspelled as "sys.stbout";
    # applying it is expected to raise.
    broken_config = self.config2
    self.assertRaises(StandardError, self.apply_config, broken_config)
def test_config3_failure(self):
    # config3 is config1 with "formatter=form1" replaced by
    # "formatter=misspelled_name"; applying it is expected to raise.
    broken_config = self.config3
    self.assertRaises(StandardError, self.apply_config, broken_config)
680 def test_config4_ok(self
):
681 # A config file specifying a custom formatter class.
682 with
captured_stdout() as output
:
683 self
.apply_config(self
.config4
)
684 logger
= logging
.getLogger()
688 logging
.exception("just testing")
690 self
.assertEquals(output
.getvalue(),
691 "ERROR:root:just testing\nGot a [RuntimeError]\n")
692 # Original logger output is empty
693 self
.assert_log_lines([])
def test_config5_ok(self):
    # config5 is config1 with the handler class spelled as
    # "logging.StreamHandler"; the config1 checks are rerun against it.
    custom_handler_config = self.config5
    self.test_config1_ok(config=custom_handler_config)
def test_config6_ok(self):
    # config6 uses ', ' delimiters in its handlers and formatters
    # sections; the config1 checks are rerun against it.
    spaced_delimiter_config = self.config6
    self.test_config1_ok(config=spaced_delimiter_config)
701 class LogRecordStreamHandler(StreamRequestHandler
):
703 """Handler for a streaming logging request. It saves the log message in the
704 TCP server's 'log_output' attribute."""
706 TCP_LOG_END
= "!!!END!!!"
709 """Handle multiple requests - each expected to be of 4-byte length,
710 followed by the LogRecord in pickle format. Logs the record
711 according to whatever policy is configured locally."""
713 chunk
= self
.connection
.recv(4)
716 slen
= struct
.unpack(">L", chunk
)[0]
717 chunk
= self
.connection
.recv(slen
)
718 while len(chunk
) < slen
:
719 chunk
= chunk
+ self
.connection
.recv(slen
- len(chunk
))
720 obj
= self
.unpickle(chunk
)
721 record
= logging
.makeLogRecord(obj
)
722 self
.handle_log_record(record
)
def unpickle(self, data):
    """Return the object obtained by unpickling *data*."""
    # NOTE(review): the caller passes bytes received from a socket.
    # Unpickling such data is only acceptable because this receiver is
    # part of a test harness; never do this with an untrusted peer.
    payload = cPickle.loads(data)
    return payload
727 def handle_log_record(self
, record
):
728 # If the end-of-messages sentinel is seen, tell the server to
730 if self
.TCP_LOG_END
in record
.msg
:
731 self
.server
.abort
= 1
733 self
.server
.log_output
+= record
.msg
+ "\n"
736 class LogRecordSocketReceiver(ThreadingTCPServer
):
738 """A simple-minded TCP socket-based logging receiver suitable for test
741 allow_reuse_address
= 1
744 def __init__(self
, host
='localhost',
745 port
=logging
.handlers
.DEFAULT_TCP_LOGGING_PORT
,
746 handler
=LogRecordStreamHandler
):
747 ThreadingTCPServer
.__init
__(self
, (host
, port
), handler
)
750 self
.finished
= threading
.Event()
752 def serve_until_stopped(self
):
753 while not self
.abort
:
754 rd
, wr
, ex
= select
.select([self
.socket
.fileno()], [], [],
757 self
.handle_request()
758 # Notify the main thread that we're about to exit
760 # close the listen socket
764 class SocketHandlerTest(BaseTest
):
766 """Test for SocketHandler objects."""
769 """Set up a TCP server to receive log messages, and a SocketHandler
770 pointing to that server's address and port."""
772 self
.tcpserver
= LogRecordSocketReceiver(port
=0)
773 self
.port
= self
.tcpserver
.socket
.getsockname()[1]
775 threading
.Thread(target
=self
.tcpserver
.serve_until_stopped
)]
776 for thread
in self
.threads
:
779 self
.sock_hdlr
= logging
.handlers
.SocketHandler('localhost', self
.port
)
780 self
.sock_hdlr
.setFormatter(self
.root_formatter
)
781 self
.root_logger
.removeHandler(self
.root_logger
.handlers
[0])
782 self
.root_logger
.addHandler(self
.sock_hdlr
)
785 """Shutdown the TCP server."""
787 self
.tcpserver
.abort
= True
789 self
.root_logger
.removeHandler(self
.sock_hdlr
)
790 self
.sock_hdlr
.close()
791 for thread
in self
.threads
:
794 BaseTest
.tearDown(self
)
def get_output(self):
    """Return the log output collected by the TCP server.

    Logs the end-of-log sentinel at CRITICAL level through the root
    logger, waits up to two seconds on the server's ``finished`` event,
    then returns the server's ``log_output``.
    """
    receiver = self.tcpserver
    sentinel = LogRecordStreamHandler.TCP_LOG_END
    # Signal the TCP receiver that no more messages will follow.
    self.root_logger.critical(sentinel)
    # The wait is bounded by a timeout rather than blocking indefinitely.
    receiver.finished.wait(2.0)
    return receiver.log_output
803 def test_output(self
):
804 # The log message sent to the SocketHandler is properly received.
805 logger
= logging
.getLogger("tcp")
808 self
.assertEquals(self
.get_output(), "spam\neggs\n")
811 class MemoryTest(BaseTest
):
813 """Test memory persistence of logger objects."""
816 """Create a dict to remember potentially destroyed objects."""
820 def _watch_for_survival(self
, *args
):
821 """Watch the given objects for survival, by creating weakrefs to
824 key
= id(obj
), repr(obj
)
825 self
._survivors
[key
] = weakref
.ref(obj
)
827 def _assert_survival(self
):
828 """Assert that all objects watched for survival have survived."""
829 # Trigger cycle breaking.
832 for (id_
, repr_
), ref
in self
._survivors
.items():
836 self
.fail("%d objects should have survived "
837 "but have been destroyed: %s" % (len(dead
), ", ".join(dead
)))
839 def test_persistent_loggers(self
):
840 # Logger objects are persistent and retain their configuration, even
841 # if visible references are destroyed.
842 self
.root_logger
.setLevel(logging
.INFO
)
843 foo
= logging
.getLogger("foo")
844 self
._watch
_for
_survival
(foo
)
845 foo
.setLevel(logging
.DEBUG
)
846 self
.root_logger
.debug(self
.next_message())
847 foo
.debug(self
.next_message())
848 self
.assert_log_lines([
849 ('foo', 'DEBUG', '2'),
853 self
._assert
_survival
()
854 # foo has retained its settings.
855 bar
= logging
.getLogger("foo")
856 bar
.debug(self
.next_message())
857 self
.assert_log_lines([
858 ('foo', 'DEBUG', '2'),
859 ('foo', 'DEBUG', '3'),
862 class EncodingTest(BaseTest
):
863 def test_encoding_plain_file(self
):
864 # In Python 2.x, a plain file object is treated as having no encoding.
865 log
= logging
.getLogger("test")
866 fn
= tempfile
.mktemp(".log")
867 # the non-ascii data we write to the log.
870 handler
= logging
.FileHandler(fn
)
871 log
.addHandler(handler
)
873 # write non-ascii data to the log.
876 log
.removeHandler(handler
)
878 # check we wrote exactly those bytes, ignoring trailing \n etc
881 self
.failUnlessEqual(f
.read().rstrip(), data
)
885 if os
.path
.isfile(fn
):
888 # Set the locale to the platform-dependent default. I have no idea
889 # why the test does this, but in any case we save the current locale
890 # first and restore it at the end.
891 @run_with_locale('LC_ALL', '')
893 run_unittest(BuiltinLevelsTest
, BasicFilterTest
,
894 CustomLevelsAndFiltersTest
, MemoryHandlerTest
,
895 ConfigFileTest
, SocketHandlerTest
, MemoryTest
,
898 if __name__
== "__main__":