4 from yaml
.tokens
import *
5 from yaml
.events
import *
class TestAppliance(unittest.TestCase):
    """Base class that turns the files under DATA into generated test methods.

    Subclasses call add_tests('method_name', '.ext1', '.ext2', ...); for every
    test-data set in all_tests that provides all of the requested extensions,
    a closure is generated and attached to the class as test_<name>_<set>.
    """

    # NOTE(review): the original lines defining DATA and all_tests are missing
    # from this extract; reconstructed from their uses below (os.listdir(DATA),
    # cls.DATA, cls.all_tests) — confirm the directory path against upstream.
    DATA = 'tests/data'

    # Map test-name root -> list of available file extensions, built once by
    # scanning the data directory at class-creation time.
    all_tests = {}
    for filename in os.listdir(DATA):
        if os.path.isfile(os.path.join(DATA, filename)):
            root, ext = os.path.splitext(filename)
            all_tests.setdefault(root, []).append(ext)

    def add_tests(cls, method_name, *extensions):
        """Attach one generated test method per data set that has every
        extension in *extensions*; each calls self._<method_name>(test, *files)."""
        for test in cls.all_tests:
            available_extensions = cls.all_tests[test]
            # Skip data sets that do not provide every requested extension.
            # NOTE(review): the 'break'/'else:' pair is missing from the
            # extract; a for/else is the only structure consistent with the
            # visible lines — verify.
            for ext in extensions:
                if ext not in available_extensions:
                    break
            else:
                filenames = [os.path.join(cls.DATA, test+ext) for ext in extensions]
                # Bind test/filenames as defaults so each closure keeps its own values.
                def test_method(self, test=test, filenames=filenames):
                    getattr(self, '_'+method_name)(test, *filenames)
                test = test.replace('-', '_')
                # NOTE(review): the lines between the __name__ assignment and
                # the new.function(...) call are missing; presumably a
                # try/except TypeError fallback for old Pythons where function
                # __name__ was read-only — verify against upstream.
                try:
                    test_method.__name__ = '%s_%s' % (method_name, test)
                except TypeError:
                    import new
                    test_method = new.function(test_method.func_code, test_method.func_globals,
                            '%s_%s' % (method_name, test), test_method.func_defaults,
                            test_method.func_closure)
                setattr(cls, test_method.__name__, test_method)
    add_tests = classmethod(add_tests)
class Error(Exception):
    """Base error raised by the canonical scanner/parser in this module."""
    # NOTE(review): the class body line is missing from the extract; an empty
    # body is the behavior-neutral reconstruction.
    pass
class CanonicalScanner:
    """Tokenizer for YAML documents written in canonical form (Python 2).

    scan() walks self.data once and returns the complete token list; the
    scan_* helpers each consume one token starting at self.index.

    NOTE(review): reconstructed from a garbled extract with missing lines;
    every statement flagged below as reconstructed must be verified against
    the upstream source.
    """

    def __init__(self, data):
        # Decode the byte string and append a NUL sentinel so the scanning
        # loops can detect end-of-input without explicit bounds checks.
        self.data = unicode(data, 'utf-8')+u'\0'
        # NOTE(review): this line is missing from the extract; scanning
        # starts at self.index, so it is presumably initialized to 0 — verify.
        self.index = 0

    def scan(self):
        """Tokenize the whole input and return the list of tokens.

        NOTE(review): the dispatch guards (`elif ch == ...`) and the index
        increments were missing from the extract and are reconstructed from
        the token type appended in each branch — verify.
        """
        #print self.data[self.index:]
        tokens = []
        tokens.append(StreamStartToken(None, None))
        while True:
            self.find_token()
            ch = self.data[self.index]
            if ch == u'\0':
                tokens.append(StreamEndToken(None, None))
                break
            elif ch == u'%':
                tokens.append(self.scan_directive())
            elif ch == u'-' and self.data[self.index:self.index+3] == u'---':
                self.index += 3
                tokens.append(DocumentStartToken(None, None))
            elif ch == u'[':
                self.index += 1
                tokens.append(FlowSequenceStartToken(None, None))
            elif ch == u'{':
                self.index += 1
                tokens.append(FlowMappingStartToken(None, None))
            elif ch == u']':
                self.index += 1
                tokens.append(FlowSequenceEndToken(None, None))
            elif ch == u'}':
                self.index += 1
                tokens.append(FlowMappingEndToken(None, None))
            elif ch == u'?':
                self.index += 1
                tokens.append(KeyToken(None, None))
            elif ch == u':':
                self.index += 1
                tokens.append(ValueToken(None, None))
            elif ch == u',':
                self.index += 1
                tokens.append(FlowEntryToken(None, None))
            elif ch == u'*' or ch == u'&':
                tokens.append(self.scan_alias())
            elif ch == u'!':
                tokens.append(self.scan_tag())
            elif ch == u'"':
                tokens.append(self.scan_scalar())
            else:
                raise Error("invalid token")
        return tokens

    DIRECTIVE = u'%YAML 1.1'

    def scan_directive(self):
        """Consume the '%YAML 1.1' directive and return a DirectiveToken.

        NOTE(review): as extracted there is no else branch, so a malformed
        directive falls through and returns None — confirm upstream behavior.
        """
        if self.data[self.index:self.index+len(self.DIRECTIVE)] == self.DIRECTIVE and \
                self.data[self.index+len(self.DIRECTIVE)] in u' \n\0':
            self.index += len(self.DIRECTIVE)
            return DirectiveToken('YAML', (1, 1), None, None)

    def scan_alias(self):
        """Consume '*name' (alias) or '&name' (anchor) and return its token."""
        if self.data[self.index] == u'*':
            TokenClass = AliasToken
        else:
            TokenClass = AnchorToken
        self.index += 1
        start = self.index
        # The name runs until a flow separator, space, newline or end of input.
        while self.data[self.index] not in u', \n\0':
            self.index += 1
        value = self.data[start:self.index]
        return TokenClass(value, None, None)

    def scan_tag(self):
        """Consume a '!'-introduced tag token and normalize its value."""
        self.index += 1
        start = self.index
        while self.data[self.index] not in u' \n\0':
            self.index += 1
        value = self.data[start:self.index]
        if value[0] == u'!':
            # '!!name' is shorthand for the standard tag namespace.
            value = 'tag:yaml.org,2002:'+value[1:]
        elif value[0] == u'<' and value[-1] == u'>':
            # Verbatim '!<uri>' form: strip the angle brackets.
            value = value[1:-1]
        else:
            value = u'!'+value
        return TagToken(value, None, None)

    # NOTE(review): QUOTE_CODES/QUOTE_REPLACES are read by scan_scalar but
    # their definitions are entirely missing from this extract; the values
    # below are the conventional YAML escape tables — verify against upstream.
    # Escape letter -> number of hex digits that follow it.
    QUOTE_CODES = {
        u'x': 2,
        u'u': 4,
        u'U': 8,
    }

    # Single-character escapes and their replacements.
    QUOTE_REPLACES = {
        u'\\': u'\\',
        u'\"': u'\"',
        u' ': u' ',
        u'a': u'\x07',
        u'b': u'\x08',
        u'e': u'\x1B',
        u'f': u'\x0C',
        u'n': u'\x0A',
        u'r': u'\x0D',
        u't': u'\x09',
        u'v': u'\x0B',
        u'N': u'\u0085',
        u'L': u'\u2028',
        u'P': u'\u2029',
        u'_': u'_',
        u'0': u'\x00',
    }

    def scan_scalar(self):
        """Consume a double-quoted scalar, resolving backslash escapes and
        folding unescaped line breaks, and return a ScalarToken."""
        self.index += 1
        chunks = []
        start = self.index
        ignore_spaces = False
        while self.data[self.index] != u'"':
            if self.data[self.index] == u'\\':
                ignore_spaces = False
                chunks.append(self.data[start:self.index])
                self.index += 1
                ch = self.data[self.index]
                self.index += 1
                if ch == u'\n':
                    # Escaped newline is a line continuation: swallow it.
                    ignore_spaces = True
                elif ch in self.QUOTE_CODES:
                    # Numeric escapes: \xXX, \uXXXX, \UXXXXXXXX.
                    length = self.QUOTE_CODES[ch]
                    code = int(self.data[self.index:self.index+length], 16)
                    chunks.append(unichr(code))
                    self.index += length
                else:
                    # Single-character escape such as \n or \t.
                    chunks.append(self.QUOTE_REPLACES[ch])
                start = self.index
            elif self.data[self.index] == u'\n':
                # Unescaped line break folds into a single space.
                chunks.append(self.data[start:self.index])
                chunks.append(u' ')
                self.index += 1
                start = self.index
                ignore_spaces = True
            elif ignore_spaces and self.data[self.index] == u' ':
                # Leading spaces after a folded break are dropped.
                self.index += 1
                start = self.index
            else:
                ignore_spaces = False
                self.index += 1
        chunks.append(self.data[start:self.index])
        self.index += 1
        return ScalarToken(u''.join(chunks), False, None, None)

    def find_token(self):
        """Advance self.index past spaces, tabs, '#' comments and line
        breaks, stopping at the first character of the next token."""
        found = False
        while not found:
            while self.data[self.index] in u' \t':
                self.index += 1
            if self.data[self.index] == u'#':
                # Comment runs to the end of the line.
                while self.data[self.index] != u'\n':
                    self.index += 1
            if self.data[self.index] == u'\n':
                self.index += 1
            else:
                found = True
class CanonicalParser:
    """Recursive-descent parser producing an event list from the tokens of
    CanonicalScanner.

    NOTE(review): reconstructed from a garbled extract with missing lines;
    the restored statements (else branches, parse_node() calls, return
    True/False tails, self.events initialization, and the names of the
    parse/get methods) must be verified against the upstream source.
    """

    def __init__(self, data):
        self.scanner = CanonicalScanner(data)
        # Appended to by the parse_* methods, drained by get()/peek().
        self.events = []

    # stream: STREAM-START document* STREAM-END
    def parse_stream(self):
        self.consume_token(StreamStartToken)
        self.events.append(StreamStartEvent(None, None))
        while not self.test_token(StreamEndToken):
            if self.test_token(DirectiveToken, DocumentStartToken):
                self.parse_document()
            else:
                raise Error("document is expected, got "+repr(self.tokens[self.index]))
        self.consume_token(StreamEndToken)
        self.events.append(StreamEndEvent(None, None))

    # document: DIRECTIVE? DOCUMENT-START node
    def parse_document(self):
        if self.test_token(DirectiveToken):
            self.consume_token(DirectiveToken)
        self.consume_token(DocumentStartToken)
        self.events.append(DocumentStartEvent(None, None))
        self.parse_node()
        self.events.append(DocumentEndEvent(None, None))

    # node: ALIAS | ANCHOR? TAG? (SCALAR|sequence|mapping)
    def parse_node(self):
        if self.test_token(AliasToken):
            self.events.append(AliasEvent(self.get_value(), None, None))
        else:
            anchor = None
            if self.test_token(AnchorToken):
                anchor = self.get_value()
            tag = None
            if self.test_token(TagToken):
                tag = self.get_value()
            if self.test_token(ScalarToken):
                self.events.append(ScalarEvent(anchor, tag, self.get_value(), None, None))
            elif self.test_token(FlowSequenceStartToken):
                self.events.append(SequenceEvent(anchor, tag, None, None))
                self.parse_sequence()
            elif self.test_token(FlowMappingStartToken):
                self.events.append(MappingEvent(anchor, tag, None, None))
                self.parse_mapping()
            else:
                raise Error("SCALAR, '[', or '{' is expected, got "+repr(self.tokens[self.index]))

    # sequence: SEQUENCE-START (node (ENTRY node)*)? ENTRY? SEQUENCE-END
    def parse_sequence(self):
        self.consume_token(FlowSequenceStartToken)
        if not self.test_token(FlowSequenceEndToken):
            self.parse_node()
            while not self.test_token(FlowSequenceEndToken):
                self.consume_token(FlowEntryToken)
                # A trailing entry separator before ']' is allowed.
                if not self.test_token(FlowSequenceEndToken):
                    self.parse_node()
        self.consume_token(FlowSequenceEndToken)
        self.events.append(CollectionEndEvent(None, None))

    # mapping: MAPPING-START (map_entry (ENTRY map_entry)*)? ENTRY? MAPPING-END
    def parse_mapping(self):
        self.consume_token(FlowMappingStartToken)
        if not self.test_token(FlowMappingEndToken):
            self.parse_map_entry()
            while not self.test_token(FlowMappingEndToken):
                self.consume_token(FlowEntryToken)
                # A trailing entry separator before '}' is allowed.
                if not self.test_token(FlowMappingEndToken):
                    self.parse_map_entry()
        self.consume_token(FlowMappingEndToken)
        self.events.append(CollectionEndEvent(None, None))

    # map_entry: KEY node VALUE node
    def parse_map_entry(self):
        self.consume_token(KeyToken)
        self.parse_node()
        self.consume_token(ValueToken)
        self.parse_node()

    def test_token(self, *choices):
        """Return True if the current token is an instance of any choice."""
        for choice in choices:
            if isinstance(self.tokens[self.index], choice):
                return True
        return False

    def consume_token(self, cls):
        """Require the current token to be a cls instance, then advance."""
        if not isinstance(self.tokens[self.index], cls):
            raise Error("unexpected token "+repr(self.tokens[self.index]))
        self.index += 1

    def get_value(self):
        """Return the current token's value attribute, then advance."""
        value = self.tokens[self.index].value
        self.index += 1
        return value

    def parse(self):
        """Tokenize the input, parse the stream, and return the event list."""
        self.tokens = self.scanner.scan()
        self.index = 0
        self.parse_stream()
        return self.events

    def get(self):
        """Pop and return the next pending event."""
        return self.events.pop(0)

    def check(self, *choices):
        """Return True if the next pending event is an instance of any choice."""
        for choice in choices:
            if isinstance(self.events[0], choice):
                return True
        return False

    def peek(self):
        """Return the next pending event without consuming it."""
        return self.events[0]