wafsamba: use additional xml catalog file (bug #9512)
[Samba/gebeck_regimport.git] / lib / testtools / testtools / content.py
blob8bd4a228ed9af0d0802005b2908af49dd2ad8046
1 # Copyright (c) 2009-2012 testtools developers. See LICENSE for details.
3 """Content - a MIME-like Content object."""
# Public API of this module. 'json_content' is defined below and documented
# as public, so it is exported here alongside the other factories.
__all__ = [
    'attach_file',
    'Content',
    'content_from_file',
    'content_from_stream',
    'json_content',
    'text_content',
    'TracebackContent',
    ]
14 import codecs
15 import json
16 import os
17 import sys
18 import traceback
20 from testtools import try_import
21 from testtools.compat import _b, _format_exc_info, str_is_unicode, _u
22 from testtools.content_type import ContentType, JSON, UTF8_TEXT
# functools is stdlib from Python 2.5 onwards; try_import yields None on
# older interpreters, and maybe_wrap() below degrades gracefully then.
functools = try_import('functools')

# Joiner for flattening an iter_bytes() iterable into one bytestring.
_join_b = _b("").join

# Default read size, in bytes, when streaming content from a file.
DEFAULT_CHUNK_SIZE = 4096

# Templates used to append buffered stdout/stderr to a formatted traceback.
STDOUT_LINE = '\nStdout:\n%s'
STDERR_LINE = '\nStderr:\n%s'
36 def _iter_chunks(stream, chunk_size, seek_offset=None, seek_whence=0):
37 """Read 'stream' in chunks of 'chunk_size'.
39 :param stream: A file-like object to read from.
40 :param chunk_size: The size of each read from 'stream'.
41 :param seek_offset: If non-None, seek before iterating.
42 :param seek_whence: Pass through to the seek call, if seeking.
43 """
44 if seek_offset is not None:
45 stream.seek(seek_offset, seek_whence)
46 chunk = stream.read(chunk_size)
47 while chunk:
48 yield chunk
49 chunk = stream.read(chunk_size)
class Content(object):
    """A MIME-like Content object.

    Content objects can be serialised to bytes using the iter_bytes method.
    If the Content-Type is recognised by other code, they are welcome to
    look for richer contents that mere byte serialisation - for example in
    memory object graphs etc. However, such code MUST be prepared to receive
    a generic Content object that has been reconstructed from a byte stream.

    :ivar content_type: The content type of this Content.
    """

    def __init__(self, content_type, get_bytes):
        """Create a Content object.

        :param content_type: The content type of this Content.
        :param get_bytes: A callable returning an iterable of bytestrings.
        :raises ValueError: If content_type or get_bytes is None.
        """
        if content_type is None or get_bytes is None:
            raise ValueError("None not permitted in %r, %r" % (
                content_type, get_bytes))
        self.content_type = content_type
        self._get_bytes = get_bytes

    def __eq__(self, other):
        # Equal when both the content type and the fully serialised bytes
        # match.  Note that this consumes both objects' byte iterators.
        if self.content_type != other.content_type:
            return False
        return _join_b(self.iter_bytes()) == _join_b(other.iter_bytes())

    def as_text(self):
        """Return all of the content as text.

        This is only valid where ``iter_text`` is. It will load all of the
        content into memory. Where this is a concern, use ``iter_text``
        instead.
        """
        return _u('').join(self.iter_text())

    def iter_bytes(self):
        """Iterate over bytestrings of the serialised content."""
        return self._get_bytes()

    def iter_text(self):
        """Iterate over the text of the serialised content.

        This is only valid for text MIME types, and will use ISO-8859-1 if
        no charset parameter is present in the MIME type. (This is somewhat
        arbitrary, but consistent with RFC2617 3.7.1).

        :raises ValueError: If the content type is not text/\*.
        """
        if self.content_type.type != "text":
            raise ValueError("Not a text type %r" % self.content_type)
        return self._iter_text()

    def _iter_text(self):
        """Worker for iter_text - does the decoding."""
        encoding = self.content_type.parameters.get('charset', 'ISO-8859-1')
        try:
            # codecs.getincrementaldecoder is only available on Python 2.5+.
            decoder = codecs.getincrementaldecoder(encoding)()
            for chunk in self.iter_bytes():
                yield decoder.decode(chunk)
            tail = decoder.decode(_b(''), True)
            if tail:
                yield tail
        except AttributeError:
            # Python < 2.5: decode the whole payload in a single pass.
            payload = ''.join(self.iter_bytes())
            yield payload.decode(encoding)

    def __repr__(self):
        serialised = _join_b(self.iter_bytes())
        return "<Content type=%r, value=%r>" % (self.content_type, serialised)
class TracebackContent(Content):
    """Content object for tracebacks.

    This adapts an exc_info tuple to the Content interface.
    text/x-traceback;language=python is used for the mime type, in order to
    provide room for other languages to format their tracebacks differently.
    """

    # Whether or not to hide layers of the stack trace that are
    # unittest/testtools internal code. Defaults to True since the
    # system-under-test is rarely unittest or testtools.
    HIDE_INTERNAL_STACK = True

    def __init__(self, err, test):
        """Create a TracebackContent for err.

        :param err: A sys.exc_info()-style tuple (exctype, value, traceback).
        :param test: The test the error occurred in; consulted for
            ``failureException`` when deciding how much traceback to show.
        :raises ValueError: If err is None.
        """
        if err is None:
            raise ValueError("err may not be None")
        content_type = ContentType('text', 'x-traceback',
            {"language": "python", "charset": "utf8"})
        # Format eagerly: traceback frames may be freed before the content
        # is serialised, so capture the text now and serve it from a lambda.
        value = self._exc_info_to_unicode(err, test)
        super(TracebackContent, self).__init__(
            content_type, lambda: [value.encode("utf8")])

    def _exc_info_to_unicode(self, err, test):
        """Converts a sys.exc_info()-style tuple of values into a string.

        Copied from Python 2.7's unittest.TestResult._exc_info_to_string.
        """
        exctype, value, tb = err
        # Skip test runner traceback levels
        if self.HIDE_INTERNAL_STACK:
            while tb and self._is_relevant_tb_level(tb):
                tb = tb.tb_next

        # testtools customization. When str is unicode (e.g. IronPython,
        # Python 3), traceback.format_exception returns unicode. For Python 2,
        # it returns bytes. We need to guarantee unicode.
        if str_is_unicode:
            format_exception = traceback.format_exception
        else:
            format_exception = _format_exc_info

        if (self.HIDE_INTERNAL_STACK and test.failureException
            and isinstance(value, test.failureException)):
            # Skip assert*() traceback levels
            length = self._count_relevant_tb_levels(tb)
            msgLines = format_exception(exctype, value, tb, length)
        else:
            msgLines = format_exception(exctype, value, tb)

        # NOTE(review): 'buffer' is looked up on self (this content object),
        # mirroring unittest.TestResult.buffer; presumably set externally.
        # When set, sys.stdout/sys.stderr are assumed to be StringIO-like
        # (have getvalue()) -- confirm against callers.
        if getattr(self, 'buffer', None):
            output = sys.stdout.getvalue()
            error = sys.stderr.getvalue()
            if output:
                if not output.endswith('\n'):
                    output += '\n'
                msgLines.append(STDOUT_LINE % output)
            if error:
                if not error.endswith('\n'):
                    error += '\n'
                msgLines.append(STDERR_LINE % error)
        return ''.join(msgLines)

    def _is_relevant_tb_level(self, tb):
        # Internal unittest/testtools frames mark themselves by defining
        # '__unittest' in their module globals.
        return '__unittest' in tb.tb_frame.f_globals

    def _count_relevant_tb_levels(self, tb):
        # Count leading frames up to (not including) the first internal one.
        length = 0
        while tb and not self._is_relevant_tb_level(tb):
            length += 1
            tb = tb.tb_next
        return length
def json_content(json_data):
    """Create a JSON `Content` object from JSON-encodeable data."""
    serialised = json.dumps(json_data)
    if str_is_unicode:
        # The json module perversely returns native str not bytes
        serialised = serialised.encode('utf8')
    return Content(JSON, lambda: [serialised])
def text_content(text):
    """Create a `Content` object from some text.

    This is useful for adding details which are short strings.
    """
    def get_bytes():
        # Serialise lazily, matching the UTF8_TEXT content type's charset.
        return [text.encode('utf8')]
    return Content(UTF8_TEXT, get_bytes)
def maybe_wrap(wrapper, func):
    """Merge metadata for func into wrapper if functools is present.

    :param wrapper: The callable to decorate with func's metadata.
    :param func: The callable whose metadata should be copied.
    :return: wrapper, updated in place when functools is available.
    """
    if functools is None:
        # Pre-2.5 interpreter: no functools, return wrapper untouched.
        return wrapper
    return functools.update_wrapper(wrapper, func)
def content_from_file(path, content_type=None, chunk_size=DEFAULT_CHUNK_SIZE,
                      buffer_now=False, seek_offset=None, seek_whence=0):
    """Create a `Content` object from a file on disk.

    Note that unless 'buffer_now' is explicitly passed in as True, the file
    will only be read from when ``iter_bytes`` is called.

    :param path: The path to the file to be used as content.
    :param content_type: The type of content. If not specified, defaults
        to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file.
        Defaults to ``DEFAULT_CHUNK_SIZE``.
    :param buffer_now: If True, read the file from disk now and keep it in
        memory. Otherwise, only read when the content is serialized.
    :param seek_offset: If non-None, seek within the stream before reading it.
    :param seek_whence: If supplied, pass to stream.seek() when seeking.
    """
    if content_type is None:
        content_type = UTF8_TEXT
    def reader():
        stream = open(path, 'rb')
        # try/finally guarantees the file is closed even when the consumer
        # abandons iteration early: closing (or collecting) the generator
        # runs the finally clause. (yield inside try/finally needs 2.5+,
        # which the incremental-decoder code above already assumes.)
        try:
            for chunk in _iter_chunks(stream, chunk_size, seek_offset,
                                      seek_whence):
                yield chunk
        finally:
            stream.close()
    return content_from_reader(reader, content_type, buffer_now)
def content_from_stream(stream, content_type=None,
                        chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=False,
                        seek_offset=None, seek_whence=0):
    """Create a `Content` object from a file-like stream.

    Note that the stream will only be read from when ``iter_bytes`` is
    called.

    :param stream: A file-like object to read the content from. The stream
        is not closed by this function or the content object it returns.
    :param content_type: The type of content. If not specified, defaults
        to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file.
        Defaults to ``DEFAULT_CHUNK_SIZE``.
    :param buffer_now: If True, reads from the stream right now. Otherwise,
        only reads when the content is serialized. Defaults to False.
    :param seek_offset: If non-None, seek within the stream before reading it.
    :param seek_whence: If supplied, pass to stream.seek() when seeking.
    """
    if content_type is None:
        content_type = UTF8_TEXT
    def reader():
        # Each serialisation re-reads (and re-seeks) the caller's stream.
        return _iter_chunks(stream, chunk_size, seek_offset, seek_whence)
    return content_from_reader(reader, content_type, buffer_now)
def content_from_reader(reader, content_type, buffer_now):
    """Create a Content object that will obtain the content from reader.

    :param reader: A callback to read the content. Should return an iterable
        of bytestrings.
    :param content_type: The content type to create. None means UTF8-encoded
        text/plain.
    :param buffer_now: If True the reader is evaluated immediately and
        buffered.
    """
    if content_type is None:
        content_type = UTF8_TEXT
    if buffer_now:
        # Materialise once now; later serialisations replay the cached chunks.
        cached_chunks = list(reader())
        def reader():
            return cached_chunks
    return Content(content_type, reader)
def attach_file(detailed, path, name=None, content_type=None,
                chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=True):
    """Attach a file to this test as a detail.

    This is a convenience method wrapping around ``addDetail``.

    Note that by default (``buffer_now=True``) the file is read immediately.
    If 'buffer_now' is explicitly passed in as False, the file *must* still
    exist when the test result is called with the results of this test,
    after the test has been torn down.

    :param detailed: An object with details
    :param path: The path to the file to attach.
    :param name: The name to give to the detail for the attached file.
        Defaults to the file's base name.
    :param content_type: The content type of the file. If not provided,
        defaults to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file. Defaults
        to something sensible.
    :param buffer_now: If False the file content is read when the content
        object is evaluated rather than when attach_file is called.
        Note that this may be after any cleanups that obj_with_details has, so
        if the file is a temporary file disabling buffer_now may cause the file
        to be read after it is deleted. To handle those cases, using
        attach_file as a cleanup is recommended because it guarantees a
        sequence for when the attach_file call is made::

            detailed.addCleanup(attach_file, detailed, 'foo.txt')
    """
    if name is None:
        name = os.path.basename(path)
    content_object = content_from_file(
        path, content_type, chunk_size, buffer_now)
    detailed.addDetail(name, content_object)