# NOTE(review): this file is a garbled listing of a Python 2 test harness:
# the original file's line numbers are fused into the text and statements
# are hard-wrapped. Code is kept byte-identical; comments only.
# TestAppliance: unittest.TestCase base that indexes test-data files.
4 class TestAppliance(unittest
.TestCase
):
# Scan the DATA directory and group regular files by base name, mapping
# root -> list of extensions. NOTE(review): original lines 5-8 (which
# presumably bind DATA and initialize `tests = {}`) are missing from this
# view -- confirm upstream.
9 for filename
in os
.listdir(DATA
):
10 if os
.path
.isfile(os
.path
.join(DATA
, filename
)):
11 root
, ext
= os
.path
.splitext(filename
)
12 tests
.setdefault(root
, []).append(ext
)
# add_tests(cls, method_name, *extensions): for each indexed test base
# name that has every requested extension available, synthesize a
# '<method_name>_<test>' method on the class that forwards the matching
# file names to self._<method_name>.
14 def add_tests(cls
, method_name
, *extensions
):
15 for test
in cls
.tests
:
16 available_extensions
= cls
.tests
[test
]
17 for ext
in extensions
:
18 if ext
not in available_extensions
:
# NOTE(review): original lines 19-20 are missing from this view --
# presumably a `break` plus the for-loop `else:` that guards the
# registration below; confirm against upstream.
21 filenames
= [os
.path
.join(cls
.DATA
, test
+ext
) for ext
in extensions
]
# Default-argument binding freezes `test` and `filenames` per closure
# (standard late-binding-closure workaround).
22 def test_method(self
, test
=test
, filenames
=filenames
):
23 getattr(self
, '_'+method_name
)(test
, *filenames
)
# '-' is not valid in Python identifiers, so normalize to '_'.
24 test
= test
.replace('-', '_')
25 test_method
.__name
__ = '%s_%s' % (method_name
, test
)
26 setattr(cls
, test_method
.__name
__, test_method
)
# Pre-decorator (Python 2.3-era) classmethod registration style.
27 add_tests
= classmethod(add_tests
)
# Tail of Node.__repr__ (its `def` line, original ~30-31, is missing from
# this view): collect repr() of whichever of anchor/tag/value this node
# defines and format as 'ClassName(a, b, ...)'.
32 for attribute
in ['anchor', 'tag', 'value']:
33 if hasattr(self
, attribute
):
34 args
.append(repr(getattr(self
, attribute
)))
35 return "%s(%s)" % (self
.__class
__.__name
__, ', '.join(args
))
# Node subclasses mirroring YAML node kinds. NOTE(review): each __init__
# body (presumably plain attribute assignments of the parameters) is
# missing from this view -- confirm upstream.
37 class AliasNode(Node
):
38 def __init__(self
, anchor
):
41 class ScalarNode(Node
):
42 def __init__(self
, anchor
, tag
, value
):
47 class SequenceNode(Node
):
48 def __init__(self
, anchor
, tag
, value
):
53 class MappingNode(Node
):
54 def __init__(self
, anchor
, tag
, value
):
# Tail of Token.__repr__ (the `def` line is missing from this view):
# include the token's value in the repr only when one was set.
62 if hasattr(self
, 'value'):
63 args
.append(repr(self
.value
))
64 return "%s(%s)" % (self
.__class
__.__name
__, ''.join(args
))
# Token vocabulary emitted by the canonical scanner. NOTE(review): the
# class bodies (original 67-68, 70-71, ... -- presumably `pass` or value
# assignment in the __init__s) are missing from this view.
66 class EndToken(Token
):
69 class DirectiveToken(Token
):
72 class DocumentStartToken(Token
):
75 class SequenceStartToken(Token
):
78 class MappingStartToken(Token
):
81 class SequenceEndToken(Token
):
84 class MappingEndToken(Token
):
87 class KeyToken(Token
):
90 class ValueToken(Token
):
93 class EntryToken(Token
):
# Value-carrying tokens: alias/anchor/tag names and scalar contents.
96 class AliasToken(Token
):
97 def __init__(self
, value
):
100 class AnchorToken(Token
):
101 def __init__(self
, value
):
104 class TagToken(Token
):
105 def __init__(self
, value
):
108 class ScalarToken(Token
):
109 def __init__(self
, value
):
# Base error type raised on malformed canonical-YAML input.
112 class Error(Exception):
# CanonicalScanner: hand-rolled scanner for the canonical YAML subset.
115 class CanonicalScanner
:
117 def __init__(self
, source
, data
):
# Decode the byte stream as UTF-8 (Python 2 `unicode`) and append NUL as
# an end-of-input sentinel, so lookahead never needs a bounds check.
# NOTE(review): original lines 118/120 (presumably `self.source = source`
# and `self.index = 0`) are missing from this view.
119 self
.data
= unicode(data
, 'utf-8')+u
'\0'
# Fragment of CanonicalScanner.scan(): the main tokenize loop. Each
# branch appends one token. NOTE(review): the `def` line, the `while`
# loop header, the find_token() call, most `if/elif ch == ...:` condition
# lines, and the index advances are missing from this view (original
# 124-126, 128, 130-131, 134, 136-137, ...).
123 #print self.data[self.index:]
127 ch
= self
.data
[self
.index
]
# NUL sentinel: end of input.
129 tokens
.append(EndToken())
# '%...' -> directive.
132 tokens
.append(self
.scan_directive())
# '---' (but not a plain '-') -> document start.
133 elif ch
== u
'-' and self
.data
[self
.index
:self
.index
+3] == u
'---':
135 tokens
.append(DocumentStartToken())
# Single-character structural tokens: [ { ] } ? : ,
138 tokens
.append(SequenceStartToken())
141 tokens
.append(MappingStartToken())
144 tokens
.append(SequenceEndToken())
147 tokens
.append(MappingEndToken())
150 tokens
.append(KeyToken())
153 tokens
.append(ValueToken())
156 tokens
.append(EntryToken())
# '*' (alias) and '&' (anchor) share one scanning routine.
157 elif ch
== u
'*' or ch
== u
'&':
158 tokens
.append(self
.scan_alias())
160 tokens
.append(self
.scan_tag())
162 tokens
.append(self
.scan_scalar())
# Anything unrecognized is a hard error.
164 raise Error("invalid token")
# The only directive the canonical scanner recognizes (YAML 1.1 header).
# (Reconstructed from the garbled listing: fused line numbers and hard
# line-wraps removed; tokens unchanged.)
DIRECTIVE = u'%YAML 1.1'
def scan_directive(self):
    """Consume the '%YAML 1.1' directive and return a DirectiveToken.

    The directive matches only when DIRECTIVE appears at the current
    index followed by a space, a newline, or the NUL end-of-input
    sentinel appended in __init__.

    NOTE(review): on a non-matching prefix this method falls through and
    returns None (no error is raised) -- the caller appears to assume
    success; confirm this is intended.

    (Reconstructed from the garbled listing -- fused line numbers and
    hard line-wraps removed; all original tokens of lines 169-173 are
    present and behavior is unchanged.)
    """
    end = self.index + len(self.DIRECTIVE)
    if self.data[self.index:end] == self.DIRECTIVE and \
            self.data[end] in u' \n\0':
        self.index = end
        return DirectiveToken()
# scan_alias: scan a '*alias' or '&anchor' name; the token class is
# chosen by the leading character. NOTE(review): original lines 178
# (`else:`), 180-181 (index advance past the marker, `start = ...`) and
# 183 (index advance in the loop) are missing from this view.
175 def scan_alias(self
):
176 if self
.data
[self
.index
] == u
'*':
177 TokenClass
= AliasToken
179 TokenClass
= AnchorToken
# Name runs until a delimiter: comma, space, newline, or end sentinel.
182 while self
.data
[self
.index
] not in u
', \n\0':
184 value
= self
.data
[start
:self
.index
]
185 return TokenClass(value
)
# Fragment of scan_tag (the `def` line and the handling before/after,
# original 187-189, 191, 193, 195-196, are missing from this view): read
# a tag run, then expand the '!!' shorthand to the full YAML tag prefix.
190 while self
.data
[self
.index
] not in u
' \n\0':
192 value
= self
.data
[start
:self
.index
]
# '!!name' shorthand -> 'tag:yaml.org,2002:name'.
194 value
= 'tag:yaml.org,2002:'+value
[1:]
197 return TagToken(value
)
# scan_scalar: scan a double-quoted scalar, resolving backslash escapes
# and folding line breaks. NOTE(review): many original lines (226-228,
# 234, 236-238, 243-244, 246, 248-250, 252-254, 256, 258) are missing
# from this view -- the opening-quote consumption, `chunks`/`start`
# setup, index advances, and several branch bodies; confirm upstream.
225 def scan_scalar(self
):
229 ignore_spaces
= False
# Accumulate until the closing double quote.
230 while self
.data
[self
.index
] != u
'"':
231 if self
.data
[self
.index
] == u
'\\':
232 ignore_spaces
= False
# Flush the literal run before the escape.
233 chunks
.append(self
.data
[start
:self
.index
])
235 ch
= self
.data
[self
.index
]
# \xXX / \uXXXX style escapes: QUOTE_CODES maps the escape letter to its
# hex-digit count (presumably defined on the class; not visible here).
239 elif ch
in self
.QUOTE_CODES
:
240 length
= self
.QUOTE_CODES
[ch
]
241 code
= int(self
.data
[self
.index
:self
.index
+length
], 16)
242 chunks
.append(unichr(code
))
# Simple one-character escapes via the QUOTE_REPLACES table.
245 chunks
.append(self
.QUOTE_REPLACES
[ch
])
# Line folding: a raw newline inside the scalar.
247 elif self
.data
[self
.index
] == u
'\n':
# Spaces after a folded break are skipped.
251 elif ignore_spaces
and self
.data
[self
.index
] == u
' ':
255 ignore_spaces
= False
# Flush the trailing literal run and join everything.
257 chunks
.append(self
.data
[start
:self
.index
])
259 return ScalarToken(u
''.join(chunks
))
# find_token: advance the index past whitespace, comments, and line
# breaks to the start of the next token. NOTE(review): the loop
# scaffolding (original 262-263, 265, 268, 270-273 -- presumably a
# `found` flag loop and the `self.index += 1` advances) is missing from
# this view.
261 def find_token(self
):
# Skip spaces and tabs.
264 while self
.data
[self
.index
] in u
' \t':
# Skip a '#' comment through to the end of the line.
266 if self
.data
[self
.index
] == u
'#':
267 while self
.data
[self
.index
] != u
'\n':
# Consume the newline and keep scanning on the next line.
269 if self
.data
[self
.index
] == u
'\n':
class CanonicalParser:
    """Recursive-descent parser over CanonicalScanner's token stream.

    (Header and __init__ reconstructed from the garbled listing -- fused
    line numbers and hard line-wraps removed; all tokens of original
    lines 274 and 276-277 are present and behavior is unchanged. The
    remaining methods of this class follow in the original listing.)
    """

    def __init__(self, source, data):
        # Tokenization is deferred: self.tokens is populated later from
        # self.scanner.scan() (see the parse() fragment near the end of
        # this listing).
        self.scanner = CanonicalScanner(source, data)
279 # stream: document* END
# parse_stream: collect documents until the END token. NOTE(review):
# original lines 281 (`documents = []`), 285 (`else:`) and 287
# (`return documents`) are missing from this view.
280 def parse_stream(self
):
282 while not self
.test_token(EndToken
):
283 if self
.test_token(DirectiveToken
, DocumentStartToken
):
284 documents
.append(self
.parse_document())
286 raise Error("document is expected, got "+repr(self
.tokens
[self
.index
]))
289 # document: DIRECTIVE? DOCUMENT-START node?
# parse_document: optional directive, mandatory document start, optional
# root node. NOTE(review): original lines 291 (presumably `node = None`)
# and 298-299 (presumably `return node`) are missing from this view.
290 def parse_document(self
):
292 if self
.test_token(DirectiveToken
):
293 self
.consume_token(DirectiveToken
)
294 self
.consume_token(DocumentStartToken
)
# NOTE(review): TagToken appears twice in this choice list (original
# line 295) -- redundant but harmless, since test_token just checks
# isinstance against each choice.
295 if self
.test_token(TagToken
, AliasToken
, AnchorToken
, TagToken
,
296 SequenceStartToken
, MappingStartToken
, ScalarToken
):
297 node
= self
.parse_node()
300 # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
# parse_node: build the matching Node subclass for the upcoming tokens.
# NOTE(review): original lines 304-305 and 308 (the `else:` branches that
# presumably default `anchor`/`tag` to None) and 317 (`else:` before the
# raise) are missing from this view.
301 def parse_node(self
):
302 if self
.test_token(AliasToken
):
303 return AliasNode(self
.get_value())
306 if self
.test_token(AnchorToken
):
307 anchor
= self
.get_value()
309 if self
.test_token(TagToken
):
310 tag
= self
.get_value()
311 if self
.test_token(ScalarToken
):
312 return ScalarNode(anchor
, tag
, self
.get_value())
313 elif self
.test_token(SequenceStartToken
):
314 return SequenceNode(anchor
, tag
, self
.parse_sequence())
315 elif self
.test_token(MappingStartToken
):
316 return MappingNode(anchor
, tag
, self
.parse_mapping())
318 raise Error("SCALAR, '[', or '{' is expected, got "+repr(self
.tokens
[self
.index
]))
320 # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
# parse_sequence: parse '[' node (, node)* ,? ']' -- the double
# SequenceEndToken test tolerates a trailing comma. NOTE(review):
# original lines 322 (`values = []`) and 331 (`return values`) are
# missing from this view.
321 def parse_sequence(self
):
323 self
.consume_token(SequenceStartToken
)
324 if not self
.test_token(SequenceEndToken
):
325 values
.append(self
.parse_node())
326 while not self
.test_token(SequenceEndToken
):
327 self
.consume_token(EntryToken
)
328 if not self
.test_token(SequenceEndToken
):
329 values
.append(self
.parse_node())
330 self
.consume_token(SequenceEndToken
)
333 # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
# parse_mapping: mirror of parse_sequence for '{ key: value, ... }',
# also tolerating a trailing comma. NOTE(review): original lines 335
# (`values = []`) and 344 (`return values`) are missing from this view.
334 def parse_mapping(self
):
336 self
.consume_token(MappingStartToken
)
337 if not self
.test_token(MappingEndToken
):
338 values
.append(self
.parse_map_entry())
339 while not self
.test_token(MappingEndToken
):
340 self
.consume_token(EntryToken
)
341 if not self
.test_token(MappingEndToken
):
342 values
.append(self
.parse_map_entry())
343 self
.consume_token(MappingEndToken
)
346 # map_entry: KEY node VALUE node
# parse_map_entry: '? key : value'. NOTE(review): original line 352
# (presumably `return (key, value)` or similar) is missing from this
# view -- confirm what the entry evaluates to.
347 def parse_map_entry(self
):
348 self
.consume_token(KeyToken
)
349 key
= self
.parse_node()
350 self
.consume_token(ValueToken
)
351 value
= self
.parse_node()
# test_token: True when the current token is an instance of any of the
# given token classes. NOTE(review): original lines 357-358 (presumably
# `return True` inside the loop and a trailing `return False`) are
# missing from this view.
354 def test_token(self
, *choices
):
355 for choice
in choices
:
356 if isinstance(self
.tokens
[self
.index
], choice
):
# consume_token: assert the current token has the expected class, else
# raise Error. NOTE(review): original line 363 (presumably
# `self.index += 1` to advance past the token) is missing from this view.
360 def consume_token(self
, cls
):
361 if not isinstance(self
.tokens
[self
.index
], cls
):
362 raise Error("unexpected token "+repr(self
.tokens
[self
.index
]))
# Fragments of two methods. First, get_value (its `def` line, original
# ~365, is missing): read the current token's payload -- presumably
# followed by an index advance and `return value` (original 367-368,
# missing from this view).
366 value
= self
.tokens
[self
.index
].value
# Second, parse (its `def` line, original 370, and line 372 -- likely
# `self.index = 0` -- are missing): tokenize the input, then parse the
# stream.
371 self
.tokens
= self
.scanner
.scan()
373 return self
.parse_stream()