Fix timestamp constructing and representing (close #25).
[pyyaml/python3.git] / tests / test_emitter.py
blobd45cd8f4107cc8427da112e16cc5f0c6d53b5b4c
2 import test_appliance, sys, StringIO
4 from yaml import *
5 import yaml
7 class TestEmitter(test_appliance.TestAppliance):
9 def _testEmitterOnData(self, test_name, canonical_filename, data_filename):
10 self._testEmitter(test_name, data_filename)
12 def _testEmitterOnCanonicalNormally(self, test_name, canonical_filename):
13 self._testEmitter(test_name, canonical_filename, False)
15 def _testEmitterOnCanonicalCanonically(self, test_name, canonical_filename):
16 self._testEmitter(test_name, canonical_filename, True)
18 def _testEmitter(self, test_name, filename, canonical=None):
19 events = list(parse(file(filename, 'rb')))
20 #self._dump(filename, events, canonical)
21 stream = StringIO.StringIO()
22 emit(events, stream, canonical=canonical)
23 data = stream.getvalue()
24 new_events = list(parse(data))
25 for event, new_event in zip(events, new_events):
26 self.failUnlessEqual(event.__class__, new_event.__class__)
27 if isinstance(event, NodeEvent):
28 self.failUnlessEqual(event.anchor, new_event.anchor)
29 if isinstance(event, CollectionStartEvent):
30 self.failUnlessEqual(event.tag, new_event.tag)
31 if isinstance(event, ScalarEvent):
32 #self.failUnlessEqual(event.implicit, new_event.implicit)
33 if True not in event.implicit+new_event.implicit:
34 self.failUnlessEqual(event.tag, new_event.tag)
35 self.failUnlessEqual(event.value, new_event.value)
37 def _dump(self, filename, events, canonical):
38 print "="*30
39 print "ORIGINAL DOCUMENT:"
40 print file(filename, 'rb').read()
41 print '-'*30
42 print "EMITTED DOCUMENT:"
43 emit(events, sys.stdout, canonical=canonical)
45 TestEmitter.add_tests('testEmitterOnData', '.canonical', '.data')
46 TestEmitter.add_tests('testEmitterOnCanonicalNormally', '.canonical')
47 TestEmitter.add_tests('testEmitterOnCanonicalCanonically', '.canonical')
49 class EventsLoader(Loader):
51 def construct_event(self, node):
52 if isinstance(node, ScalarNode):
53 mapping = {}
54 else:
55 mapping = self.construct_mapping(node)
56 class_name = str(node.tag[1:])+'Event'
57 if class_name in ['AliasEvent', 'ScalarEvent', 'SequenceStartEvent', 'MappingStartEvent']:
58 mapping.setdefault('anchor', None)
59 if class_name in ['ScalarEvent', 'SequenceStartEvent', 'MappingStartEvent']:
60 mapping.setdefault('tag', None)
61 if class_name in ['SequenceStartEvent', 'MappingStartEvent']:
62 mapping.setdefault('implicit', True)
63 if class_name == 'ScalarEvent':
64 mapping.setdefault('implicit', (False, True))
65 mapping.setdefault('value', '')
66 value = getattr(yaml, class_name)(**mapping)
67 return value
69 EventsLoader.add_constructor(None, EventsLoader.construct_event)
71 class TestEmitterEvents(test_appliance.TestAppliance):
73 def _testEmitterEvents(self, test_name, events_filename):
74 events = list(load(file(events_filename, 'rb'), Loader=EventsLoader))
75 #self._dump(events_filename, events)
76 stream = StringIO.StringIO()
77 emit(events, stream)
78 data = stream.getvalue()
79 new_events = list(parse(data))
80 self.failUnlessEqual(len(events), len(new_events))
81 for event, new_event in zip(events, new_events):
82 self.failUnlessEqual(event.__class__, new_event.__class__)
83 if isinstance(event, NodeEvent):
84 self.failUnlessEqual(event.anchor, new_event.anchor)
85 if isinstance(event, CollectionStartEvent):
86 self.failUnlessEqual(event.tag, new_event.tag)
87 if isinstance(event, ScalarEvent):
88 self.failUnless(event.implicit == new_event.implicit
89 or event.tag == new_event.tag)
90 self.failUnlessEqual(event.value, new_event.value)
92 def _dump(self, events_filename, events):
93 print "="*30
94 print "EVENTS:"
95 print file(events_filename, 'rb').read()
96 print '-'*30
97 print "OUTPUT:"
98 emit(events, sys.stdout)
100 TestEmitterEvents.add_tests('testEmitterEvents', '.events')