5 from yaml
.composer
import *
6 from yaml
.constructor
import *
7 from yaml
.resolver
import *
class TestAppliance(unittest.TestCase):
    # NOTE(review): this chunk looks truncated -- the class attributes DATA and
    # all_tests that the loop below reads/writes are not visible here; confirm
    # against the full file.

    # Scan the test-data directory once at class-definition time, grouping the
    # data files by base name: all_tests maps each base name to the list of
    # file extensions found for it.
    for filename in os.listdir(DATA):
        if os.path.isfile(os.path.join(DATA, filename)):
            root, ext = os.path.splitext(filename)
            all_tests.setdefault(root, []).append(ext)

    def add_tests(cls, method_name, *extensions):
        # Generate one unittest method per data-file group that provides every
        # extension in *extensions*; each generated method delegates to the
        # underscore-prefixed implementation ('_' + method_name).
        for test in cls.all_tests:
            available_extensions = cls.all_tests[test]
            for ext in extensions:
                if ext not in available_extensions:
                    # NOTE(review): the body of this 'if' (presumably skipping
                    # groups lacking a required extension) is missing from
                    # this chunk -- confirm against the full file.
            filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
            def test_method(self, test=test, filenames=filenames):
                # test/filenames are bound as defaults so each generated
                # closure keeps its own values (avoids late-binding pitfall).
                getattr(self, '_'+method_name)(test, *filenames)
            # Make the base name usable inside a Python identifier.
            test = test.replace('-', '_').replace('.', '_')
            test_method.__name__ = '%s_%s' % (method_name, test)
            # Python 2 only: rebuild the function via the 'new' module so the
            # underlying code object carries the generated name as well.
            test_method = new.function(test_method.func_code, test_method.func_globals,
                '%s_%s' % (method_name, test), test_method.func_defaults,
                test_method.func_closure)
            setattr(cls, test_method.__name__, test_method)
    add_tests = classmethod(add_tests)
class Error(Exception):
    """Raised by the canonical scanner/parser on malformed or unexpected input."""
class CanonicalScanner:
    """Hand-written scanner for the canonical YAML form.

    Tokenizes self.data (a NUL-terminated unicode string) into self.tokens.
    NOTE(review): several interior lines of this class are missing from this
    chunk (index advances, method headers, return statements); the truncation
    points are flagged below.
    """

    def __init__(self, data):
        # Decode the byte input and append u'\0' as an end-of-input sentinel
        # so the scanning loops need no separate bounds checks.
        self.data = unicode(data, 'utf-8')+u'\0'
        # NOTE(review): initialization of self.index / self.tokens is not
        # visible in this chunk -- confirm against the full file.

    def check_token(self, *choices):
        # Test whether the next pending token is an instance of any of
        # *choices*.
        for choice in choices:
            if isinstance(self.tokens[0], choice):
                # NOTE(review): the return statements of this method are
                # missing from this chunk.

    def get_token(self, choice=None):
        # Pop the next token, optionally enforcing its expected class.
        token = self.tokens.pop(0)
        if choice and not isinstance(token, choice):
            raise Error("unexpected token "+repr(token))
        # NOTE(review): the 'return token' line appears to be missing here.

    def get_token_value(self):
        token = self.get_token()
        # NOTE(review): the value-extraction/return lines of this method are
        # missing from this chunk.

    # NOTE(review): the lines below are the interior of the main scan()
    # dispatch loop, but its 'def scan(self):' header and the while/if
    # framing are missing from this chunk -- only the token-appending
    # branches survive.
        self.tokens.append(StreamStartToken(None, None))
        ch = self.data[self.index]
        self.tokens.append(StreamEndToken(None, None))
        self.tokens.append(self.scan_directive())
        elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
            self.tokens.append(DocumentStartToken(None, None))
        self.tokens.append(FlowSequenceStartToken(None, None))
        self.tokens.append(FlowMappingStartToken(None, None))
        self.tokens.append(FlowSequenceEndToken(None, None))
        self.tokens.append(FlowMappingEndToken(None, None))
        self.tokens.append(KeyToken(None, None))
        self.tokens.append(ValueToken(None, None))
        self.tokens.append(FlowEntryToken(None, None))
        elif ch == u'*' or ch == u'&':
            self.tokens.append(self.scan_alias())
        self.tokens.append(self.scan_tag())
        self.tokens.append(self.scan_scalar())
        raise Error("invalid token")

    # The only directive the canonical form accepts.
    DIRECTIVE = u'%YAML 1.1'

    def scan_directive(self):
        # Consume the literal '%YAML 1.1' directive when it is followed by a
        # space, newline, or the NUL sentinel.
        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
            self.index += len(self.DIRECTIVE)
            return DirectiveToken('YAML', (1, 1), None, None)

    def scan_alias(self):
        # '*name' is an alias, '&name' an anchor; both share the same lexical
        # shape, so only the token class differs.
        if self.data[self.index] == u'*':
            TokenClass = AliasToken
        # NOTE(review): the 'else:' that should introduce this branch is
        # missing from this chunk.
        TokenClass = AnchorToken
        while self.data[self.index] not in u', \n\0':
            # NOTE(review): the loop body (index advance) is missing from
            # this chunk.
        value = self.data[start:self.index]
        return TokenClass(value, None, None)

    # NOTE(review): the 'def scan_tag(self):' header and its first lines are
    # missing from this chunk; the fragments below resolve '!!'-shortened
    # tags into the tag:yaml.org,2002: namespace.
        while self.data[self.index] not in u' \n\0':
        value = self.data[start:self.index]
        value = 'tag:yaml.org,2002:'+value[1:]
        elif value[0] == u'<' and value[-1] == u'>':
        return TagToken(value, None, None)

    def scan_scalar(self):
        # Scan a double-quoted scalar, resolving backslash escapes and
        # folding line breaks. NOTE(review): several framing lines (chunk
        # setup, index advances, if/elif headers) are missing from this chunk.
        ignore_spaces = False
        while self.data[self.index] != u'"':
            if self.data[self.index] == u'\\':
                ignore_spaces = False
                chunks.append(self.data[start:self.index])
            ch = self.data[self.index]
            # Numeric escapes: QUOTE_CODES maps the escape letter to the
            # number of hex digits that follow it.
            elif ch in self.QUOTE_CODES:
                length = self.QUOTE_CODES[ch]
                code = int(self.data[self.index:self.index+length], 16)
                chunks.append(unichr(code))
            # Single-character escapes via the QUOTE_REPLACES table.
            chunks.append(self.QUOTE_REPLACES[ch])
            elif self.data[self.index] == u'\n':
                chunks.append(self.data[start:self.index])
            elif ignore_spaces and self.data[self.index] == u' ':
            ignore_spaces = False
            chunks.append(self.data[start:self.index])
        return ScalarToken(u''.join(chunks), False, None, None)

    def find_token(self):
        # Skip spaces/tabs, '#' comments, and line breaks up to the next
        # token. NOTE(review): the index-advance lines are missing from this
        # chunk, leaving these loops/ifs without bodies.
        while self.data[self.index] in u' \t':
        if self.data[self.index] == u'#':
            while self.data[self.index] != u'\n':
        if self.data[self.index] == u'\n':
class CanonicalParser:
    """Recursive-descent parser over the scanner's token list.

    Emits parse events into self.events; each parse_* method corresponds to
    one production of the canonical grammar (shown in the comments).
    NOTE(review): some interior lines are missing from this chunk; the
    truncation points are flagged below.
    """

    # stream: STREAM-START document* STREAM-END
    def parse_stream(self):
        self.get_token(StreamStartToken)
        self.events.append(StreamStartEvent(None, None))
        while not self.check_token(StreamEndToken):
            if self.check_token(DirectiveToken, DocumentStartToken):
                self.parse_document()
            # NOTE(review): the 'else:' before this raise is missing from
            # this chunk -- as written the raise would be unconditional.
            raise Error("document is expected, got "+repr(self.tokens[self.index]))
        self.get_token(StreamEndToken)
        self.events.append(StreamEndEvent(None, None))

    # document: DIRECTIVE? DOCUMENT-START node
    def parse_document(self):
        if self.check_token(DirectiveToken):
            self.get_token(DirectiveToken)
        self.get_token(DocumentStartToken)
        self.events.append(DocumentStartEvent(None, None))
        # NOTE(review): the self.parse_node() call between the start and end
        # events appears to be missing from this chunk.
        self.events.append(DocumentEndEvent(None, None))

    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
    def parse_node(self):
        if self.check_token(AliasToken):
            self.events.append(AliasEvent(self.get_token_value(), None, None))
        # NOTE(review): the 'else:' framing and the default anchor/tag = None
        # assignments are missing from this chunk.
        if self.check_token(AnchorToken):
            anchor = self.get_token_value()
        if self.check_token(TagToken):
            tag = self.get_token_value()
        if self.check_token(ScalarToken):
            self.events.append(ScalarEvent(anchor, tag, (False, False), self.get_token_value(), None, None))
        elif self.check_token(FlowSequenceStartToken):
            self.events.append(SequenceStartEvent(anchor, tag, None, None))
            self.parse_sequence()
        elif self.check_token(FlowMappingStartToken):
            self.events.append(MappingStartEvent(anchor, tag, None, None))
        # NOTE(review): the parse_mapping() call and the 'else:' before this
        # raise are missing from this chunk.
        raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))

    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
    def parse_sequence(self):
        self.get_token(FlowSequenceStartToken)
        if not self.check_token(FlowSequenceEndToken):
            # NOTE(review): the parse_node() calls inside this method appear
            # to be missing from this chunk.
            while not self.check_token(FlowSequenceEndToken):
                self.get_token(FlowEntryToken)
                if not self.check_token(FlowSequenceEndToken):
        self.get_token(FlowSequenceEndToken)
        self.events.append(SequenceEndEvent(None, None))

    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
    def parse_mapping(self):
        self.get_token(FlowMappingStartToken)
        if not self.check_token(FlowMappingEndToken):
            self.parse_map_entry()
            while not self.check_token(FlowMappingEndToken):
                self.get_token(FlowEntryToken)
                if not self.check_token(FlowMappingEndToken):
                    self.parse_map_entry()
        self.get_token(FlowMappingEndToken)
        self.events.append(MappingEndEvent(None, None))

    # map_entry: KEY node VALUE node
    def parse_map_entry(self):
        self.get_token(KeyToken)
        # NOTE(review): the parse_node() calls after KEY and after VALUE are
        # missing from this chunk.
        self.get_token(ValueToken)

    # NOTE(review): this return belongs to a method (presumably the event
    # accessor that consumes self.events) whose 'def' header is missing from
    # this chunk.
        return self.events.pop(0)

    def check_event(self, *choices):
        # Test whether the next pending event is an instance of any of
        # *choices*.
        for choice in choices:
            if isinstance(self.events[0], choice):
                # NOTE(review): the return statements of this method are
                # missing from this chunk.

    def peek_event(self):
        # Look at the next pending event without consuming it.
        return self.events[0]
class CanonicalLoader(CanonicalScanner, CanonicalParser, Composer, Constructor, Resolver):
    """Loader that reads the canonical YAML form.

    Combines the hand-written canonical scanner/parser with the standard
    composer, constructor and resolver mixins.
    """

    def __init__(self, stream):
        # Accept either a file-like object (anything with .read) or raw data.
        data = stream.read() if hasattr(stream, 'read') else stream
        CanonicalScanner.__init__(self, data)
        CanonicalParser.__init__(self)
        Composer.__init__(self)
        Constructor.__init__(self)
        Resolver.__init__(self)
def canonical_scan(stream):
    """Scan *stream* into tokens using the canonical loader."""
    tokens = scan(stream, Loader=CanonicalLoader)
    return tokens
def canonical_parse(stream):
    """Parse *stream* into events using the canonical loader."""
    events = parse(stream, Loader=CanonicalLoader)
    return events
def canonical_compose(stream):
    """Compose a single node from *stream* using the canonical loader."""
    node = compose(stream, Loader=CanonicalLoader)
    return node
def canonical_compose_all(stream):
    """Compose every node in *stream* using the canonical loader."""
    nodes = compose_all(stream, Loader=CanonicalLoader)
    return nodes
def canonical_load(stream):
    """Load a single document from *stream* using the canonical loader."""
    document = load(stream, Loader=CanonicalLoader)
    return document
def canonical_load_all(stream):
    """Load every document in *stream* using the canonical loader."""
    documents = load_all(stream, Loader=CanonicalLoader)
    return documents