1 """Unit tests for the io module."""
# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
22 from __future__
import print_function
23 from __future__
import unicode_literals
36 from itertools
import chain
, cycle
, count
37 from collections
import deque
38 from test
import test_support
as support
41 import io
# C implementation of io
42 import _pyio
as pyio
# Python implementation of io
45 bytes
= support
.py3k_bytes
47 def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
49 with io
.open(__file__
, "r", encoding
="latin1") as f
:
55 def __init__(self
, read_stack
=()):
56 self
._read
_stack
= list(read_stack
)
57 self
._write
_stack
= []
60 def read(self
, n
=None):
63 return self
._read
_stack
.pop(0)
68 self
._write
_stack
.append(bytes(b
))
def seek(self, pos, whence=0):
    """Accept a seek request and report offset 0 without moving anything.

    Both arguments are ignored.  ``whence`` defaults to 0, as it does in
    ``io.IOBase.seek``, so a position-only call is accepted too.
    """
    return 0   # wrong but we gotta return something
87 return 0 # same comment as above
89 def readinto(self
, buf
):
93 data
= self
._read
_stack
[0]
97 del self
._read
_stack
[0]
100 if len(data
) <= max_len
:
101 del self
._read
_stack
[0]
105 buf
[:] = data
[:max_len
]
106 self
._read
_stack
[0] = data
[max_len
:]
109 def truncate(self
, pos
=None):
112 class CMockRawIO(MockRawIO
, io
.RawIOBase
):
115 class PyMockRawIO(MockRawIO
, pyio
.RawIOBase
):
119 class MisbehavedRawIO(MockRawIO
):
121 return MockRawIO
.write(self
, b
) * 2
123 def read(self
, n
=None):
124 return MockRawIO
.read(self
, n
) * 2
126 def seek(self
, pos
, whence
):
132 def readinto(self
, buf
):
133 MockRawIO
.readinto(self
, buf
)
136 class CMisbehavedRawIO(MisbehavedRawIO
, io
.RawIOBase
):
139 class PyMisbehavedRawIO(MisbehavedRawIO
, pyio
.RawIOBase
):
143 class CloseFailureIO(MockRawIO
):
151 class CCloseFailureIO(CloseFailureIO
, io
.RawIOBase
):
154 class PyCloseFailureIO(CloseFailureIO
, pyio
.RawIOBase
):
def __init__(self, data):
    """Create an empty read log, then initialise the base stream with data."""
    # read() and readinto() each append one entry to this list per call.
    self.read_history = list()
    base = super(MockFileIO, self)
    base.__init__(data)
164 def read(self
, n
=None):
165 res
= super(MockFileIO
, self
).read(n
)
166 self
.read_history
.append(None if res
is None else len(res
))
169 def readinto(self
, b
):
170 res
= super(MockFileIO
, self
).readinto(b
)
171 self
.read_history
.append(res
)
174 class CMockFileIO(MockFileIO
, io
.BytesIO
):
177 class PyMockFileIO(MockFileIO
, pyio
.BytesIO
):
181 class MockNonBlockWriterIO
:
184 self
._write
_stack
= []
185 self
._blocker
_char
= None
187 def pop_written(self
):
188 s
= b
"".join(self
._write
_stack
)
189 self
._write
_stack
[:] = []
def block_on(self, char):
    """Remember ``char`` as the character on which writing should block."""
    setattr(self, "_blocker_char", char)
208 if self
._blocker
_char
:
210 n
= b
.index(self
._blocker
_char
)
214 self
._blocker
_char
= None
215 self
._write
_stack
.append(b
[:n
])
216 raise self
.BlockingIOError(0, "test blocking", n
)
217 self
._write
_stack
.append(b
)
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO combined with the ``io`` module's RawIOBase."""

    # Exposed on the class so that code holding only an instance can reach
    # the matching exception type as ``self.BlockingIOError``.
    BlockingIOError = io.BlockingIOError
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO combined with the ``_pyio`` module's RawIOBase."""

    # Exposed on the class so that code holding only an instance can reach
    # the matching exception type as ``self.BlockingIOError``.
    BlockingIOError = pyio.BlockingIOError
227 class IOTest(unittest
.TestCase
):
230 support
.unlink(support
.TESTFN
)
233 support
.unlink(support
.TESTFN
)
def write_ops(self, f):
    """Drive a fixed write/seek/tell/truncate script against ``f``.

    The return value of every call is compared with the value expected at
    that point of the script.  The steps build on each other, so their
    order matters.
    """
    expect = self.assertEqual
    expect(f.write(b"blah."), 5)
    expect(f.seek(0), 0)
    # Overwrite from the start, then step back one byte and keep writing.
    expect(f.write(b"Hello."), 6)
    expect(f.tell(), 6)
    expect(f.seek(-1, 1), 5)
    expect(f.tell(), 5)
    expect(f.write(bytearray(b" world\n\n\n")), 9)
    expect(f.seek(0), 0)
    expect(f.write(b"h"), 1)
    # Go to one byte before the end, then cut the stream to 12 bytes.
    expect(f.seek(-1, 2), 13)
    expect(f.tell(), 13)
    expect(f.truncate(12), 12)
    expect(f.tell(), 12)
    # A float is not an acceptable offset.
    self.assertRaises(TypeError, f.seek, 0.0)
251 def read_ops(self
, f
, buffered
=False):
253 self
.assertEqual(data
, b
"hello")
254 data
= bytearray(data
)
255 self
.assertEqual(f
.readinto(data
), 5)
256 self
.assertEqual(data
, b
" worl")
257 self
.assertEqual(f
.readinto(data
), 2)
258 self
.assertEqual(len(data
), 5)
259 self
.assertEqual(data
[:2], b
"d\n")
260 self
.assertEqual(f
.seek(0), 0)
261 self
.assertEqual(f
.read(20), b
"hello world\n")
262 self
.assertEqual(f
.read(1), b
"")
263 self
.assertEqual(f
.readinto(bytearray(b
"x")), 0)
264 self
.assertEqual(f
.seek(-6, 2), 6)
265 self
.assertEqual(f
.read(5), b
"world")
266 self
.assertEqual(f
.read(0), b
"")
267 self
.assertEqual(f
.readinto(bytearray()), 0)
268 self
.assertEqual(f
.seek(-6, 1), 5)
269 self
.assertEqual(f
.read(5), b
" worl")
270 self
.assertEqual(f
.tell(), 10)
271 self
.assertRaises(TypeError, f
.seek
, 0.0)
274 self
.assertEqual(f
.read(), b
"hello world\n")
276 self
.assertEqual(f
.read(), b
"world\n")
277 self
.assertEqual(f
.read(), b
"")
281 def large_file_ops(self
, f
):
284 self
.assertEqual(f
.seek(self
.LARGE
), self
.LARGE
)
285 self
.assertEqual(f
.tell(), self
.LARGE
)
286 self
.assertEqual(f
.write(b
"xxx"), 3)
287 self
.assertEqual(f
.tell(), self
.LARGE
+ 3)
288 self
.assertEqual(f
.seek(-1, 1), self
.LARGE
+ 2)
289 self
.assertEqual(f
.truncate(), self
.LARGE
+ 2)
290 self
.assertEqual(f
.tell(), self
.LARGE
+ 2)
291 self
.assertEqual(f
.seek(0, 2), self
.LARGE
+ 2)
292 self
.assertEqual(f
.truncate(self
.LARGE
+ 1), self
.LARGE
+ 1)
293 self
.assertEqual(f
.tell(), self
.LARGE
+ 1)
294 self
.assertEqual(f
.seek(0, 2), self
.LARGE
+ 1)
295 self
.assertEqual(f
.seek(-1, 2), self
.LARGE
)
296 self
.assertEqual(f
.read(2), b
"x")
def test_invalid_operations(self):
    """Reading a write-only file or writing a read-only file raises IOError."""
    # Try writing on a file opened in read mode and vice-versa.
    refuses = self.assertRaises
    target = support.TESTFN
    # Files opened for writing, text and binary: reads are refused.
    for mode in ("w", "wb"):
        with self.open(target, mode) as fp:
            refuses(IOError, fp.read)
            refuses(IOError, fp.readline)
    # Binary file opened for reading: byte writes are refused.
    with self.open(target, "rb") as fp:
        refuses(IOError, fp.write, b"blah")
        refuses(IOError, fp.writelines, [b"blah\n"])
    # Text file opened for reading: text writes are refused.
    with self.open(target, "r") as fp:
        refuses(IOError, fp.write, "blah")
        refuses(IOError, fp.writelines, ["blah\n"])
311 def test_raw_file_io(self
):
312 with self
.open(support
.TESTFN
, "wb", buffering
=0) as f
:
313 self
.assertEqual(f
.readable(), False)
314 self
.assertEqual(f
.writable(), True)
315 self
.assertEqual(f
.seekable(), True)
317 with self
.open(support
.TESTFN
, "rb", buffering
=0) as f
:
318 self
.assertEqual(f
.readable(), True)
319 self
.assertEqual(f
.writable(), False)
320 self
.assertEqual(f
.seekable(), True)
323 def test_buffered_file_io(self
):
324 with self
.open(support
.TESTFN
, "wb") as f
:
325 self
.assertEqual(f
.readable(), False)
326 self
.assertEqual(f
.writable(), True)
327 self
.assertEqual(f
.seekable(), True)
329 with self
.open(support
.TESTFN
, "rb") as f
:
330 self
.assertEqual(f
.readable(), True)
331 self
.assertEqual(f
.writable(), False)
332 self
.assertEqual(f
.seekable(), True)
333 self
.read_ops(f
, True)
def test_readline(self):
    """Check readline() size limits on a binary file and its argument checks."""
    payload = b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line"
    with self.open(support.TESTFN, "wb") as f:
        f.write(payload)
    # (arguments given to readline, line expected back), in file order.
    script = [
        ((), b"abc\n"),
        ((10,), b"def\n"),
        ((2,), b"xy"),
        ((4,), b"zzy\n"),
        ((), b"foo\x00bar\n"),
        ((None,), b"another line"),
    ]
    with self.open(support.TESTFN, "rb") as f:
        for args, expected in script:
            self.assertEqual(f.readline(*args), expected)
        self.assertRaises(TypeError, f.readline, 5.3)
    # A file opened in "r" mode must reject a float size as well.
    with self.open(support.TESTFN, "r") as f:
        self.assertRaises(TypeError, f.readline, 5.3)
349 def test_raw_bytes_io(self
):
353 self
.assertEqual(data
, b
"hello world\n")
354 f
= self
.BytesIO(data
)
355 self
.read_ops(f
, True)
357 def test_large_file_ops(self
):
# On Windows and Mac OSX this test consumes large resources; it takes
# a long time to build the >2GB file and takes >2GB of disk space;
# therefore the resource must be enabled to run this test.
361 if sys
.platform
[:3] == 'win' or sys
.platform
== 'darwin':
362 if not support
.is_resource_enabled("largefile"):
363 print("\nTesting large file ops skipped on %s." % sys
.platform
,
365 print("It requires %d bytes and a long time." % self
.LARGE
,
367 print("Use 'regrtest.py -u largefile test_io' to run it.",
370 with self
.open(support
.TESTFN
, "w+b", 0) as f
:
371 self
.large_file_ops(f
)
372 with self
.open(support
.TESTFN
, "w+b") as f
:
373 self
.large_file_ops(f
)
375 def test_with_open(self
):
376 for bufsize
in (0, 1, 100):
378 with self
.open(support
.TESTFN
, "wb", bufsize
) as f
:
380 self
.assertEqual(f
.closed
, True)
383 with self
.open(support
.TESTFN
, "wb", bufsize
) as f
:
385 except ZeroDivisionError:
386 self
.assertEqual(f
.closed
, True)
388 self
.fail("1/0 didn't raise an exception")
391 def test_append_mode_tell(self
):
392 with self
.open(support
.TESTFN
, "wb") as f
:
394 with self
.open(support
.TESTFN
, "ab", buffering
=0) as f
:
395 self
.assertEqual(f
.tell(), 3)
396 with self
.open(support
.TESTFN
, "ab") as f
:
397 self
.assertEqual(f
.tell(), 3)
398 with self
.open(support
.TESTFN
, "a") as f
:
399 self
.assertTrue(f
.tell() > 0)
401 def test_destructor(self
):
403 class MyFileIO(self
.FileIO
):
407 f
= super(MyFileIO
, self
).__del
__
408 except AttributeError:
414 super(MyFileIO
, self
).close()
417 super(MyFileIO
, self
).flush()
418 f
= MyFileIO(support
.TESTFN
, "wb")
422 self
.assertEqual(record
, [1, 2, 3])
423 with self
.open(support
.TESTFN
, "rb") as f
:
424 self
.assertEqual(f
.read(), b
"xxx")
426 def _check_base_destructor(self
, base
):
430 # This exercises the availability of attributes on object
432 # (in the C version, close() is called by the tp_dealloc
433 # function, not by __del__)
438 record
.append(self
.on_del
)
440 f
= super(MyIO
, self
).__del
__
441 except AttributeError:
446 record
.append(self
.on_close
)
447 super(MyIO
, self
).close()
449 record
.append(self
.on_flush
)
450 super(MyIO
, self
).flush()
454 self
.assertEqual(record
, [1, 2, 3])
456 def test_IOBase_destructor(self
):
457 self
._check
_base
_destructor
(self
.IOBase
)
459 def test_RawIOBase_destructor(self
):
460 self
._check
_base
_destructor
(self
.RawIOBase
)
462 def test_BufferedIOBase_destructor(self
):
463 self
._check
_base
_destructor
(self
.BufferedIOBase
)
465 def test_TextIOBase_destructor(self
):
466 self
._check
_base
_destructor
(self
.TextIOBase
)
468 def test_close_flushes(self
):
469 with self
.open(support
.TESTFN
, "wb") as f
:
471 with self
.open(support
.TESTFN
, "rb") as f
:
472 self
.assertEqual(f
.read(), b
"xxx")
def test_array_writes(self):
    """write() of an array must return the length of its byte string."""
    values = array.array(b'i', range(10))
    byte_len = len(values.tostring())
    # Same check with a third open() argument of 0 and with it left out.
    for extra in ((0,), ()):
        with self.open(support.TESTFN, "wb", *extra) as f:
            self.assertEqual(f.write(values), byte_len)
482 def test_closefd(self
):
483 self
.assertRaises(ValueError, self
.open, support
.TESTFN
, 'w',
486 def test_read_closed(self
):
487 with self
.open(support
.TESTFN
, "w") as f
:
489 with self
.open(support
.TESTFN
, "r") as f
:
490 file = self
.open(f
.fileno(), "r", closefd
=False)
491 self
.assertEqual(file.read(), "egg\n")
494 self
.assertRaises(ValueError, file.read
)
def test_no_closefd_with_filename(self):
    """open() must raise ValueError for closefd=False with a file name."""
    # can't use closefd in combination with a file name
    opener = self.open
    path = support.TESTFN
    self.assertRaises(ValueError, opener, path, "r", closefd=False)
500 def test_closefd_attr(self
):
501 with self
.open(support
.TESTFN
, "wb") as f
:
503 with self
.open(support
.TESTFN
, "r") as f
:
504 self
.assertEqual(f
.buffer.raw
.closefd
, True)
505 file = self
.open(f
.fileno(), "r", closefd
=False)
506 self
.assertEqual(file.buffer.raw
.closefd
, False)
508 def test_garbage_collection(self
):
509 # FileIO objects are collected, and collecting them flushes
511 f
= self
.FileIO(support
.TESTFN
, "wb")
517 self
.assertTrue(wr() is None, wr
)
518 with self
.open(support
.TESTFN
, "rb") as f
:
519 self
.assertEqual(f
.read(), b
"abcxxx")
521 def test_unbounded_file(self
):
522 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
524 if not os
.path
.exists(zero
):
525 self
.skipTest("{0} does not exist".format(zero
))
526 if sys
.maxsize
> 0x7FFFFFFF:
527 self
.skipTest("test can only run in a 32-bit address space")
528 if support
.real_max_memuse
< support
._2G
:
529 self
.skipTest("test requires at least 2GB of memory")
530 with self
.open(zero
, "rb", buffering
=0) as f
:
531 self
.assertRaises(OverflowError, f
.read
)
532 with self
.open(zero
, "rb") as f
:
533 self
.assertRaises(OverflowError, f
.read
)
534 with self
.open(zero
, "r") as f
:
535 self
.assertRaises(OverflowError, f
.read
)
537 class CIOTest(IOTest
):
class PyIOTest(IOTest):
    """IOTest variant in which the array-write test is skipped."""

    # Wrap the inherited test so that it is reported as skipped.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
546 class CommonBufferedTests
:
547 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
549 def test_detach(self
):
550 raw
= self
.MockRawIO()
552 self
.assertIs(buf
.detach(), raw
)
553 self
.assertRaises(ValueError, buf
.detach
)
555 def test_fileno(self
):
556 rawio
= self
.MockRawIO()
557 bufio
= self
.tp(rawio
)
559 self
.assertEquals(42, bufio
.fileno())
561 def test_no_fileno(self
):
562 # XXX will we always have fileno() function? If so, kill
563 # this test. Else, write it.
566 def test_invalid_args(self
):
567 rawio
= self
.MockRawIO()
568 bufio
= self
.tp(rawio
)
570 self
.assertRaises(ValueError, bufio
.seek
, 0, -1)
571 self
.assertRaises(ValueError, bufio
.seek
, 0, 3)
573 def test_override_destructor(self
):
576 class MyBufferedIO(tp
):
580 f
= super(MyBufferedIO
, self
).__del
__
581 except AttributeError:
587 super(MyBufferedIO
, self
).close()
590 super(MyBufferedIO
, self
).flush()
591 rawio
= self
.MockRawIO()
592 bufio
= MyBufferedIO(rawio
)
593 writable
= bufio
.writable()
597 self
.assertEqual(record
, [1, 2, 3])
599 self
.assertEqual(record
, [1, 2])
601 def test_context_manager(self
):
602 # Test usability as a context manager
603 rawio
= self
.MockRawIO()
604 bufio
= self
.tp(rawio
)
609 # bufio should now be closed, and using it a second time should raise
611 self
.assertRaises(ValueError, _with
)
613 def test_error_through_destructor(self
):
614 # Test that the exception state is not modified by a destructor,
615 # even if close() fails.
616 rawio
= self
.CloseFailureIO()
619 with support
.captured_output("stderr") as s
:
620 self
.assertRaises(AttributeError, f
)
621 s
= s
.getvalue().strip()
623 # The destructor *may* have printed an unraisable error, check it
624 self
.assertEqual(len(s
.splitlines()), 1)
625 self
.assertTrue(s
.startswith("Exception IOError: "), s
)
626 self
.assertTrue(s
.endswith(" ignored"), s
)
629 raw
= self
.MockRawIO()
631 clsname
= "%s.%s" % (self
.tp
.__module
__, self
.tp
.__name
__)
632 self
.assertEqual(repr(b
), "<%s>" % clsname
)
634 self
.assertEqual(repr(b
), "<%s name=u'dummy'>" % clsname
)
636 self
.assertEqual(repr(b
), "<%s name='dummy'>" % clsname
)
639 class BufferedReaderTest(unittest
.TestCase
, CommonBufferedTests
):
642 def test_constructor(self
):
643 rawio
= self
.MockRawIO([b
"abc"])
644 bufio
= self
.tp(rawio
)
645 bufio
.__init
__(rawio
)
646 bufio
.__init
__(rawio
, buffer_size
=1024)
647 bufio
.__init
__(rawio
, buffer_size
=16)
648 self
.assertEquals(b
"abc", bufio
.read())
649 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=0)
650 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-16)
651 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-1)
652 rawio
= self
.MockRawIO([b
"abc"])
653 bufio
.__init
__(rawio
)
654 self
.assertEquals(b
"abc", bufio
.read())
657 for arg
in (None, 7):
658 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
659 bufio
= self
.tp(rawio
)
660 self
.assertEquals(b
"abcdefg", bufio
.read(arg
))
662 self
.assertRaises(ValueError, bufio
.read
, -2)
664 def test_read1(self
):
665 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
666 bufio
= self
.tp(rawio
)
667 self
.assertEquals(b
"a", bufio
.read(1))
668 self
.assertEquals(b
"b", bufio
.read1(1))
669 self
.assertEquals(rawio
._reads
, 1)
670 self
.assertEquals(b
"c", bufio
.read1(100))
671 self
.assertEquals(rawio
._reads
, 1)
672 self
.assertEquals(b
"d", bufio
.read1(100))
673 self
.assertEquals(rawio
._reads
, 2)
674 self
.assertEquals(b
"efg", bufio
.read1(100))
675 self
.assertEquals(rawio
._reads
, 3)
676 self
.assertEquals(b
"", bufio
.read1(100))
677 self
.assertEquals(rawio
._reads
, 4)
679 self
.assertRaises(ValueError, bufio
.read1
, -1)
681 def test_readinto(self
):
682 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
683 bufio
= self
.tp(rawio
)
685 self
.assertEquals(bufio
.readinto(b
), 2)
686 self
.assertEquals(b
, b
"ab")
687 self
.assertEquals(bufio
.readinto(b
), 2)
688 self
.assertEquals(b
, b
"cd")
689 self
.assertEquals(bufio
.readinto(b
), 2)
690 self
.assertEquals(b
, b
"ef")
691 self
.assertEquals(bufio
.readinto(b
), 1)
692 self
.assertEquals(b
, b
"gf")
693 self
.assertEquals(bufio
.readinto(b
), 0)
694 self
.assertEquals(b
, b
"gf")
696 def test_readlines(self
):
698 rawio
= self
.MockRawIO((b
"abc\n", b
"d\n", b
"ef"))
699 return self
.tp(rawio
)
700 self
.assertEquals(bufio().readlines(), [b
"abc\n", b
"d\n", b
"ef"])
701 self
.assertEquals(bufio().readlines(5), [b
"abc\n", b
"d\n"])
702 self
.assertEquals(bufio().readlines(None), [b
"abc\n", b
"d\n", b
"ef"])
704 def test_buffering(self
):
709 [ 100, [ 3, 1, 4, 8 ], [ dlen
, 0 ] ],
710 [ 100, [ 3, 3, 3], [ dlen
] ],
711 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
714 for bufsize
, buf_read_sizes
, raw_read_sizes
in tests
:
715 rawio
= self
.MockFileIO(data
)
716 bufio
= self
.tp(rawio
, buffer_size
=bufsize
)
718 for nbytes
in buf_read_sizes
:
719 self
.assertEquals(bufio
.read(nbytes
), data
[pos
:pos
+nbytes
])
721 # this is mildly implementation-dependent
722 self
.assertEquals(rawio
.read_history
, raw_read_sizes
)
724 def test_read_non_blocking(self
):
725 # Inject some None's in there to simulate EWOULDBLOCK
726 rawio
= self
.MockRawIO((b
"abc", b
"d", None, b
"efg", None, None, None))
727 bufio
= self
.tp(rawio
)
729 self
.assertEquals(b
"abcd", bufio
.read(6))
730 self
.assertEquals(b
"e", bufio
.read(1))
731 self
.assertEquals(b
"fg", bufio
.read())
732 self
.assertEquals(b
"", bufio
.peek(1))
733 self
.assertTrue(None is bufio
.read())
734 self
.assertEquals(b
"", bufio
.read())
736 def test_read_past_eof(self
):
737 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
738 bufio
= self
.tp(rawio
)
740 self
.assertEquals(b
"abcdefg", bufio
.read(9000))
742 def test_read_all(self
):
743 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
744 bufio
= self
.tp(rawio
)
746 self
.assertEquals(b
"abcdefg", bufio
.read())
748 def test_threads(self
):
750 # Write out many bytes with exactly the same number of 0's,
751 # 1's... 255's. This will help us check that concurrent reading
752 # doesn't duplicate or forget contents.
754 l
= list(range(256)) * N
756 s
= bytes(bytearray(l
))
757 with self
.open(support
.TESTFN
, "wb") as f
:
759 with self
.open(support
.TESTFN
, self
.read_mode
, buffering
=0) as raw
:
760 bufio
= self
.tp(raw
, 8)
765 # Intra-buffer read then buffer-flushing read
766 for n
in cycle([1, 19]):
770 # list.append() is atomic
772 except Exception as e
:
775 threads
= [threading
.Thread(target
=f
) for x
in range(20)]
778 time
.sleep(0.02) # yield
781 self
.assertFalse(errors
,
782 "the following exceptions were caught: %r" % errors
)
783 s
= b
''.join(results
)
785 c
= bytes(bytearray([i
]))
786 self
.assertEqual(s
.count(c
), N
)
788 support
.unlink(support
.TESTFN
)
790 def test_misbehaved_io(self
):
791 rawio
= self
.MisbehavedRawIO((b
"abc", b
"d", b
"efg"))
792 bufio
= self
.tp(rawio
)
793 self
.assertRaises(IOError, bufio
.seek
, 0)
794 self
.assertRaises(IOError, bufio
.tell
)
796 class CBufferedReaderTest(BufferedReaderTest
):
797 tp
= io
.BufferedReader
def test_constructor(self):
    """Run the shared constructor test, then try a sys.maxsize buffer."""
    BufferedReaderTest.test_constructor(self)
    # The allocation can succeed on 32-bit builds, e.g. with more
    # than 2GB RAM and a 64-bit kernel.
    if sys.maxsize <= 0x7FFFFFFF:
        return
    rawio = self.MockRawIO()
    bufio = self.tp(rawio)
    acceptable = (OverflowError, MemoryError, ValueError)
    self.assertRaises(acceptable, bufio.__init__, rawio, sys.maxsize)
809 def test_initialization(self
):
810 rawio
= self
.MockRawIO([b
"abc"])
811 bufio
= self
.tp(rawio
)
812 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=0)
813 self
.assertRaises(ValueError, bufio
.read
)
814 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-16)
815 self
.assertRaises(ValueError, bufio
.read
)
816 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-1)
817 self
.assertRaises(ValueError, bufio
.read
)
819 def test_misbehaved_io_read(self
):
820 rawio
= self
.MisbehavedRawIO((b
"abc", b
"d", b
"efg"))
821 bufio
= self
.tp(rawio
)
822 # _pyio.BufferedReader seems to implement reading different, so that
823 # checking this is not so easy.
824 self
.assertRaises(IOError, bufio
.read
, 10)
826 def test_garbage_collection(self
):
827 # C BufferedReader objects are collected.
828 # The Python version has __del__, so it ends into gc.garbage instead
829 rawio
= self
.FileIO(support
.TESTFN
, "w+b")
835 self
.assertTrue(wr() is None, wr
)
class PyBufferedReaderTest(BufferedReaderTest):
    """BufferedReaderTest bound to the ``_pyio`` BufferedReader."""

    # The inherited tests create the object under test through ``self.tp``.
    tp = pyio.BufferedReader
841 class BufferedWriterTest(unittest
.TestCase
, CommonBufferedTests
):
844 def test_constructor(self
):
845 rawio
= self
.MockRawIO()
846 bufio
= self
.tp(rawio
)
847 bufio
.__init
__(rawio
)
848 bufio
.__init
__(rawio
, buffer_size
=1024)
849 bufio
.__init
__(rawio
, buffer_size
=16)
850 self
.assertEquals(3, bufio
.write(b
"abc"))
852 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=0)
853 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-16)
854 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-1)
855 bufio
.__init
__(rawio
)
856 self
.assertEquals(3, bufio
.write(b
"ghi"))
858 self
.assertEquals(b
"".join(rawio
._write
_stack
), b
"abcghi")
860 def test_detach_flush(self
):
861 raw
= self
.MockRawIO()
864 self
.assertFalse(raw
._write
_stack
)
866 self
.assertEqual(raw
._write
_stack
, [b
"howdy!"])
868 def test_write(self
):
869 # Write to the buffered IO but don't overflow the buffer.
870 writer
= self
.MockRawIO()
871 bufio
= self
.tp(writer
, 8)
873 self
.assertFalse(writer
._write
_stack
)
875 def test_write_overflow(self
):
876 writer
= self
.MockRawIO()
877 bufio
= self
.tp(writer
, 8)
878 contents
= b
"abcdefghijklmnop"
879 for n
in range(0, len(contents
), 3):
880 bufio
.write(contents
[n
:n
+3])
881 flushed
= b
"".join(writer
._write
_stack
)
882 # At least (total - 8) bytes were implicitly flushed, perhaps more
883 # depending on the implementation.
884 self
.assertTrue(flushed
.startswith(contents
[:-8]), flushed
)
886 def check_writes(self
, intermediate_func
):
887 # Lots of writes, test the flushed output is as expected.
888 contents
= bytes(range(256)) * 1000
890 writer
= self
.MockRawIO()
891 bufio
= self
.tp(writer
, 13)
892 # Generator of write sizes: repeat each N 15 times then proceed to N+1
894 for size
in count(1):
898 while n
< len(contents
):
899 size
= min(next(sizes
), len(contents
) - n
)
900 self
.assertEquals(bufio
.write(contents
[n
:n
+size
]), size
)
901 intermediate_func(bufio
)
904 self
.assertEquals(contents
,
905 b
"".join(writer
._write
_stack
))
907 def test_writes(self
):
908 self
.check_writes(lambda bufio
: None)
910 def test_writes_and_flushes(self
):
911 self
.check_writes(lambda bufio
: bufio
.flush())
913 def test_writes_and_seeks(self
):
916 bufio
.seek(pos
+ 1, 0)
917 bufio
.seek(pos
- 1, 0)
919 self
.check_writes(_seekabs
)
921 pos
= bufio
.seek(0, 1)
925 self
.check_writes(_seekrel
)
927 def test_writes_and_truncates(self
):
928 self
.check_writes(lambda bufio
: bufio
.truncate(bufio
.tell()))
930 def test_write_non_blocking(self
):
931 raw
= self
.MockNonBlockWriterIO()
932 bufio
= self
.tp(raw
, 8)
934 self
.assertEquals(bufio
.write(b
"abcd"), 4)
935 self
.assertEquals(bufio
.write(b
"efghi"), 5)
936 # 1 byte will be written, the rest will be buffered
938 self
.assertEquals(bufio
.write(b
"jklmn"), 5)
940 # 8 bytes will be written, 8 will be buffered and the rest will be lost
943 bufio
.write(b
"opqrwxyz0123456789")
944 except self
.BlockingIOError
as e
:
945 written
= e
.characters_written
947 self
.fail("BlockingIOError should have been raised")
948 self
.assertEquals(written
, 16)
949 self
.assertEquals(raw
.pop_written(),
950 b
"abcdefghijklmnopqrwxyz")
952 self
.assertEquals(bufio
.write(b
"ABCDEFGHI"), 9)
953 s
= raw
.pop_written()
954 # Previously buffered bytes were flushed
955 self
.assertTrue(s
.startswith(b
"01234567A"), s
)
957 def test_write_and_rewind(self
):
959 bufio
= self
.tp(raw
, 4)
960 self
.assertEqual(bufio
.write(b
"abcdef"), 6)
961 self
.assertEqual(bufio
.tell(), 6)
963 self
.assertEqual(bufio
.write(b
"XY"), 2)
965 self
.assertEqual(raw
.getvalue(), b
"XYcdef")
966 self
.assertEqual(bufio
.write(b
"123456"), 6)
968 self
.assertEqual(raw
.getvalue(), b
"XYcdef123456")
970 def test_flush(self
):
971 writer
= self
.MockRawIO()
972 bufio
= self
.tp(writer
, 8)
975 self
.assertEquals(b
"abc", writer
._write
_stack
[0])
977 def test_destructor(self
):
978 writer
= self
.MockRawIO()
979 bufio
= self
.tp(writer
, 8)
983 self
.assertEquals(b
"abc", writer
._write
_stack
[0])
def test_truncate(self):
    """truncate(3) after a buffered write must leave b"abc" in the file."""
    # Truncate implicitly flushes the buffer.
    path = support.TESTFN
    with self.open(path, self.write_mode, buffering=0) as raw:
        bufio = self.tp(raw, 8)
        bufio.write(b"abcdef")
        new_size = bufio.truncate(3)
        self.assertEqual(new_size, 3)
        position = bufio.tell()
        self.assertEqual(position, 3)
    # Read the file back to see what was actually stored.
    with self.open(path, "rb", buffering=0) as f:
        stored = f.read()
        self.assertEqual(stored, b"abc")
995 def test_threads(self
):
997 # Write out many bytes from many threads and test they were
1000 contents
= bytes(range(256)) * N
1001 sizes
= cycle([1, 19])
1004 while n
< len(contents
):
1006 queue
.append(contents
[n
:n
+size
])
1009 # We use a real file object because it allows us to
1010 # exercise situations where the GIL is released before
1011 # writing the buffer to the raw streams. This is in addition
1012 # to concurrency issues due to switching threads in the middle
1014 with self
.open(support
.TESTFN
, self
.write_mode
, buffering
=0) as raw
:
1015 bufio
= self
.tp(raw
, 8)
1025 except Exception as e
:
1028 threads
= [threading
.Thread(target
=f
) for x
in range(20)]
1031 time
.sleep(0.02) # yield
1034 self
.assertFalse(errors
,
1035 "the following exceptions were caught: %r" % errors
)
1037 with self
.open(support
.TESTFN
, "rb") as f
:
1039 for i
in range(256):
1040 self
.assertEquals(s
.count(bytes([i
])), N
)
1042 support
.unlink(support
.TESTFN
)
1044 def test_misbehaved_io(self
):
1045 rawio
= self
.MisbehavedRawIO()
1046 bufio
= self
.tp(rawio
, 5)
1047 self
.assertRaises(IOError, bufio
.seek
, 0)
1048 self
.assertRaises(IOError, bufio
.tell
)
1049 self
.assertRaises(IOError, bufio
.write
, b
"abcdef")
def test_max_buffer_size_deprecation(self):
    """A third constructor argument must produce one DeprecationWarning."""
    with support.check_warnings() as w:
        warnings.simplefilter("always", DeprecationWarning)
        self.tp(self.MockRawIO(), 8, 12)
        caught = w.warnings
        self.assertEqual(len(caught), 1)
        only = caught[0]
        is_deprecation = only.category is DeprecationWarning
        self.assertTrue(is_deprecation)
        text = str(only.message)
        self.assertEqual(text, "max_buffer_size is deprecated")
1062 class CBufferedWriterTest(BufferedWriterTest
):
1063 tp
= io
.BufferedWriter
def test_constructor(self):
    """Run the shared constructor test, then try a sys.maxsize buffer."""
    BufferedWriterTest.test_constructor(self)
    # The allocation can succeed on 32-bit builds, e.g. with more
    # than 2GB RAM and a 64-bit kernel.
    if sys.maxsize <= 0x7FFFFFFF:
        return
    rawio = self.MockRawIO()
    bufio = self.tp(rawio)
    acceptable = (OverflowError, MemoryError, ValueError)
    self.assertRaises(acceptable, bufio.__init__, rawio, sys.maxsize)
1075 def test_initialization(self
):
1076 rawio
= self
.MockRawIO()
1077 bufio
= self
.tp(rawio
)
1078 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=0)
1079 self
.assertRaises(ValueError, bufio
.write
, b
"def")
1080 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-16)
1081 self
.assertRaises(ValueError, bufio
.write
, b
"def")
1082 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-1)
1083 self
.assertRaises(ValueError, bufio
.write
, b
"def")
1085 def test_garbage_collection(self
):
1086 # C BufferedWriter objects are collected, and collecting them flushes
1088 # The Python version has __del__, so it ends into gc.garbage instead
1089 rawio
= self
.FileIO(support
.TESTFN
, "w+b")
1095 support
.gc_collect()
1096 self
.assertTrue(wr() is None, wr
)
1097 with self
.open(support
.TESTFN
, "rb") as f
:
1098 self
.assertEqual(f
.read(), b
"123xxx")
class PyBufferedWriterTest(BufferedWriterTest):
    """BufferedWriterTest bound to the ``_pyio`` BufferedWriter."""

    # The inherited tests create the object under test through ``self.tp``.
    tp = pyio.BufferedWriter
1104 class BufferedRWPairTest(unittest
.TestCase
):
1106 def test_constructor(self
):
1107 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1108 self
.assertFalse(pair
.closed
)
1110 def test_detach(self
):
1111 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1112 self
.assertRaises(self
.UnsupportedOperation
, pair
.detach
)
def test_constructor_max_buffer_size_deprecation(self):
    """A fourth constructor argument must produce one DeprecationWarning."""
    with support.check_warnings() as w:
        warnings.simplefilter("always", DeprecationWarning)
        self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
        caught = w.warnings
        self.assertEqual(len(caught), 1)
        only = caught[0]
        is_deprecation = only.category is DeprecationWarning
        self.assertTrue(is_deprecation)
        text = str(only.message)
        self.assertEqual(text, "max_buffer_size is deprecated")
1124 def test_constructor_with_not_readable(self
):
1125 class NotReadable(MockRawIO
):
1129 self
.assertRaises(IOError, self
.tp
, NotReadable(), self
.MockRawIO())
1131 def test_constructor_with_not_writeable(self
):
1132 class NotWriteable(MockRawIO
):
1136 self
.assertRaises(IOError, self
.tp
, self
.MockRawIO(), NotWriteable())
1138 def test_read(self
):
1139 pair
= self
.tp(self
.BytesIO(b
"abcdef"), self
.MockRawIO())
1141 self
.assertEqual(pair
.read(3), b
"abc")
1142 self
.assertEqual(pair
.read(1), b
"d")
1143 self
.assertEqual(pair
.read(), b
"ef")
1144 pair
= self
.tp(self
.BytesIO(b
"abc"), self
.MockRawIO())
1145 self
.assertEqual(pair
.read(None), b
"abc")
1147 def test_readlines(self
):
1148 pair
= lambda: self
.tp(self
.BytesIO(b
"abc\ndef\nh"), self
.MockRawIO())
1149 self
.assertEqual(pair().readlines(), [b
"abc\n", b
"def\n", b
"h"])
1150 self
.assertEqual(pair().readlines(), [b
"abc\n", b
"def\n", b
"h"])
1151 self
.assertEqual(pair().readlines(5), [b
"abc\n", b
"def\n"])
1153 def test_read1(self
):
1154 # .read1() is delegated to the underlying reader object, so this test
1156 pair
= self
.tp(self
.BytesIO(b
"abcdef"), self
.MockRawIO())
1158 self
.assertEqual(pair
.read1(3), b
"abc")
1160 def test_readinto(self
):
1161 pair
= self
.tp(self
.BytesIO(b
"abcdef"), self
.MockRawIO())
1164 self
.assertEqual(pair
.readinto(data
), 5)
1165 self
.assertEqual(data
, b
"abcde")
1167 def test_write(self
):
1168 w
= self
.MockRawIO()
1169 pair
= self
.tp(self
.MockRawIO(), w
)
1175 self
.assertEqual(w
._write
_stack
, [b
"abc", b
"def"])
1177 def test_peek(self
):
1178 pair
= self
.tp(self
.BytesIO(b
"abcdef"), self
.MockRawIO())
1180 self
.assertTrue(pair
.peek(3).startswith(b
"abc"))
1181 self
.assertEqual(pair
.read(3), b
"abc")
def test_readable(self):
    # A pair built over two mock raw streams must answer readable() with a
    # true value.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    rw_pair = self.tp(reader, writer)
    self.assertTrue(rw_pair.readable())
def test_writeable(self):
    # A pair built over two mock raw streams must answer writable() with a
    # true value.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    rw_pair = self.tp(reader, writer)
    self.assertTrue(rw_pair.writable())
1191 def test_seekable(self
):
1192 # BufferedRWPairs are never seekable, even if their readers and writers
1194 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1195 self
.assertFalse(pair
.seekable())
1197 # .flush() is delegated to the underlying writer object and has been
1198 # tested in the test_write method.
1200 def test_close_and_closed(self
):
1201 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1202 self
.assertFalse(pair
.closed
)
1204 self
.assertTrue(pair
.closed
)
1206 def test_isatty(self
):
1207 class SelectableIsAtty(MockRawIO
):
1208 def __init__(self
, isatty
):
1209 MockRawIO
.__init
__(self
)
1210 self
._isatty
= isatty
1215 pair
= self
.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1216 self
.assertFalse(pair
.isatty())
1218 pair
= self
.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1219 self
.assertTrue(pair
.isatty())
1221 pair
= self
.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1222 self
.assertTrue(pair
.isatty())
1224 pair
= self
.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1225 self
.assertTrue(pair
.isatty())
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the inherited BufferedRWPairTest cases against io.BufferedRWPair."""
    # io is the C implementation of the io module.
    tp = io.BufferedRWPair
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the inherited BufferedRWPairTest cases against pyio.BufferedRWPair."""
    # pyio is the pure-Python implementation of io (imported from _pyio).
    tp = pyio.BufferedRWPair
1234 class BufferedRandomTest(BufferedReaderTest
, BufferedWriterTest
):
def test_constructor(self):
    # Run the constructor checks of both parent suites, reader first, by
    # calling each one explicitly.
    for parent_suite in (BufferedReaderTest, BufferedWriterTest):
        parent_suite.test_constructor(self)
1242 def test_read_and_write(self
):
1243 raw
= self
.MockRawIO((b
"asdf", b
"ghjk"))
1244 rw
= self
.tp(raw
, 8)
1246 self
.assertEqual(b
"as", rw
.read(2))
1249 self
.assertFalse(raw
._write
_stack
) # Buffer writes
1250 self
.assertEqual(b
"ghjk", rw
.read())
1251 self
.assertEquals(b
"dddeee", raw
._write
_stack
[0])
1253 def test_seek_and_tell(self
):
1254 raw
= self
.BytesIO(b
"asdfghjkl")
1257 self
.assertEquals(b
"as", rw
.read(2))
1258 self
.assertEquals(2, rw
.tell())
1260 self
.assertEquals(b
"asdf", rw
.read(4))
1264 self
.assertEquals(b
"asdfasdfl", rw
.read())
1265 self
.assertEquals(9, rw
.tell())
1267 self
.assertEquals(5, rw
.tell())
1269 self
.assertEquals(7, rw
.tell())
1270 self
.assertEquals(b
"fl", rw
.read(11))
1271 self
.assertRaises(TypeError, rw
.seek
, 0.0)
1273 def check_flush_and_read(self
, read_func
):
1274 raw
= self
.BytesIO(b
"abcdefghi")
1275 bufio
= self
.tp(raw
)
1277 self
.assertEquals(b
"ab", read_func(bufio
, 2))
1279 self
.assertEquals(b
"ef", read_func(bufio
, 2))
1280 self
.assertEquals(6, bufio
.tell())
1282 self
.assertEquals(6, bufio
.tell())
1283 self
.assertEquals(b
"ghi", read_func(bufio
))
1286 # flush() resets the read buffer
1289 self
.assertEquals(b
"XYZ", read_func(bufio
, 3))
def test_flush_and_read(self):
    # Drive the shared flush/read scenario through plain read().
    def _read(bufio, *args):
        return bufio.read(*args)

    self.check_flush_and_read(_read)
1294 def test_flush_and_readinto(self
):
1295 def _readinto(bufio
, n
=-1):
1296 b
= bytearray(n
if n
>= 0 else 9999)
1297 n
= bufio
.readinto(b
)
1299 self
.check_flush_and_read(_readinto
)
1301 def test_flush_and_peek(self
):
1302 def _peek(bufio
, n
=-1):
1303 # This relies on the fact that the buffer can contain the whole
1304 # raw stream, otherwise peek() can return less.
1308 bufio
.seek(len(b
), 1)
1310 self
.check_flush_and_read(_peek
)
1312 def test_flush_and_write(self
):
1313 raw
= self
.BytesIO(b
"abcdefghi")
1314 bufio
= self
.tp(raw
)
1321 self
.assertEquals(b
"12345fghi", raw
.getvalue())
1322 self
.assertEquals(b
"12345fghi", bufio
.read())
def test_threads(self):
    # Run the threading checks of both parent suites, reader first, by
    # calling each one explicitly.
    for parent_suite in (BufferedReaderTest, BufferedWriterTest):
        parent_suite.test_threads(self)
1328 def test_writes_and_peek(self
):
1331 self
.check_writes(_peek
)
1337 self
.check_writes(_peek
)
1339 def test_writes_and_reads(self
):
1343 self
.check_writes(_read
)
1345 def test_writes_and_read1s(self
):
1349 self
.check_writes(_read1
)
1351 def test_writes_and_readintos(self
):
1354 bufio
.readinto(bytearray(1))
1355 self
.check_writes(_read
)
1357 def test_write_after_readahead(self
):
1358 # Issue #6629: writing after the buffer was filled by readahead should
1359 # first rewind the raw stream.
1360 for overwrite_size
in [1, 5]:
1361 raw
= self
.BytesIO(b
"A" * 10)
1362 bufio
= self
.tp(raw
, 4)
1364 self
.assertEqual(bufio
.read(1), b
"A")
1365 self
.assertEqual(bufio
.tell(), 1)
1366 # Overwriting should rewind the raw stream if it needs so
1367 bufio
.write(b
"B" * overwrite_size
)
1368 self
.assertEqual(bufio
.tell(), overwrite_size
+ 1)
1369 # If the write size was smaller than the buffer size, flush() and
1370 # check that rewind happens.
1372 self
.assertEqual(bufio
.tell(), overwrite_size
+ 1)
1375 b
"A" + b
"B" * overwrite_size
+ b
"A" * (9 - overwrite_size
))
def test_misbehaved_io(self):
    # Run the misbehaved-raw-stream checks of both parent suites, reader
    # first, by calling each one explicitly.
    for parent_suite in (BufferedReaderTest, BufferedWriterTest):
        parent_suite.test_misbehaved_io(self)
1381 class CBufferedRandomTest(CBufferedReaderTest
, CBufferedWriterTest
, BufferedRandomTest
):
1382 tp
= io
.BufferedRandom
def test_constructor(self):
    BufferedRandomTest.test_constructor(self)
    # The allocation can succeed on 32-bit builds, e.g. with more
    # than 2GB RAM and a 64-bit kernel, so the oversized-buffer check is
    # only made when sys.maxsize exceeds 0x7FFFFFFF.
    if sys.maxsize <= 0x7FFFFFFF:
        return
    raw = self.MockRawIO()
    buffered = self.tp(raw)
    oversize_errors = (OverflowError, MemoryError, ValueError)
    # Re-initialising with sys.maxsize as the second argument must fail.
    with self.assertRaises(oversize_errors):
        buffered.__init__(raw, sys.maxsize)
def test_garbage_collection(self):
    # Run the garbage-collection checks of both C parent suites, reader
    # first, by calling each one explicitly.
    for parent_suite in (CBufferedReaderTest, CBufferedWriterTest):
        parent_suite.test_garbage_collection(self)
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the inherited BufferedRandomTest cases against pyio.BufferedRandom."""
    # pyio is the pure-Python implementation of io (imported from _pyio).
    tp = pyio.BufferedRandom
1402 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1404 # - A single output character can correspond to many bytes of input.
1405 # - The number of input bytes to complete the character can be
1406 # undetermined until the last input byte is received.
1407 # - The number of input bytes can vary depending on previous input.
1408 # - A single input byte can correspond to many characters of output.
1409 # - The number of output characters can be undetermined until the
1410 # last input byte is received.
1411 # - The number of output characters can vary depending on previous input.
1413 class StatefulIncrementalDecoder(codecs
.IncrementalDecoder
):
1415 For testing seek/tell behavior with a stateful, buffering decoder.
1417 Input is a sequence of words. Words may be fixed-length (length set
1418 by input) or variable-length (period-terminated). In variable-length
1419 mode, extra periods are ignored. Possible words are:
1420 - 'i' followed by a number sets the input length, I (maximum 99).
1421 When I is set to 0, words are space-terminated.
1422 - 'o' followed by a number sets the output length, O (maximum 99).
1423 - Any other word is converted into a word followed by a period on
1424 the output. The output word consists of the input word truncated
1425 or padded out with hyphens to make its length equal to O. If O
1426 is 0, the word is output verbatim without truncating or padding.
1427 I and O are initially set to 1. When I changes, any buffered input is
1428 re-scanned according to the new I. EOF also terminates the last word.
1431 def __init__(self
, errors
='strict'):
1432 codecs
.IncrementalDecoder
.__init
__(self
, errors
)
1436 return '<SID %x>' % id(self
)
1441 self
.buffer = bytearray()
1444 i
, o
= self
.i ^
1, self
.o ^
1 # so that flags = 0 after reset()
1445 return bytes(self
.buffer), i
*100 + o
1447 def setstate(self
, state
):
1449 self
.buffer = bytearray(buffer)
1450 i
, o
= divmod(io
, 100)
1451 self
.i
, self
.o
= i ^
1, o ^
1
1453 def decode(self
, input, final
=False):
1456 if self
.i
== 0: # variable-length, terminated with period
1459 output
+= self
.process_word()
1461 self
.buffer.append(b
)
1462 else: # fixed-length, terminate after self.i bytes
1463 self
.buffer.append(b
)
1464 if len(self
.buffer) == self
.i
:
1465 output
+= self
.process_word()
1466 if final
and self
.buffer: # EOF terminates the last word
1467 output
+= self
.process_word()
1470 def process_word(self
):
1472 if self
.buffer[0] == ord('i'):
1473 self
.i
= min(99, int(self
.buffer[1:] or 0)) # set input length
1474 elif self
.buffer[0] == ord('o'):
1475 self
.o
= min(99, int(self
.buffer[1:] or 0)) # set output length
1477 output
= self
.buffer.decode('ascii')
1478 if len(output
) < self
.o
:
1479 output
+= '-'*self
.o
# pad out with hyphens
1481 output
= output
[:self
.o
] # truncate to output length
1483 self
.buffer = bytearray()
1486 codecEnabled
= False
def lookupTestDecoder(cls, name):
    # Codec lookup hook: answers only for the name 'test_decoder', and only
    # while cls.codecEnabled is true. Otherwise it falls through with None.
    if not cls.codecEnabled or name != 'test_decoder':
        return None
    # Encoding is borrowed from latin-1; only incremental decoding is
    # supplied (by cls itself), every other slot is left as None.
    return codecs.CodecInfo(
        name='test_decoder',
        encode=codecs.lookup('latin-1').encode,
        decode=None,
        incrementalencoder=None,
        incrementaldecoder=cls,
        streamreader=None,
        streamwriter=None)
# Register the decoder's lookup hook with the codecs machinery.
# The hook only answers while StatefulIncrementalDecoder.codecEnabled is
# true; it is False by default, so tests must switch it on explicitly.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1503 class StatefulIncrementalDecoderTest(unittest
.TestCase
):
1505 Make sure the StatefulIncrementalDecoder actually works.
1509 # I=1, O=1 (fixed-length input == fixed-length output)
1510 (b
'abcd', False, 'a.b.c.d.'),
1511 # I=0, O=0 (variable-length input, variable-length output)
1512 (b
'oiabcd', True, 'abcd.'),
1513 # I=0, O=0 (should ignore extra periods)
1514 (b
'oi...abcd...', True, 'abcd.'),
1515 # I=0, O=6 (variable-length input, fixed-length output)
1516 (b
'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1517 # I=2, O=6 (fixed-length input < fixed-length output)
1518 (b
'i.i2.o6xyz', True, 'xy----.z-----.'),
1519 # I=6, O=3 (fixed-length input > fixed-length output)
1520 (b
'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1521 # I=0, then 3; O=29, then 15 (with longer output)
1522 (b
'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1523 'a----------------------------.' +
1524 'b----------------------------.' +
1525 'cde--------------------------.' +
1526 'abcdefghijabcde.' +
1527 'a.b------------.' +
1528 '.c.------------.' +
1529 'd.e------------.' +
1530 'k--------------.' +
1531 'l--------------.' +
1535 def test_decoder(self
):
1536 # Try a few one-shot test cases.
1537 for input, eof
, output
in self
.test_cases
:
1538 d
= StatefulIncrementalDecoder()
1539 self
.assertEquals(d
.decode(input, eof
), output
)
1541 # Also test an unfinished decode, followed by forcing EOF.
1542 d
= StatefulIncrementalDecoder()
1543 self
.assertEquals(d
.decode(b
'oiabcd'), '')
1544 self
.assertEquals(d
.decode(b
'', 1), 'abcd.')
1546 class TextIOWrapperTest(unittest
.TestCase
):
1549 self
.testdata
= b
"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1550 self
.normalized
= b
"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
1551 support
.unlink(support
.TESTFN
)
1554 support
.unlink(support
.TESTFN
)
def test_constructor(self):
    # The payload is one e-acute encoded as two UTF-8 bytes, followed by
    # two newlines.
    r = self.BytesIO(b"\xc3\xa9\n\n")
    b = self.BufferedReader(r, 1000)
    t = self.TextIOWrapper(b)
    # Calling __init__ again must replace the previous settings.
    t.__init__(b, encoding="latin1", newline="\r\n")
    self.assertEqual(t.encoding, "latin1")
    self.assertEqual(t.line_buffering, False)
    t.__init__(b, encoding="utf8", line_buffering=True)
    self.assertEqual(t.encoding, "utf8")
    self.assertEqual(t.line_buffering, True)
    # With utf8 in effect the two bytes decode to a single character.
    self.assertEqual("\xe9\n", t.readline())
    # A newline argument of the wrong type or with an unsupported value
    # must be rejected.
    self.assertRaises(TypeError, t.__init__, b, newline=42)
    self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1570 def test_detach(self
):
1572 b
= self
.BufferedWriter(r
)
1573 t
= self
.TextIOWrapper(b
)
1574 self
.assertIs(t
.detach(), b
)
1576 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1578 self
.assertFalse(r
.getvalue())
1580 self
.assertEqual(r
.getvalue(), b
"howdy")
1581 self
.assertRaises(ValueError, t
.detach
)
1583 def test_repr(self
):
1584 raw
= self
.BytesIO("hello".encode("utf-8"))
1585 b
= self
.BufferedReader(raw
)
1586 t
= self
.TextIOWrapper(b
, encoding
="utf-8")
1587 modname
= self
.TextIOWrapper
.__module
__
1588 self
.assertEqual(repr(t
),
1589 "<%s.TextIOWrapper encoding='utf-8'>" % modname
)
1591 self
.assertEqual(repr(t
),
1592 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname
)
1594 self
.assertEqual(repr(t
),
1595 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
)
1597 def test_line_buffering(self
):
1599 b
= self
.BufferedWriter(r
, 1000)
1600 t
= self
.TextIOWrapper(b
, newline
="\n", line_buffering
=True)
1602 self
.assertEquals(r
.getvalue(), b
"") # No flush happened
1604 self
.assertEquals(r
.getvalue(), b
"XY\nZ") # All got flushed
1606 self
.assertEquals(r
.getvalue(), b
"XY\nZA\rB")
1608 def test_encoding(self
):
1609 # Check the encoding attribute is always set, and valid
1611 t
= self
.TextIOWrapper(b
, encoding
="utf8")
1612 self
.assertEqual(t
.encoding
, "utf8")
1613 t
= self
.TextIOWrapper(b
)
1614 self
.assertTrue(t
.encoding
is not None)
1615 codecs
.lookup(t
.encoding
)
1617 def test_encoding_errors_reading(self
):
1619 b
= self
.BytesIO(b
"abc\n\xff\n")
1620 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1621 self
.assertRaises(UnicodeError, t
.read
)
1622 # (2) explicit strict
1623 b
= self
.BytesIO(b
"abc\n\xff\n")
1624 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="strict")
1625 self
.assertRaises(UnicodeError, t
.read
)
1627 b
= self
.BytesIO(b
"abc\n\xff\n")
1628 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="ignore")
1629 self
.assertEquals(t
.read(), "abc\n\n")
1631 b
= self
.BytesIO(b
"abc\n\xff\n")
1632 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="replace")
1633 self
.assertEquals(t
.read(), "abc\n\ufffd\n")
1635 def test_encoding_errors_writing(self
):
1638 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1639 self
.assertRaises(UnicodeError, t
.write
, "\xff")
1640 # (2) explicit strict
1642 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="strict")
1643 self
.assertRaises(UnicodeError, t
.write
, "\xff")
1646 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="ignore",
1648 t
.write("abc\xffdef\n")
1650 self
.assertEquals(b
.getvalue(), b
"abcdef\n")
1653 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="replace",
1655 t
.write("abc\xffdef\n")
1657 self
.assertEquals(b
.getvalue(), b
"abc?def\n")
1659 def test_newlines(self
):
1660 input_lines
= [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1663 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1664 [ '', input_lines
],
1665 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1666 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1667 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1671 'utf-16', 'utf-16-le', 'utf-16-be',
1672 'utf-32', 'utf-32-le', 'utf-32-be',
1675 # Try a range of buffer sizes to test the case where \r is the last
1676 # character in TextIOWrapper._pending_line.
1677 for encoding
in encodings
:
1678 # XXX: str.encode() should return bytes
1679 data
= bytes(''.join(input_lines
).encode(encoding
))
1680 for do_reads
in (False, True):
1681 for bufsize
in range(1, 10):
1682 for newline
, exp_lines
in tests
:
1683 bufio
= self
.BufferedReader(self
.BytesIO(data
), bufsize
)
1684 textio
= self
.TextIOWrapper(bufio
, newline
=newline
,
1692 self
.assertEquals(len(c2
), 2)
1693 got_lines
.append(c2
+ textio
.readline())
1695 got_lines
= list(textio
)
1697 for got_line
, exp_line
in zip(got_lines
, exp_lines
):
1698 self
.assertEquals(got_line
, exp_line
)
1699 self
.assertEquals(len(got_lines
), len(exp_lines
))
1701 def test_newlines_input(self
):
1702 testdata
= b
"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
1703 normalized
= testdata
.replace(b
"\r\n", b
"\n").replace(b
"\r", b
"\n")
1704 for newline
, expected
in [
1705 (None, normalized
.decode("ascii").splitlines(True)),
1706 ("", testdata
.decode("ascii").splitlines(True)),
1707 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1708 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1709 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
1711 buf
= self
.BytesIO(testdata
)
1712 txt
= self
.TextIOWrapper(buf
, encoding
="ascii", newline
=newline
)
1713 self
.assertEquals(txt
.readlines(), expected
)
1715 self
.assertEquals(txt
.read(), "".join(expected
))
1717 def test_newlines_output(self
):
1719 "": b
"AAA\nBBB\nCCC\nX\rY\r\nZ",
1720 "\n": b
"AAA\nBBB\nCCC\nX\rY\r\nZ",
1721 "\r": b
"AAA\rBBB\rCCC\rX\rY\r\rZ",
1722 "\r\n": b
"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1724 tests
= [(None, testdict
[os
.linesep
])] + sorted(testdict
.items())
1725 for newline
, expected
in tests
:
1726 buf
= self
.BytesIO()
1727 txt
= self
.TextIOWrapper(buf
, encoding
="ascii", newline
=newline
)
1729 txt
.write("BB\nCCC\n")
1730 txt
.write("X\rY\r\nZ")
1732 self
.assertEquals(buf
.closed
, False)
1733 self
.assertEquals(buf
.getvalue(), expected
)
1735 def test_destructor(self
):
1738 class MyBytesIO(base
):
1740 l
.append(self
.getvalue())
1743 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1746 support
.gc_collect()
1747 self
.assertEquals([b
"abc"], l
)
1749 def test_override_destructor(self
):
1751 class MyTextIO(self
.TextIOWrapper
):
1755 f
= super(MyTextIO
, self
).__del
__
1756 except AttributeError:
1762 super(MyTextIO
, self
).close()
1765 super(MyTextIO
, self
).flush()
1767 t
= MyTextIO(b
, encoding
="ascii")
1769 support
.gc_collect()
1770 self
.assertEqual(record
, [1, 2, 3])
1772 def test_error_through_destructor(self
):
1773 # Test that the exception state is not modified by a destructor,
1774 # even if close() fails.
1775 rawio
= self
.CloseFailureIO()
1777 self
.TextIOWrapper(rawio
).xyzzy
1778 with support
.captured_output("stderr") as s
:
1779 self
.assertRaises(AttributeError, f
)
1780 s
= s
.getvalue().strip()
1782 # The destructor *may* have printed an unraisable error, check it
1783 self
.assertEqual(len(s
.splitlines()), 1)
1784 self
.assertTrue(s
.startswith("Exception IOError: "), s
)
1785 self
.assertTrue(s
.endswith(" ignored"), s
)
1787 # Systematic tests of the text I/O API
1789 def test_basic_io(self
):
1790 for chunksize
in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1791 for enc
in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
1792 f
= self
.open(support
.TESTFN
, "w+", encoding
=enc
)
1793 f
._CHUNK
_SIZE
= chunksize
1794 self
.assertEquals(f
.write("abc"), 3)
1796 f
= self
.open(support
.TESTFN
, "r+", encoding
=enc
)
1797 f
._CHUNK
_SIZE
= chunksize
1798 self
.assertEquals(f
.tell(), 0)
1799 self
.assertEquals(f
.read(), "abc")
1801 self
.assertEquals(f
.seek(0), 0)
1802 self
.assertEquals(f
.read(None), "abc")
1804 self
.assertEquals(f
.read(2), "ab")
1805 self
.assertEquals(f
.read(1), "c")
1806 self
.assertEquals(f
.read(1), "")
1807 self
.assertEquals(f
.read(), "")
1808 self
.assertEquals(f
.tell(), cookie
)
1809 self
.assertEquals(f
.seek(0), 0)
1810 self
.assertEquals(f
.seek(0, 2), cookie
)
1811 self
.assertEquals(f
.write("def"), 3)
1812 self
.assertEquals(f
.seek(cookie
), cookie
)
1813 self
.assertEquals(f
.read(), "def")
1814 if enc
.startswith("utf"):
1815 self
.multi_line_test(f
, enc
)
1818 def multi_line_test(self
, f
, enc
):
1821 sample
= "s\xff\u0fff\uffff"
1823 for size
in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1825 for i
in range(size
):
1826 chars
.append(sample
[i
% len(sample
)])
1827 line
= "".join(chars
) + "\n"
1828 wlines
.append((f
.tell(), line
))
1837 rlines
.append((pos
, line
))
1838 self
.assertEquals(rlines
, wlines
)
1840 def test_telling(self
):
1841 f
= self
.open(support
.TESTFN
, "w+", encoding
="utf8")
1848 self
.assertEquals(f
.tell(), p0
)
1849 self
.assertEquals(f
.readline(), "\xff\n")
1850 self
.assertEquals(f
.tell(), p1
)
1851 self
.assertEquals(f
.readline(), "\xff\n")
1852 self
.assertEquals(f
.tell(), p2
)
1855 self
.assertEquals(line
, "\xff\n")
1856 self
.assertRaises(IOError, f
.tell
)
1857 self
.assertEquals(f
.tell(), p2
)
1860 def test_seeking(self
):
1861 chunk_size
= _default_chunk_size()
1862 prefix_size
= chunk_size
- 2
1863 u_prefix
= "a" * prefix_size
1864 prefix
= bytes(u_prefix
.encode("utf-8"))
1865 self
.assertEquals(len(u_prefix
), len(prefix
))
1866 u_suffix
= "\u8888\n"
1867 suffix
= bytes(u_suffix
.encode("utf-8"))
1868 line
= prefix
+ suffix
1869 f
= self
.open(support
.TESTFN
, "wb")
1872 f
= self
.open(support
.TESTFN
, "r", encoding
="utf-8")
1873 s
= f
.read(prefix_size
)
1874 self
.assertEquals(s
, prefix
.decode("ascii"))
1875 self
.assertEquals(f
.tell(), prefix_size
)
1876 self
.assertEquals(f
.readline(), u_suffix
)
1878 def test_seeking_too(self
):
1879 # Regression test for a specific bug
1880 data
= b
'\xe0\xbf\xbf\n'
1881 f
= self
.open(support
.TESTFN
, "wb")
1884 f
= self
.open(support
.TESTFN
, "r", encoding
="utf-8")
1885 f
._CHUNK
_SIZE
# Just test that it exists
1890 def test_seek_and_tell(self
):
1891 #Test seek/tell using the StatefulIncrementalDecoder.
1892 # Make test faster by doing smaller seeks
1895 def test_seek_and_tell_with_data(data
, min_pos
=0):
1896 """Tell/seek to various points within a data stream and ensure
1897 that the decoded data returned by read() is consistent."""
1898 f
= self
.open(support
.TESTFN
, 'wb')
1901 f
= self
.open(support
.TESTFN
, encoding
='test_decoder')
1902 f
._CHUNK
_SIZE
= CHUNK_SIZE
1906 for i
in range(min_pos
, len(decoded
) + 1): # seek positions
1907 for j
in [1, 5, len(decoded
) - i
]: # read lengths
1908 f
= self
.open(support
.TESTFN
, encoding
='test_decoder')
1909 self
.assertEquals(f
.read(i
), decoded
[:i
])
1911 self
.assertEquals(f
.read(j
), decoded
[i
:i
+ j
])
1913 self
.assertEquals(f
.read(), decoded
[i
:])
1916 # Enable the test decoder.
1917 StatefulIncrementalDecoder
.codecEnabled
= 1
1921 # Try each test case.
1922 for input, _
, _
in StatefulIncrementalDecoderTest
.test_cases
:
1923 test_seek_and_tell_with_data(input)
1925 # Position each test case so that it crosses a chunk boundary.
1926 for input, _
, _
in StatefulIncrementalDecoderTest
.test_cases
:
1927 offset
= CHUNK_SIZE
- len(input)//2
1928 prefix
= b
'.'*offset
1929 # Don't bother seeking into the prefix (takes too long).
1931 test_seek_and_tell_with_data(prefix
+ input, min_pos
)
1933 # Ensure our test decoder won't interfere with subsequent tests.
1935 StatefulIncrementalDecoder
.codecEnabled
= 0
1937 def test_encoded_writes(self
):
1945 for encoding
in tests
:
1946 buf
= self
.BytesIO()
1947 f
= self
.TextIOWrapper(buf
, encoding
=encoding
)
1948 # Check if the BOM is written only once (see issue1753).
1952 self
.assertEquals(f
.read(), data
* 2)
1954 self
.assertEquals(f
.read(), data
* 2)
1955 self
.assertEquals(buf
.getvalue(), (data
* 2).encode(encoding
))
1957 def test_unreadable(self
):
1958 class UnReadable(self
.BytesIO
):
1961 txt
= self
.TextIOWrapper(UnReadable())
1962 self
.assertRaises(IOError, txt
.read
)
1964 def test_read_one_by_one(self
):
1965 txt
= self
.TextIOWrapper(self
.BytesIO(b
"AA\r\nBB"))
1972 self
.assertEquals(reads
, "AA\nBB")
1974 def test_readlines(self
):
1975 txt
= self
.TextIOWrapper(self
.BytesIO(b
"AA\nBB\nCC"))
1976 self
.assertEqual(txt
.readlines(), ["AA\n", "BB\n", "CC"])
1978 self
.assertEqual(txt
.readlines(None), ["AA\n", "BB\n", "CC"])
1980 self
.assertEqual(txt
.readlines(5), ["AA\n", "BB\n"])
1982 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1983 def test_read_by_chunk(self
):
1984 # make sure "\r\n" straddles 128 char boundary.
1985 txt
= self
.TextIOWrapper(self
.BytesIO(b
"A" * 127 + b
"\r\nB"))
1992 self
.assertEquals(reads
, "A"*127+"\nB")
1994 def test_issue1395_1(self
):
1995 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
1997 # read one char at a time
2004 self
.assertEquals(reads
, self
.normalized
)
2006 def test_issue1395_2(self
):
2007 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2016 self
.assertEquals(reads
, self
.normalized
)
2018 def test_issue1395_3(self
):
2019 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2023 reads
+= txt
.read(4)
2024 reads
+= txt
.readline()
2025 reads
+= txt
.readline()
2026 reads
+= txt
.readline()
2027 self
.assertEquals(reads
, self
.normalized
)
2029 def test_issue1395_4(self
):
2030 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2035 self
.assertEquals(reads
, self
.normalized
)
2037 def test_issue1395_5(self
):
2038 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2045 self
.assertEquals(txt
.read(4), "BBB\n")
2047 def test_issue2282(self
):
2048 buffer = self
.BytesIO(self
.testdata
)
2049 txt
= self
.TextIOWrapper(buffer, encoding
="ascii")
2051 self
.assertEqual(buffer.seekable(), txt
.seekable())
2053 @unittest.skip("Issue #6213 with incremental encoders")
2054 def test_append_bom(self
):
2055 # The BOM is not written again when appending to a non-empty file
2056 filename
= support
.TESTFN
2057 for charset
in ('utf-8-sig', 'utf-16', 'utf-32'):
2058 with self
.open(filename
, 'w', encoding
=charset
) as f
:
2061 with self
.open(filename
, 'rb') as f
:
2062 self
.assertEquals(f
.read(), 'aaa'.encode(charset
))
2064 with self
.open(filename
, 'a', encoding
=charset
) as f
:
2066 with self
.open(filename
, 'rb') as f
:
2067 self
.assertEquals(f
.read(), 'aaaxxx'.encode(charset
))
2069 @unittest.skip("Issue #6213 with incremental encoders")
2070 def test_seek_bom(self
):
2071 # Same test, but when seeking manually
2072 filename
= support
.TESTFN
2073 for charset
in ('utf-8-sig', 'utf-16', 'utf-32'):
2074 with self
.open(filename
, 'w', encoding
=charset
) as f
:
2077 with self
.open(filename
, 'r+', encoding
=charset
) as f
:
2082 with self
.open(filename
, 'rb') as f
:
2083 self
.assertEquals(f
.read(), 'bbbzzz'.encode(charset
))
def test_errors_property(self):
    # The errors attribute must read "strict" when open() is given no
    # errors argument, and otherwise echo the value that was passed.
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for open_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **open_kwargs) as f:
            self.assertEqual(f.errors, expected)
2092 def test_threads_write(self
):
2093 # Issue6750: concurrent writes could duplicate data
2094 event
= threading
.Event()
2095 with self
.open(support
.TESTFN
, "w", buffering
=1) as f
:
2097 text
= "Thread%03d\n" % n
2100 threads
= [threading
.Thread(target
=lambda n
=x
: run(n
))
2108 with self
.open(support
.TESTFN
) as f
:
2111 self
.assertEquals(content
.count("Thread%03d\n" % n
), 1)
2113 class CTextIOWrapperTest(TextIOWrapperTest
):
def test_initialization(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    wrapper = self.TextIOWrapper(buffered)
    # Each rejected re-initialisation must leave the wrapper in a state
    # where read() raises ValueError.
    rejected = ((TypeError, 42), (ValueError, 'xyzzy'))
    for init_error, bad_newline in rejected:
        self.assertRaises(init_error, wrapper.__init__, buffered,
                          newline=bad_newline)
        self.assertRaises(ValueError, wrapper.read)
2124 def test_garbage_collection(self
):
2125 # C TextIOWrapper objects are collected, and collecting them flushes
2127 # The Python version has __del__, so it ends in gc.garbage instead.
2128 rawio
= io
.FileIO(support
.TESTFN
, "wb")
2129 b
= self
.BufferedWriter(rawio
)
2130 t
= self
.TextIOWrapper(b
, encoding
="ascii")
2135 support
.gc_collect()
2136 self
.assertTrue(wr() is None, wr
)
2137 with self
.open(support
.TESTFN
, "rb") as f
:
2138 self
.assertEqual(f
.read(), b
"456def")
2140 class PyTextIOWrapperTest(TextIOWrapperTest
):
2144 class IncrementalNewlineDecoderTest(unittest
.TestCase
):
2146 def check_newline_decoding_utf8(self
, decoder
):
2147 # UTF-8 specific tests for a newline decoder
2148 def _check_decode(b
, s
, **kwargs
):
2149 # We exercise getstate() / setstate() as well as decode()
2150 state
= decoder
.getstate()
2151 self
.assertEquals(decoder
.decode(b
, **kwargs
), s
)
2152 decoder
.setstate(state
)
2153 self
.assertEquals(decoder
.decode(b
, **kwargs
), s
)
2155 _check_decode(b
'\xe8\xa2\x88', "\u8888")
2157 _check_decode(b
'\xe8', "")
2158 _check_decode(b
'\xa2', "")
2159 _check_decode(b
'\x88', "\u8888")
2161 _check_decode(b
'\xe8', "")
2162 _check_decode(b
'\xa2', "")
2163 _check_decode(b
'\x88', "\u8888")
2165 _check_decode(b
'\xe8', "")
2166 self
.assertRaises(UnicodeDecodeError, decoder
.decode
, b
'', final
=True)
2169 _check_decode(b
'\n', "\n")
2170 _check_decode(b
'\r', "")
2171 _check_decode(b
'', "\n", final
=True)
2172 _check_decode(b
'\r', "\n", final
=True)
2174 _check_decode(b
'\r', "")
2175 _check_decode(b
'a', "\na")
2177 _check_decode(b
'\r\r\n', "\n\n")
2178 _check_decode(b
'\r', "")
2179 _check_decode(b
'\r', "\n")
2180 _check_decode(b
'\na', "\na")
2182 _check_decode(b
'\xe8\xa2\x88\r\n', "\u8888\n")
2183 _check_decode(b
'\xe8\xa2\x88', "\u8888")
2184 _check_decode(b
'\n', "\n")
2185 _check_decode(b
'\xe8\xa2\x88\r', "\u8888")
2186 _check_decode(b
'\n', "\n")
2188 def check_newline_decoding(self
, decoder
, encoding
):
2190 if encoding
is not None:
2191 encoder
= codecs
.getincrementalencoder(encoding
)()
2192 def _decode_bytewise(s
):
2193 # Decode one byte at a time
2194 for b
in encoder
.encode(s
):
2195 result
.append(decoder
.decode(b
))
2198 def _decode_bytewise(s
):
2199 # Decode one char at a time
2201 result
.append(decoder
.decode(c
))
2202 self
.assertEquals(decoder
.newlines
, None)
2203 _decode_bytewise("abc\n\r")
2204 self
.assertEquals(decoder
.newlines
, '\n')
2205 _decode_bytewise("\nabc")
2206 self
.assertEquals(decoder
.newlines
, ('\n', '\r\n'))
2207 _decode_bytewise("abc\r")
2208 self
.assertEquals(decoder
.newlines
, ('\n', '\r\n'))
2209 _decode_bytewise("abc")
2210 self
.assertEquals(decoder
.newlines
, ('\r', '\n', '\r\n'))
2211 _decode_bytewise("abc\r")
2212 self
.assertEquals("".join(result
), "abc\n\nabcabc\nabcabc")
2215 if encoder
is not None:
2217 input = encoder
.encode(input)
2218 self
.assertEquals(decoder
.decode(input), "abc")
2219 self
.assertEquals(decoder
.newlines
, None)
2221 def test_newline_decoder(self
):
2223 # None meaning the IncrementalNewlineDecoder takes unicode input
2224 # rather than bytes input
2225 None, 'utf-8', 'latin-1',
2226 'utf-16', 'utf-16-le', 'utf-16-be',
2227 'utf-32', 'utf-32-le', 'utf-32-be',
2229 for enc
in encodings
:
2230 decoder
= enc
and codecs
.getincrementaldecoder(enc
)()
2231 decoder
= self
.IncrementalNewlineDecoder(decoder
, translate
=True)
2232 self
.check_newline_decoding(decoder
, enc
)
2233 decoder
= codecs
.getincrementaldecoder("utf-8")()
2234 decoder
= self
.IncrementalNewlineDecoder(decoder
, translate
=True)
2235 self
.check_newline_decoding_utf8(decoder
)
2237 def test_newline_bytes(self
):
2238 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2240 self
.assertEquals(dec
.newlines
, None)
2241 self
.assertEquals(dec
.decode("\u0D00"), "\u0D00")
2242 self
.assertEquals(dec
.newlines
, None)
2243 self
.assertEquals(dec
.decode("\u0A00"), "\u0A00")
2244 self
.assertEquals(dec
.newlines
, None)
2245 dec
= self
.IncrementalNewlineDecoder(None, translate
=False)
2247 dec
= self
.IncrementalNewlineDecoder(None, translate
=True)
2250 class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest
):
2253 class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest
):
2257 # XXX Tests for open()
2259 class MiscIOTest(unittest
.TestCase
):
2262 support
.unlink(support
.TESTFN
)
2264 def test___all__(self
):
2265 for name
in self
.io
.__all
__:
2266 obj
= getattr(self
.io
, name
, None)
2267 self
.assertTrue(obj
is not None, name
)
2270 elif "error" in name
.lower() or name
== "UnsupportedOperation":
2271 self
.assertTrue(issubclass(obj
, Exception), name
)
2272 elif not name
.startswith("SEEK_"):
2273 self
.assertTrue(issubclass(obj
, self
.IOBase
))
def test_attributes(self):
    # Check the `mode` and `name` attributes exposed by every layer of the
    # objects returned by open().  Each file is opened in a `with` block so
    # it is closed even when one of the assertions fails.

    # Unbuffered binary: open() hands back the raw object itself.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    # Text mode with universal newlines: text -> buffer -> raw.
    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second object over the same descriptor reports the descriptor
        # number as its name.  closefd=False leaves the descriptor to `f`.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
2302 def test_io_after_close(self
):
2306 {"mode": "w", "buffering": 1},
2307 {"mode": "w", "buffering": 2},
2308 {"mode": "wb", "buffering": 0},
2311 {"mode": "r", "buffering": 1},
2312 {"mode": "r", "buffering": 2},
2313 {"mode": "rb", "buffering": 0},
2316 {"mode": "w+", "buffering": 1},
2317 {"mode": "w+", "buffering": 2},
2318 {"mode": "w+b", "buffering": 0},
2320 f
= self
.open(support
.TESTFN
, **kwargs
)
2322 self
.assertRaises(ValueError, f
.flush
)
2323 self
.assertRaises(ValueError, f
.fileno
)
2324 self
.assertRaises(ValueError, f
.isatty
)
2325 self
.assertRaises(ValueError, f
.__iter
__)
2326 if hasattr(f
, "peek"):
2327 self
.assertRaises(ValueError, f
.peek
, 1)
2328 self
.assertRaises(ValueError, f
.read
)
2329 if hasattr(f
, "read1"):
2330 self
.assertRaises(ValueError, f
.read1
, 1024)
2331 if hasattr(f
, "readinto"):
2332 self
.assertRaises(ValueError, f
.readinto
, bytearray(1024))
2333 self
.assertRaises(ValueError, f
.readline
)
2334 self
.assertRaises(ValueError, f
.readlines
)
2335 self
.assertRaises(ValueError, f
.seek
, 0)
2336 self
.assertRaises(ValueError, f
.tell
)
2337 self
.assertRaises(ValueError, f
.truncate
)
2338 self
.assertRaises(ValueError, f
.write
,
2339 b
"" if "b" in kwargs
['mode'] else "")
2340 self
.assertRaises(ValueError, f
.writelines
, [])
2341 self
.assertRaises(ValueError, next
, f
)
2343 def test_blockingioerror(self
):
2344 # Various BlockingIOError issues
2345 self
.assertRaises(TypeError, self
.BlockingIOError
)
2346 self
.assertRaises(TypeError, self
.BlockingIOError
, 1)
2347 self
.assertRaises(TypeError, self
.BlockingIOError
, 1, 2, 3, 4)
2348 self
.assertRaises(TypeError, self
.BlockingIOError
, 1, "", None)
2349 b
= self
.BlockingIOError(1, "")
2350 self
.assertEqual(b
.characters_written
, 0)
2354 b
= self
.BlockingIOError(1, c
)
2359 support
.gc_collect()
2360 self
.assertTrue(wr() is None, wr
)
2362 def test_abcs(self
):
2363 # Test the visible base classes are ABCs.
2364 self
.assertTrue(isinstance(self
.IOBase
, abc
.ABCMeta
))
2365 self
.assertTrue(isinstance(self
.RawIOBase
, abc
.ABCMeta
))
2366 self
.assertTrue(isinstance(self
.BufferedIOBase
, abc
.ABCMeta
))
2367 self
.assertTrue(isinstance(self
.TextIOBase
, abc
.ABCMeta
))
def _check_abc_inheritance(self, abcmodule):
    # Open the scratch file as an unbuffered binary, a buffered binary and
    # a text stream in turn.  Each object must be an instance of
    # abcmodule.IOBase and of exactly one of the three layer ABCs looked up
    # on `abcmodule`.
    #
    # assertIsInstance / assertNotIsInstance are used so that a failure
    # reports the object and the class involved.
    layer_abcs = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    scenarios = (
        # (open() keyword arguments, the one layer ABC that must match)
        ({"mode": "wb", "buffering": 0}, "RawIOBase"),
        ({"mode": "wb"}, "BufferedIOBase"),
        ({"mode": "w"}, "TextIOBase"),
    )
    for open_kwargs, expected_layer in scenarios:
        with self.open(support.TESTFN, **open_kwargs) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer in layer_abcs:
                layer_cls = getattr(abcmodule, layer)
                if layer == expected_layer:
                    self.assertIsInstance(f, layer_cls)
                else:
                    self.assertNotIsInstance(f, layer_cls)
2386 def test_abc_inheritance(self
):
2387 # Test implementations inherit from their respective ABCs
2388 self
._check
_abc
_inheritance
(self
)
2390 def test_abc_inheritance_official(self
):
2391 # Test implementations inherit from the official ABCs of the
2392 # baseline "io" module.
2393 self
._check
_abc
_inheritance
(io
)
2395 class CMiscIOTest(MiscIOTest
):
2398 class PyMiscIOTest(MiscIOTest
):
2402 tests
= (CIOTest
, PyIOTest
,
2403 CBufferedReaderTest
, PyBufferedReaderTest
,
2404 CBufferedWriterTest
, PyBufferedWriterTest
,
2405 CBufferedRWPairTest
, PyBufferedRWPairTest
,
2406 CBufferedRandomTest
, PyBufferedRandomTest
,
2407 StatefulIncrementalDecoderTest
,
2408 CIncrementalNewlineDecoderTest
, PyIncrementalNewlineDecoderTest
,
2409 CTextIOWrapperTest
, PyTextIOWrapperTest
,
2410 CMiscIOTest
, PyMiscIOTest
,
2413 # Put the namespaces of the IO module we are testing and some useful mock
2414 # classes in the __dict__ of each test.
2415 mocks
= (MockRawIO
, MisbehavedRawIO
, MockFileIO
, CloseFailureIO
,
2416 MockNonBlockWriterIO
)
2417 all_members
= io
.__all
__ + ["IncrementalNewlineDecoder"]
2418 c_io_ns
= dict((name
, getattr(io
, name
)) for name
in all_members
)
2419 py_io_ns
= dict((name
, getattr(pyio
, name
)) for name
in all_members
)
2421 c_io_ns
.update((x
.__name
__, globs
["C" + x
.__name
__]) for x
in mocks
)
2422 py_io_ns
.update((x
.__name
__, globs
["Py" + x
.__name
__]) for x
in mocks
)
2423 # Avoid turning open into a bound method.
2424 py_io_ns
["open"] = pyio
.OpenWrapper
2426 if test
.__name
__.startswith("C"):
2427 for name
, obj
in c_io_ns
.items():
2428 setattr(test
, name
, obj
)
2429 elif test
.__name
__.startswith("Py"):
2430 for name
, obj
in py_io_ns
.items():
2431 setattr(test
, name
, obj
)
2433 support
.run_unittest(*tests
)
2435 if __name__
== "__main__":