1 # Copyright (c) 2009-2012 testtools developers. See LICENSE for details.
3 """Content - a MIME-like Content object."""
20 from testtools
import try_import
21 from testtools
.compat
import _b
, _format_exc_info
, str_is_unicode
, _u
22 from testtools
.content_type
import ContentType
, JSON
, UTF8_TEXT
25 functools
= try_import('functools')
30 DEFAULT_CHUNK_SIZE
= 4096
32 STDOUT_LINE
= '\nStdout:\n%s'
33 STDERR_LINE
= '\nStderr:\n%s'
36 def _iter_chunks(stream
, chunk_size
):
37 """Read 'stream' in chunks of 'chunk_size'.
39 :param stream: A file-like object to read from.
40 :param chunk_size: The size of each read from 'stream'.
42 chunk
= stream
.read(chunk_size
)
45 chunk
= stream
.read(chunk_size
)
48 class Content(object):
49 """A MIME-like Content object.
51 Content objects can be serialised to bytes using the iter_bytes method.
52 If the Content-Type is recognised by other code, they are welcome to
53 look for richer contents that mere byte serialisation - for example in
54 memory object graphs etc. However, such code MUST be prepared to receive
55 a generic Content object that has been reconstructed from a byte stream.
57 :ivar content_type: The content type of this Content.
60 def __init__(self
, content_type
, get_bytes
):
61 """Create a ContentType."""
62 if None in (content_type
, get_bytes
):
63 raise ValueError("None not permitted in %r, %r" % (
64 content_type
, get_bytes
))
65 self
.content_type
= content_type
66 self
._get
_bytes
= get_bytes
68 def __eq__(self
, other
):
69 return (self
.content_type
== other
.content_type
and
70 _join_b(self
.iter_bytes()) == _join_b(other
.iter_bytes()))
73 """Return all of the content as text.
75 This is only valid where ``iter_text`` is. It will load all of the
76 content into memory. Where this is a concern, use ``iter_text``
79 return _u('').join(self
.iter_text())
82 """Iterate over bytestrings of the serialised content."""
83 return self
._get
_bytes
()
86 """Iterate over the text of the serialised content.
88 This is only valid for text MIME types, and will use ISO-8859-1 if
89 no charset parameter is present in the MIME type. (This is somewhat
90 arbitrary, but consistent with RFC2617 3.7.1).
92 :raises ValueError: If the content type is not text/\*.
94 if self
.content_type
.type != "text":
95 raise ValueError("Not a text type %r" % self
.content_type
)
96 return self
._iter
_text
()
99 """Worker for iter_text - does the decoding."""
100 encoding
= self
.content_type
.parameters
.get('charset', 'ISO-8859-1')
103 decoder
= codecs
.getincrementaldecoder(encoding
)()
104 for bytes
in self
.iter_bytes():
105 yield decoder
.decode(bytes
)
106 final
= decoder
.decode(_b(''), True)
109 except AttributeError:
111 bytes
= ''.join(self
.iter_bytes())
112 yield bytes
.decode(encoding
)
115 return "<Content type=%r, value=%r>" % (
116 self
.content_type
, _join_b(self
.iter_bytes()))
119 class TracebackContent(Content
):
120 """Content object for tracebacks.
122 This adapts an exc_info tuple to the Content interface.
123 text/x-traceback;language=python is used for the mime type, in order to
124 provide room for other languages to format their tracebacks differently.
127 # Whether or not to hide layers of the stack trace that are
128 # unittest/testtools internal code. Defaults to True since the
129 # system-under-test is rarely unittest or testtools.
130 HIDE_INTERNAL_STACK
= True
132 def __init__(self
, err
, test
):
133 """Create a TracebackContent for err."""
135 raise ValueError("err may not be None")
136 content_type
= ContentType('text', 'x-traceback',
137 {"language": "python", "charset": "utf8"})
138 value
= self
._exc
_info
_to
_unicode
(err
, test
)
139 super(TracebackContent
, self
).__init
__(
140 content_type
, lambda: [value
.encode("utf8")])
142 def _exc_info_to_unicode(self
, err
, test
):
143 """Converts a sys.exc_info()-style tuple of values into a string.
145 Copied from Python 2.7's unittest.TestResult._exc_info_to_string.
147 exctype
, value
, tb
= err
148 # Skip test runner traceback levels
149 if self
.HIDE_INTERNAL_STACK
:
150 while tb
and self
._is
_relevant
_tb
_level
(tb
):
153 # testtools customization. When str is unicode (e.g. IronPython,
154 # Python 3), traceback.format_exception returns unicode. For Python 2,
155 # it returns bytes. We need to guarantee unicode.
157 format_exception
= traceback
.format_exception
159 format_exception
= _format_exc_info
161 if (self
.HIDE_INTERNAL_STACK
and test
.failureException
162 and isinstance(value
, test
.failureException
)):
163 # Skip assert*() traceback levels
164 length
= self
._count
_relevant
_tb
_levels
(tb
)
165 msgLines
= format_exception(exctype
, value
, tb
, length
)
167 msgLines
= format_exception(exctype
, value
, tb
)
169 if getattr(self
, 'buffer', None):
170 output
= sys
.stdout
.getvalue()
171 error
= sys
.stderr
.getvalue()
173 if not output
.endswith('\n'):
175 msgLines
.append(STDOUT_LINE
% output
)
177 if not error
.endswith('\n'):
179 msgLines
.append(STDERR_LINE
% error
)
180 return ''.join(msgLines
)
182 def _is_relevant_tb_level(self
, tb
):
183 return '__unittest' in tb
.tb_frame
.f_globals
185 def _count_relevant_tb_levels(self
, tb
):
187 while tb
and not self
._is
_relevant
_tb
_level
(tb
):
193 def json_content(json_data
):
194 """Create a JSON `Content` object from JSON-encodeable data."""
195 data
= json
.dumps(json_data
)
197 # The json module perversely returns native str not bytes
198 data
= data
.encode('utf8')
199 return Content(JSON
, lambda: [data
])
202 def text_content(text
):
203 """Create a `Content` object from some text.
205 This is useful for adding details which are short strings.
207 return Content(UTF8_TEXT
, lambda: [text
.encode('utf8')])
210 def maybe_wrap(wrapper
, func
):
211 """Merge metadata for func into wrapper if functools is present."""
212 if functools
is not None:
213 wrapper
= functools
.update_wrapper(wrapper
, func
)
217 def content_from_file(path
, content_type
=None, chunk_size
=DEFAULT_CHUNK_SIZE
,
219 """Create a `Content` object from a file on disk.
221 Note that unless 'read_now' is explicitly passed in as True, the file
222 will only be read from when ``iter_bytes`` is called.
224 :param path: The path to the file to be used as content.
225 :param content_type: The type of content. If not specified, defaults
226 to UTF8-encoded text/plain.
227 :param chunk_size: The size of chunks to read from the file.
228 Defaults to ``DEFAULT_CHUNK_SIZE``.
229 :param buffer_now: If True, read the file from disk now and keep it in
230 memory. Otherwise, only read when the content is serialized.
232 if content_type
is None:
233 content_type
= UTF8_TEXT
235 # This should be try:finally:, but python2.4 makes that hard. When
236 # We drop older python support we can make this use a context manager
237 # for maximum simplicity.
238 stream
= open(path
, 'rb')
239 for chunk
in _iter_chunks(stream
, chunk_size
):
242 return content_from_reader(reader
, content_type
, buffer_now
)
245 def content_from_stream(stream
, content_type
=None,
246 chunk_size
=DEFAULT_CHUNK_SIZE
, buffer_now
=False):
247 """Create a `Content` object from a file-like stream.
249 Note that the stream will only be read from when ``iter_bytes`` is
252 :param stream: A file-like object to read the content from. The stream
253 is not closed by this function or the content object it returns.
254 :param content_type: The type of content. If not specified, defaults
255 to UTF8-encoded text/plain.
256 :param chunk_size: The size of chunks to read from the file.
257 Defaults to ``DEFAULT_CHUNK_SIZE``.
258 :param buffer_now: If True, reads from the stream right now. Otherwise,
259 only reads when the content is serialized. Defaults to False.
261 if content_type
is None:
262 content_type
= UTF8_TEXT
263 reader
= lambda: _iter_chunks(stream
, chunk_size
)
264 return content_from_reader(reader
, content_type
, buffer_now
)
267 def content_from_reader(reader
, content_type
, buffer_now
):
268 """Create a Content object that will obtain the content from reader.
270 :param reader: A callback to read the content. Should return an iterable of
272 :param content_type: The content type to create.
273 :param buffer_now: If True the reader is evaluated immediately and
276 if content_type
is None:
277 content_type
= UTF8_TEXT
279 contents
= list(reader())
280 reader
= lambda: contents
281 return Content(content_type
, reader
)
284 def attach_file(detailed
, path
, name
=None, content_type
=None,
285 chunk_size
=DEFAULT_CHUNK_SIZE
, buffer_now
=True):
286 """Attach a file to this test as a detail.
288 This is a convenience method wrapping around ``addDetail``.
290 Note that unless 'read_now' is explicitly passed in as True, the file
291 *must* exist when the test result is called with the results of this
292 test, after the test has been torn down.
294 :param detailed: An object with details
295 :param path: The path to the file to attach.
296 :param name: The name to give to the detail for the attached file.
297 :param content_type: The content type of the file. If not provided,
298 defaults to UTF8-encoded text/plain.
299 :param chunk_size: The size of chunks to read from the file. Defaults
300 to something sensible.
301 :param buffer_now: If False the file content is read when the content
302 object is evaluated rather than when attach_file is called.
303 Note that this may be after any cleanups that obj_with_details has, so
304 if the file is a temporary file disabling buffer_now may cause the file
305 to be read after it is deleted. To handle those cases, using
306 attach_file as a cleanup is recommended because it guarantees a
307 sequence for when the attach_file call is made::
309 detailed.addCleanup(attach_file, 'foo.txt', detailed)
312 name
= os
.path
.basename(path
)
313 content_object
= content_from_file(
314 path
, content_type
, chunk_size
, buffer_now
)
315 detailed
.addDetail(name
, content_object
)