Fix a test.
[pyyaml.git] / tests / test_appliance.py
blob eb54faa5c090bc6f8e104e6e368624ee6454eec0
import unittest, os

from yaml.tokens import *
from yaml.events import *

class TestAppliance(unittest.TestCase):

    DATA = 'tests/data'

    all_tests = {}
    for filename in os.listdir(DATA):
        if os.path.isfile(os.path.join(DATA, filename)):
            root, ext = os.path.splitext(filename)
            all_tests.setdefault(root, []).append(ext)

    def add_tests(cls, method_name, *extensions):
        # Generate one test method per data set that provides every
        # requested extension.
        for test in cls.all_tests:
            available_extensions = cls.all_tests[test]
            for ext in extensions:
                if ext not in available_extensions:
                    break
            else:
                # The for/else branch runs only when no extension was missing.
                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
                def test_method(self, test=test, filenames=filenames):
                    getattr(self, '_'+method_name)(test, *filenames)
                test = test.replace('-', '_')
                try:
                    test_method.__name__ = '%s_%s' % (method_name, test)
                except TypeError:
                    # Older Pythons treat __name__ as read-only, so rebuild
                    # the function object with the desired name instead.
                    import new
                    test_method = new.function(test_method.func_code, test_method.func_globals,
                            '%s_%s' % (method_name, test), test_method.func_defaults,
                            test_method.func_closure)
                setattr(cls, test_method.__name__, test_method)
    add_tests = classmethod(add_tests)
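
# A minimal usage sketch (hypothetical names; the real callers live in the
# sibling test modules).  A subclass defines a '_'-prefixed worker method
# and registers one generated test per matching pair of data files:
#
#   class MyTokenTest(TestAppliance):
#       def _test_tokens(self, test_name, data_filename, canonical_filename):
#           ...
#   MyTokenTest.add_tests('test_tokens', '.data', '.canonical')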

class Error(Exception):
    pass

class CanonicalScanner:

    def __init__(self, data):
        self.data = unicode(data, 'utf-8')+u'\0'
        self.index = 0

    def scan(self):
        #print self.data[self.index:]
        tokens = []
        tokens.append(StreamStartToken(None, None))
        while True:
            self.find_token()
            ch = self.data[self.index]
            if ch == u'\0':
                tokens.append(StreamEndToken(None, None))
                break
            elif ch == u'%':
                tokens.append(self.scan_directive())
            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
                self.index += 3
                tokens.append(DocumentStartToken(None, None))
            elif ch == u'[':
                self.index += 1
                tokens.append(FlowSequenceStartToken(None, None))
            elif ch == u'{':
                self.index += 1
                tokens.append(FlowMappingStartToken(None, None))
            elif ch == u']':
                self.index += 1
                tokens.append(FlowSequenceEndToken(None, None))
            elif ch == u'}':
                self.index += 1
                tokens.append(FlowMappingEndToken(None, None))
            elif ch == u'?':
                self.index += 1
                tokens.append(KeyToken(None, None))
            elif ch == u':':
                self.index += 1
                tokens.append(ValueToken(None, None))
            elif ch == u',':
                self.index += 1
                tokens.append(FlowEntryToken(None, None))
            elif ch == u'*' or ch == u'&':
                tokens.append(self.scan_alias())
            elif ch == u'!':
                tokens.append(self.scan_tag())
            elif ch == u'"':
                tokens.append(self.scan_scalar())
            else:
                raise Error("invalid token")
        return tokens
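
    # Illustration (not in the original file): scanning the canonical text
    #   %YAML 1.1\n--- !!str "foo"\n
    # produces STREAM-START, DIRECTIVE, DOCUMENT-START, TAG, SCALAR,
    # STREAM-END.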

    DIRECTIVE = u'%YAML 1.1'

    def scan_directive(self):
        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
            self.index += len(self.DIRECTIVE)
            return DirectiveToken('YAML', (1, 1), None, None)
        else:
            # Anything other than the %YAML 1.1 directive is an error here.
            raise Error("invalid directive")

    def scan_alias(self):
        if self.data[self.index] == u'*':
            TokenClass = AliasToken
        else:
            TokenClass = AnchorToken
        self.index += 1
        start = self.index
        while self.data[self.index] not in u', \n\0':
            self.index += 1
        value = self.data[start:self.index]
        return TokenClass(value, None, None)

    def scan_tag(self):
        self.index += 1
        start = self.index
        while self.data[self.index] not in u' \n\0':
            self.index += 1
        value = self.data[start:self.index]
        if value[0] == u'!':
            value = 'tag:yaml.org,2002:'+value[1:]
        elif value[0] == u'<' and value[-1] == u'>':
            value = value[1:-1]
        else:
            value = u'!'+value
        return TagToken(value, None, None)
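
    # Illustration (not in the original file; the example.com tag is
    # hypothetical):
    #   !!str                      -> u'tag:yaml.org,2002:str'
    #   !<tag:example.com,2002:ex> -> u'tag:example.com,2002:ex'
    #   !local                     -> u'!local'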

    QUOTE_CODES = {
        'x': 2,
        'u': 4,
        'U': 8,
    }

    QUOTE_REPLACES = {
        u'\\': u'\\',
        u'\"': u'\"',
        u' ': u' ',
        u'a': u'\x07',
        u'b': u'\x08',
        u'e': u'\x1B',
        u'f': u'\x0C',
        u'n': u'\x0A',
        u'r': u'\x0D',
        u't': u'\x09',
        u'v': u'\x0B',
        u'N': u'\u0085',
        u'L': u'\u2028',
        u'P': u'\u2029',
        u'_': u'_',
        u'0': u'\x00',
    }

    def scan_scalar(self):
        self.index += 1
        chunks = []
        start = self.index
        ignore_spaces = False
        while self.data[self.index] != u'"':
            if self.data[self.index] == u'\\':
                ignore_spaces = False
                chunks.append(self.data[start:self.index])
                self.index += 1
                ch = self.data[self.index]
                self.index += 1
                if ch == u'\n':
                    ignore_spaces = True
                elif ch in self.QUOTE_CODES:
                    length = self.QUOTE_CODES[ch]
                    code = int(self.data[self.index:self.index+length], 16)
                    chunks.append(unichr(code))
                    self.index += length
                else:
                    chunks.append(self.QUOTE_REPLACES[ch])
                start = self.index
            elif self.data[self.index] == u'\n':
                chunks.append(self.data[start:self.index])
                chunks.append(u' ')
                self.index += 1
                start = self.index
                ignore_spaces = True
            elif ignore_spaces and self.data[self.index] == u' ':
                self.index += 1
                start = self.index
            else:
                ignore_spaces = False
                self.index += 1
        chunks.append(self.data[start:self.index])
        self.index += 1
        return ScalarToken(u''.join(chunks), False, None, None)
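
    # Illustration (not in the original file): the input u'"a\\x41 b"'
    # scans to the scalar u'aA b', and a backslash-newline escape folds
    # away the line break together with any following spaces.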

    def find_token(self):
        # Skip spaces, tabs, '#' comments, and line breaks until real
        # content (or the terminating '\0') is found.
        found = False
        while not found:
            while self.data[self.index] in u' \t':
                self.index += 1
            if self.data[self.index] == u'#':
                while self.data[self.index] != u'\n':
                    self.index += 1
            if self.data[self.index] == u'\n':
                self.index += 1
            else:
                found = True

class CanonicalParser:

    def __init__(self, data):
        self.scanner = CanonicalScanner(data)
        self.events = []

    # stream: STREAM-START document* STREAM-END
    def parse_stream(self):
        self.consume_token(StreamStartToken)
        self.events.append(StreamStartEvent(None, None))
        while not self.test_token(StreamEndToken):
            if self.test_token(DirectiveToken, DocumentStartToken):
                self.parse_document()
            else:
                raise Error("document is expected, got "+repr(self.tokens[self.index]))
        self.consume_token(StreamEndToken)
        self.events.append(StreamEndEvent(None, None))

    # document: DIRECTIVE? DOCUMENT-START node
    def parse_document(self):
        node = None
        if self.test_token(DirectiveToken):
            self.consume_token(DirectiveToken)
        self.consume_token(DocumentStartToken)
        self.events.append(DocumentStartEvent(None, None))
        self.parse_node()
        self.events.append(DocumentEndEvent(None, None))

    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
    def parse_node(self):
        if self.test_token(AliasToken):
            self.events.append(AliasEvent(self.get_value(), None, None))
        else:
            anchor = None
            if self.test_token(AnchorToken):
                anchor = self.get_value()
            tag = u'!'
            if self.test_token(TagToken):
                tag = self.get_value()
            if self.test_token(ScalarToken):
                self.events.append(ScalarEvent(anchor, tag, self.get_value(), None, None))
            elif self.test_token(FlowSequenceStartToken):
                self.events.append(SequenceEvent(anchor, tag, None, None))
                self.parse_sequence()
            elif self.test_token(FlowMappingStartToken):
                self.events.append(MappingEvent(anchor, tag, None, None))
                self.parse_mapping()
            else:
                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))

    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
    def parse_sequence(self):
        self.consume_token(FlowSequenceStartToken)
        if not self.test_token(FlowSequenceEndToken):
            self.parse_node()
            while not self.test_token(FlowSequenceEndToken):
                self.consume_token(FlowEntryToken)
                if not self.test_token(FlowSequenceEndToken):
                    self.parse_node()
        self.consume_token(FlowSequenceEndToken)
        self.events.append(CollectionEndEvent(None, None))

    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
    def parse_mapping(self):
        self.consume_token(FlowMappingStartToken)
        if not self.test_token(FlowMappingEndToken):
            self.parse_map_entry()
            while not self.test_token(FlowMappingEndToken):
                self.consume_token(FlowEntryToken)
                if not self.test_token(FlowMappingEndToken):
                    self.parse_map_entry()
        self.consume_token(FlowMappingEndToken)
        self.events.append(CollectionEndEvent(None, None))

    # map_entry: KEY node VALUE node
    def parse_map_entry(self):
        self.consume_token(KeyToken)
        self.parse_node()
        self.consume_token(ValueToken)
        self.parse_node()
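
    # Illustration (not in the original file): the canonical mapping text
    #   { ? "key" : "value" }
    # is consumed as KEY, SCALAR, VALUE, SCALAR between the braces.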

    def test_token(self, *choices):
        for choice in choices:
            if isinstance(self.tokens[self.index], choice):
                return True
        return False

    def consume_token(self, cls):
        if not isinstance(self.tokens[self.index], cls):
            raise Error("unexpected token "+repr(self.tokens[self.index]))
        self.index += 1

    def get_value(self):
        value = self.tokens[self.index].value
        self.index += 1
        return value

    def parse(self):
        self.tokens = self.scanner.scan()
        self.index = 0
        self.parse_stream()
        return self.events

    def get(self):
        return self.events.pop(0)

    def check(self, *choices):
        for choice in choices:
            if isinstance(self.events[0], choice):
                return True
        return False

    def peek(self):
        return self.events[0]
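
# A minimal driver sketch (hypothetical; not part of the original module).
# The filename is illustrative, and any canonical-form test file would do:
#
#   data = open('tests/data/some-test.canonical', 'rb').read()
#   parser = CanonicalParser(data)
#   for event in parser.parse():
#       print event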