1 """Unit tests for the io module."""
3 # Tests of io are scattered over the test suite:
4 # * test_bufio - tests file buffering
5 # * test_memoryio - tests BytesIO and StringIO
6 # * test_fileio - tests FileIO
7 # * test_file - tests the file interface
8 # * test_io - tests everything else in the io module
9 # * test_univnewlines - tests universal newline support
10 # * test_largefile - tests operations on a file greater than 2**32 bytes
11 # (only enabled with -ulargefile)
13 ################################################################################
14 # ATTENTION TEST WRITERS!!!
15 ################################################################################
16 # When writing tests for io, it's important to test both the C and Python
17 # implementations. This is usually done by writing a base test that refers to
18 # the type it is testing as an attribute. Then it provides custom subclasses to
19 # test both implementations. This file has lots of examples.
20 ################################################################################
22 from __future__
import print_function
23 from __future__
import unicode_literals
33 from itertools
import cycle
, count
34 from collections
import deque
35 from test
import test_support
as support
38 import io
# C implementation of io
39 import _pyio
as pyio
# Python implementation of io
46 bytes
= support
.py3k_bytes
48 def _default_chunk_size():
49 """Get the default TextIOWrapper chunk size"""
50 with io
.open(__file__
, "r", encoding
="latin1") as f
:
56 def __init__(self
, read_stack
=()):
57 self
._read
_stack
= list(read_stack
)
58 self
._write
_stack
= []
61 def read(self
, n
=None):
64 return self
._read
_stack
.pop(0)
69 self
._write
_stack
.append(bytes(b
))
84 def seek(self
, pos
, whence
):
85 return 0 # wrong but we gotta return something
88 return 0 # same comment as above
90 def readinto(self
, buf
):
94 data
= self
._read
_stack
[0]
98 del self
._read
_stack
[0]
101 if len(data
) <= max_len
:
102 del self
._read
_stack
[0]
106 buf
[:] = data
[:max_len
]
107 self
._read
_stack
[0] = data
[max_len
:]
110 def truncate(self
, pos
=None):
113 class CMockRawIO(MockRawIO
, io
.RawIOBase
):
116 class PyMockRawIO(MockRawIO
, pyio
.RawIOBase
):
120 class MisbehavedRawIO(MockRawIO
):
122 return MockRawIO
.write(self
, b
) * 2
124 def read(self
, n
=None):
125 return MockRawIO
.read(self
, n
) * 2
127 def seek(self
, pos
, whence
):
133 def readinto(self
, buf
):
134 MockRawIO
.readinto(self
, buf
)
137 class CMisbehavedRawIO(MisbehavedRawIO
, io
.RawIOBase
):
140 class PyMisbehavedRawIO(MisbehavedRawIO
, pyio
.RawIOBase
):
144 class CloseFailureIO(MockRawIO
):
152 class CCloseFailureIO(CloseFailureIO
, io
.RawIOBase
):
155 class PyCloseFailureIO(CloseFailureIO
, pyio
.RawIOBase
):
161 def __init__(self
, data
):
162 self
.read_history
= []
163 super(MockFileIO
, self
).__init
__(data
)
165 def read(self
, n
=None):
166 res
= super(MockFileIO
, self
).read(n
)
167 self
.read_history
.append(None if res
is None else len(res
))
170 def readinto(self
, b
):
171 res
= super(MockFileIO
, self
).readinto(b
)
172 self
.read_history
.append(res
)
175 class CMockFileIO(MockFileIO
, io
.BytesIO
):
178 class PyMockFileIO(MockFileIO
, pyio
.BytesIO
):
182 class MockNonBlockWriterIO
:
185 self
._write
_stack
= []
186 self
._blocker
_char
= None
188 def pop_written(self
):
189 s
= b
"".join(self
._write
_stack
)
190 self
._write
_stack
[:] = []
193 def block_on(self
, char
):
194 """Block when a given char is encountered."""
195 self
._blocker
_char
= char
209 if self
._blocker
_char
:
211 n
= b
.index(self
._blocker
_char
)
215 self
._blocker
_char
= None
216 self
._write
_stack
.append(b
[:n
])
217 raise self
.BlockingIOError(0, "test blocking", n
)
218 self
._write
_stack
.append(b
)
221 class CMockNonBlockWriterIO(MockNonBlockWriterIO
, io
.RawIOBase
):
222 BlockingIOError
= io
.BlockingIOError
224 class PyMockNonBlockWriterIO(MockNonBlockWriterIO
, pyio
.RawIOBase
):
225 BlockingIOError
= pyio
.BlockingIOError
228 class IOTest(unittest
.TestCase
):
231 support
.unlink(support
.TESTFN
)
234 support
.unlink(support
.TESTFN
)
236 def write_ops(self
, f
):
237 self
.assertEqual(f
.write(b
"blah."), 5)
239 self
.assertEqual(f
.tell(), 5)
242 self
.assertEqual(f
.write(b
"blah."), 5)
243 self
.assertEqual(f
.seek(0), 0)
244 self
.assertEqual(f
.write(b
"Hello."), 6)
245 self
.assertEqual(f
.tell(), 6)
246 self
.assertEqual(f
.seek(-1, 1), 5)
247 self
.assertEqual(f
.tell(), 5)
248 self
.assertEqual(f
.write(bytearray(b
" world\n\n\n")), 9)
249 self
.assertEqual(f
.seek(0), 0)
250 self
.assertEqual(f
.write(b
"h"), 1)
251 self
.assertEqual(f
.seek(-1, 2), 13)
252 self
.assertEqual(f
.tell(), 13)
254 self
.assertEqual(f
.truncate(12), 12)
255 self
.assertEqual(f
.tell(), 13)
256 self
.assertRaises(TypeError, f
.seek
, 0.0)
258 def read_ops(self
, f
, buffered
=False):
260 self
.assertEqual(data
, b
"hello")
261 data
= bytearray(data
)
262 self
.assertEqual(f
.readinto(data
), 5)
263 self
.assertEqual(data
, b
" worl")
264 self
.assertEqual(f
.readinto(data
), 2)
265 self
.assertEqual(len(data
), 5)
266 self
.assertEqual(data
[:2], b
"d\n")
267 self
.assertEqual(f
.seek(0), 0)
268 self
.assertEqual(f
.read(20), b
"hello world\n")
269 self
.assertEqual(f
.read(1), b
"")
270 self
.assertEqual(f
.readinto(bytearray(b
"x")), 0)
271 self
.assertEqual(f
.seek(-6, 2), 6)
272 self
.assertEqual(f
.read(5), b
"world")
273 self
.assertEqual(f
.read(0), b
"")
274 self
.assertEqual(f
.readinto(bytearray()), 0)
275 self
.assertEqual(f
.seek(-6, 1), 5)
276 self
.assertEqual(f
.read(5), b
" worl")
277 self
.assertEqual(f
.tell(), 10)
278 self
.assertRaises(TypeError, f
.seek
, 0.0)
281 self
.assertEqual(f
.read(), b
"hello world\n")
283 self
.assertEqual(f
.read(), b
"world\n")
284 self
.assertEqual(f
.read(), b
"")
288 def large_file_ops(self
, f
):
291 self
.assertEqual(f
.seek(self
.LARGE
), self
.LARGE
)
292 self
.assertEqual(f
.tell(), self
.LARGE
)
293 self
.assertEqual(f
.write(b
"xxx"), 3)
294 self
.assertEqual(f
.tell(), self
.LARGE
+ 3)
295 self
.assertEqual(f
.seek(-1, 1), self
.LARGE
+ 2)
296 self
.assertEqual(f
.truncate(), self
.LARGE
+ 2)
297 self
.assertEqual(f
.tell(), self
.LARGE
+ 2)
298 self
.assertEqual(f
.seek(0, 2), self
.LARGE
+ 2)
299 self
.assertEqual(f
.truncate(self
.LARGE
+ 1), self
.LARGE
+ 1)
300 self
.assertEqual(f
.tell(), self
.LARGE
+ 2)
301 self
.assertEqual(f
.seek(0, 2), self
.LARGE
+ 1)
302 self
.assertEqual(f
.seek(-1, 2), self
.LARGE
)
303 self
.assertEqual(f
.read(2), b
"x")
def test_invalid_operations(self):
    # Try writing on a file opened in read mode and vice-versa.
    # Write-only streams (text and binary) must refuse both read calls.
    for write_mode in ("w", "wb"):
        with self.open(support.TESTFN, write_mode) as stream:
            for read_call in (stream.read, stream.readline):
                self.assertRaises(IOError, read_call)
    # Read-only streams must refuse both write calls; the payload type
    # matches the mode (bytes for "rb", text for "r").
    read_only_cases = (
        ("rb", b"blah", [b"blah\n"]),
        ("r", "blah", ["blah\n"]),
    )
    for read_mode, chunk, lines in read_only_cases:
        with self.open(support.TESTFN, read_mode) as stream:
            self.assertRaises(IOError, stream.write, chunk)
            self.assertRaises(IOError, stream.writelines, lines)
318 def test_raw_file_io(self
):
319 with self
.open(support
.TESTFN
, "wb", buffering
=0) as f
:
320 self
.assertEqual(f
.readable(), False)
321 self
.assertEqual(f
.writable(), True)
322 self
.assertEqual(f
.seekable(), True)
324 with self
.open(support
.TESTFN
, "rb", buffering
=0) as f
:
325 self
.assertEqual(f
.readable(), True)
326 self
.assertEqual(f
.writable(), False)
327 self
.assertEqual(f
.seekable(), True)
330 def test_buffered_file_io(self
):
331 with self
.open(support
.TESTFN
, "wb") as f
:
332 self
.assertEqual(f
.readable(), False)
333 self
.assertEqual(f
.writable(), True)
334 self
.assertEqual(f
.seekable(), True)
336 with self
.open(support
.TESTFN
, "rb") as f
:
337 self
.assertEqual(f
.readable(), True)
338 self
.assertEqual(f
.writable(), False)
339 self
.assertEqual(f
.seekable(), True)
340 self
.read_ops(f
, True)
def test_readline(self):
    # Fixture: four newline-terminated lines (one with an embedded NUL)
    # followed by a final line without a trailing newline.
    payload = b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line"
    with self.open(support.TESTFN, "wb") as sink:
        sink.write(payload)
    # (positional arguments for readline, expected result), in file order.
    expectations = (
        ((), b"abc\n"),
        ((10,), b"def\n"),
        ((2,), b"xy"),
        ((4,), b"zzy\n"),
        ((), b"foo\x00bar\n"),
        ((None,), b"another line"),
    )
    with self.open(support.TESTFN, "rb") as source:
        for call_args, expected in expectations:
            self.assertEqual(source.readline(*call_args), expected)
        # A float limit is rejected in binary mode...
        self.assertRaises(TypeError, source.readline, 5.3)
    with self.open(support.TESTFN, "r") as source:
        # ...and in text mode.
        self.assertRaises(TypeError, source.readline, 5.3)
356 def test_raw_bytes_io(self
):
360 self
.assertEqual(data
, b
"hello world\n")
361 f
= self
.BytesIO(data
)
362 self
.read_ops(f
, True)
364 def test_large_file_ops(self
):
365 # On Windows and Mac OSX this test consumes large resources; it takes
366 # a long time to build the >2GB file and takes >2GB of disk space
367 # therefore the resource must be enabled to run this test.
368 if sys
.platform
[:3] == 'win' or sys
.platform
== 'darwin':
369 if not support
.is_resource_enabled("largefile"):
370 print("\nTesting large file ops skipped on %s." % sys
.platform
,
372 print("It requires %d bytes and a long time." % self
.LARGE
,
374 print("Use 'regrtest.py -u largefile test_io' to run it.",
377 with self
.open(support
.TESTFN
, "w+b", 0) as f
:
378 self
.large_file_ops(f
)
379 with self
.open(support
.TESTFN
, "w+b") as f
:
380 self
.large_file_ops(f
)
382 def test_with_open(self
):
383 for bufsize
in (0, 1, 100):
385 with self
.open(support
.TESTFN
, "wb", bufsize
) as f
:
387 self
.assertEqual(f
.closed
, True)
390 with self
.open(support
.TESTFN
, "wb", bufsize
) as f
:
392 except ZeroDivisionError:
393 self
.assertEqual(f
.closed
, True)
395 self
.fail("1 // 0 didn't raise an exception")
398 def test_append_mode_tell(self
):
399 with self
.open(support
.TESTFN
, "wb") as f
:
401 with self
.open(support
.TESTFN
, "ab", buffering
=0) as f
:
402 self
.assertEqual(f
.tell(), 3)
403 with self
.open(support
.TESTFN
, "ab") as f
:
404 self
.assertEqual(f
.tell(), 3)
405 with self
.open(support
.TESTFN
, "a") as f
:
406 self
.assertTrue(f
.tell() > 0)
408 def test_destructor(self
):
410 class MyFileIO(self
.FileIO
):
414 f
= super(MyFileIO
, self
).__del
__
415 except AttributeError:
421 super(MyFileIO
, self
).close()
424 super(MyFileIO
, self
).flush()
425 f
= MyFileIO(support
.TESTFN
, "wb")
429 self
.assertEqual(record
, [1, 2, 3])
430 with self
.open(support
.TESTFN
, "rb") as f
:
431 self
.assertEqual(f
.read(), b
"xxx")
433 def _check_base_destructor(self
, base
):
437 # This exercises the availability of attributes on object
439 # (in the C version, close() is called by the tp_dealloc
440 # function, not by __del__)
445 record
.append(self
.on_del
)
447 f
= super(MyIO
, self
).__del
__
448 except AttributeError:
453 record
.append(self
.on_close
)
454 super(MyIO
, self
).close()
456 record
.append(self
.on_flush
)
457 super(MyIO
, self
).flush()
461 self
.assertEqual(record
, [1, 2, 3])
463 def test_IOBase_destructor(self
):
464 self
._check
_base
_destructor
(self
.IOBase
)
466 def test_RawIOBase_destructor(self
):
467 self
._check
_base
_destructor
(self
.RawIOBase
)
469 def test_BufferedIOBase_destructor(self
):
470 self
._check
_base
_destructor
(self
.BufferedIOBase
)
472 def test_TextIOBase_destructor(self
):
473 self
._check
_base
_destructor
(self
.TextIOBase
)
475 def test_close_flushes(self
):
476 with self
.open(support
.TESTFN
, "wb") as f
:
478 with self
.open(support
.TESTFN
, "rb") as f
:
479 self
.assertEqual(f
.read(), b
"xxx")
481 def test_array_writes(self
):
482 a
= array
.array(b
'i', range(10))
483 n
= len(a
.tostring())
484 with self
.open(support
.TESTFN
, "wb", 0) as f
:
485 self
.assertEqual(f
.write(a
), n
)
486 with self
.open(support
.TESTFN
, "wb") as f
:
487 self
.assertEqual(f
.write(a
), n
)
489 def test_closefd(self
):
490 self
.assertRaises(ValueError, self
.open, support
.TESTFN
, 'w',
493 def test_read_closed(self
):
494 with self
.open(support
.TESTFN
, "w") as f
:
496 with self
.open(support
.TESTFN
, "r") as f
:
497 file = self
.open(f
.fileno(), "r", closefd
=False)
498 self
.assertEqual(file.read(), "egg\n")
501 self
.assertRaises(ValueError, file.read
)
def test_no_closefd_with_filename(self):
    # can't use closefd in combination with a file name
    opener = self.open
    target_name = support.TESTFN
    self.assertRaises(ValueError, opener, target_name, "r", closefd=False)
507 def test_closefd_attr(self
):
508 with self
.open(support
.TESTFN
, "wb") as f
:
510 with self
.open(support
.TESTFN
, "r") as f
:
511 self
.assertEqual(f
.buffer.raw
.closefd
, True)
512 file = self
.open(f
.fileno(), "r", closefd
=False)
513 self
.assertEqual(file.buffer.raw
.closefd
, False)
515 def test_garbage_collection(self
):
516 # FileIO objects are collected, and collecting them flushes
518 f
= self
.FileIO(support
.TESTFN
, "wb")
524 self
.assertTrue(wr() is None, wr
)
525 with self
.open(support
.TESTFN
, "rb") as f
:
526 self
.assertEqual(f
.read(), b
"abcxxx")
528 def test_unbounded_file(self
):
529 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
531 if not os
.path
.exists(zero
):
532 self
.skipTest("{0} does not exist".format(zero
))
533 if sys
.maxsize
> 0x7FFFFFFF:
534 self
.skipTest("test can only run in a 32-bit address space")
535 if support
.real_max_memuse
< support
._2G
:
536 self
.skipTest("test requires at least 2GB of memory")
537 with self
.open(zero
, "rb", buffering
=0) as f
:
538 self
.assertRaises(OverflowError, f
.read
)
539 with self
.open(zero
, "rb") as f
:
540 self
.assertRaises(OverflowError, f
.read
)
541 with self
.open(zero
, "r") as f
:
542 self
.assertRaises(OverflowError, f
.read
)
544 def test_flush_error_on_close(self
):
545 f
= self
.open(support
.TESTFN
, "wb", buffering
=0)
549 self
.assertRaises(IOError, f
.close
) # exception not swallowed
551 def test_multi_close(self
):
552 f
= self
.open(support
.TESTFN
, "wb", buffering
=0)
556 self
.assertRaises(ValueError, f
.flush
)
558 class CIOTest(IOTest
):
class PyIOTest(IOTest):
    # Subclass of IOTest in which the inherited test_array_writes is
    # replaced by a skipped version; the string passed to unittest.skip
    # is the reason reported for the skip.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
567 class CommonBufferedTests
:
568 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
570 def test_detach(self
):
571 raw
= self
.MockRawIO()
573 self
.assertIs(buf
.detach(), raw
)
574 self
.assertRaises(ValueError, buf
.detach
)
576 def test_fileno(self
):
577 rawio
= self
.MockRawIO()
578 bufio
= self
.tp(rawio
)
580 self
.assertEquals(42, bufio
.fileno())
582 def test_no_fileno(self
):
583 # XXX will we always have fileno() function? If so, kill
584 # this test. Else, write it.
587 def test_invalid_args(self
):
588 rawio
= self
.MockRawIO()
589 bufio
= self
.tp(rawio
)
591 self
.assertRaises(ValueError, bufio
.seek
, 0, -1)
592 self
.assertRaises(ValueError, bufio
.seek
, 0, 3)
594 def test_override_destructor(self
):
597 class MyBufferedIO(tp
):
601 f
= super(MyBufferedIO
, self
).__del
__
602 except AttributeError:
608 super(MyBufferedIO
, self
).close()
611 super(MyBufferedIO
, self
).flush()
612 rawio
= self
.MockRawIO()
613 bufio
= MyBufferedIO(rawio
)
614 writable
= bufio
.writable()
618 self
.assertEqual(record
, [1, 2, 3])
620 self
.assertEqual(record
, [1, 2])
622 def test_context_manager(self
):
623 # Test usability as a context manager
624 rawio
= self
.MockRawIO()
625 bufio
= self
.tp(rawio
)
630 # bufio should now be closed, and using it a second time should raise
632 self
.assertRaises(ValueError, _with
)
634 def test_error_through_destructor(self
):
635 # Test that the exception state is not modified by a destructor,
636 # even if close() fails.
637 rawio
= self
.CloseFailureIO()
640 with support
.captured_output("stderr") as s
:
641 self
.assertRaises(AttributeError, f
)
642 s
= s
.getvalue().strip()
644 # The destructor *may* have printed an unraisable error, check it
645 self
.assertEqual(len(s
.splitlines()), 1)
646 self
.assertTrue(s
.startswith("Exception IOError: "), s
)
647 self
.assertTrue(s
.endswith(" ignored"), s
)
650 raw
= self
.MockRawIO()
652 clsname
= "%s.%s" % (self
.tp
.__module
__, self
.tp
.__name
__)
653 self
.assertEqual(repr(b
), "<%s>" % clsname
)
655 self
.assertEqual(repr(b
), "<%s name=u'dummy'>" % clsname
)
657 self
.assertEqual(repr(b
), "<%s name='dummy'>" % clsname
)
659 def test_flush_error_on_close(self
):
660 raw
= self
.MockRawIO()
663 raw
.flush
= bad_flush
665 self
.assertRaises(IOError, b
.close
) # exception not swallowed
667 def test_multi_close(self
):
668 raw
= self
.MockRawIO()
673 self
.assertRaises(ValueError, b
.flush
)
676 class BufferedReaderTest(unittest
.TestCase
, CommonBufferedTests
):
679 def test_constructor(self
):
680 rawio
= self
.MockRawIO([b
"abc"])
681 bufio
= self
.tp(rawio
)
682 bufio
.__init
__(rawio
)
683 bufio
.__init
__(rawio
, buffer_size
=1024)
684 bufio
.__init
__(rawio
, buffer_size
=16)
685 self
.assertEquals(b
"abc", bufio
.read())
686 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=0)
687 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-16)
688 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-1)
689 rawio
= self
.MockRawIO([b
"abc"])
690 bufio
.__init
__(rawio
)
691 self
.assertEquals(b
"abc", bufio
.read())
694 for arg
in (None, 7):
695 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
696 bufio
= self
.tp(rawio
)
697 self
.assertEquals(b
"abcdefg", bufio
.read(arg
))
699 self
.assertRaises(ValueError, bufio
.read
, -2)
701 def test_read1(self
):
702 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
703 bufio
= self
.tp(rawio
)
704 self
.assertEquals(b
"a", bufio
.read(1))
705 self
.assertEquals(b
"b", bufio
.read1(1))
706 self
.assertEquals(rawio
._reads
, 1)
707 self
.assertEquals(b
"c", bufio
.read1(100))
708 self
.assertEquals(rawio
._reads
, 1)
709 self
.assertEquals(b
"d", bufio
.read1(100))
710 self
.assertEquals(rawio
._reads
, 2)
711 self
.assertEquals(b
"efg", bufio
.read1(100))
712 self
.assertEquals(rawio
._reads
, 3)
713 self
.assertEquals(b
"", bufio
.read1(100))
714 self
.assertEquals(rawio
._reads
, 4)
716 self
.assertRaises(ValueError, bufio
.read1
, -1)
718 def test_readinto(self
):
719 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
720 bufio
= self
.tp(rawio
)
722 self
.assertEquals(bufio
.readinto(b
), 2)
723 self
.assertEquals(b
, b
"ab")
724 self
.assertEquals(bufio
.readinto(b
), 2)
725 self
.assertEquals(b
, b
"cd")
726 self
.assertEquals(bufio
.readinto(b
), 2)
727 self
.assertEquals(b
, b
"ef")
728 self
.assertEquals(bufio
.readinto(b
), 1)
729 self
.assertEquals(b
, b
"gf")
730 self
.assertEquals(bufio
.readinto(b
), 0)
731 self
.assertEquals(b
, b
"gf")
733 def test_readlines(self
):
735 rawio
= self
.MockRawIO((b
"abc\n", b
"d\n", b
"ef"))
736 return self
.tp(rawio
)
737 self
.assertEquals(bufio().readlines(), [b
"abc\n", b
"d\n", b
"ef"])
738 self
.assertEquals(bufio().readlines(5), [b
"abc\n", b
"d\n"])
739 self
.assertEquals(bufio().readlines(None), [b
"abc\n", b
"d\n", b
"ef"])
741 def test_buffering(self
):
746 [ 100, [ 3, 1, 4, 8 ], [ dlen
, 0 ] ],
747 [ 100, [ 3, 3, 3], [ dlen
] ],
748 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
751 for bufsize
, buf_read_sizes
, raw_read_sizes
in tests
:
752 rawio
= self
.MockFileIO(data
)
753 bufio
= self
.tp(rawio
, buffer_size
=bufsize
)
755 for nbytes
in buf_read_sizes
:
756 self
.assertEquals(bufio
.read(nbytes
), data
[pos
:pos
+nbytes
])
758 # this is mildly implementation-dependent
759 self
.assertEquals(rawio
.read_history
, raw_read_sizes
)
761 def test_read_non_blocking(self
):
762 # Inject some None's in there to simulate EWOULDBLOCK
763 rawio
= self
.MockRawIO((b
"abc", b
"d", None, b
"efg", None, None, None))
764 bufio
= self
.tp(rawio
)
766 self
.assertEquals(b
"abcd", bufio
.read(6))
767 self
.assertEquals(b
"e", bufio
.read(1))
768 self
.assertEquals(b
"fg", bufio
.read())
769 self
.assertEquals(b
"", bufio
.peek(1))
770 self
.assertTrue(None is bufio
.read())
771 self
.assertEquals(b
"", bufio
.read())
773 def test_read_past_eof(self
):
774 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
775 bufio
= self
.tp(rawio
)
777 self
.assertEquals(b
"abcdefg", bufio
.read(9000))
779 def test_read_all(self
):
780 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
781 bufio
= self
.tp(rawio
)
783 self
.assertEquals(b
"abcdefg", bufio
.read())
785 @unittest.skipUnless(threading
, 'Threading required for this test.')
786 def test_threads(self
):
788 # Write out many bytes with exactly the same number of 0's,
789 # 1's... 255's. This will help us check that concurrent reading
790 # doesn't duplicate or forget contents.
792 l
= list(range(256)) * N
794 s
= bytes(bytearray(l
))
795 with self
.open(support
.TESTFN
, "wb") as f
:
797 with self
.open(support
.TESTFN
, self
.read_mode
, buffering
=0) as raw
:
798 bufio
= self
.tp(raw
, 8)
803 # Intra-buffer read then buffer-flushing read
804 for n
in cycle([1, 19]):
808 # list.append() is atomic
810 except Exception as e
:
813 threads
= [threading
.Thread(target
=f
) for x
in range(20)]
816 time
.sleep(0.02) # yield
819 self
.assertFalse(errors
,
820 "the following exceptions were caught: %r" % errors
)
821 s
= b
''.join(results
)
823 c
= bytes(bytearray([i
]))
824 self
.assertEqual(s
.count(c
), N
)
826 support
.unlink(support
.TESTFN
)
828 def test_misbehaved_io(self
):
829 rawio
= self
.MisbehavedRawIO((b
"abc", b
"d", b
"efg"))
830 bufio
= self
.tp(rawio
)
831 self
.assertRaises(IOError, bufio
.seek
, 0)
832 self
.assertRaises(IOError, bufio
.tell
)
834 class CBufferedReaderTest(BufferedReaderTest
):
835 tp
= io
.BufferedReader
def test_constructor(self):
    # Run the shared BufferedReaderTest constructor checks first.
    BufferedReaderTest.test_constructor(self)
    # The allocation can succeed on 32-bit builds, e.g. with more
    # than 2GB RAM and a 64-bit kernel.
    if sys.maxsize <= 0x7FFFFFFF:
        return
    raw = self.MockRawIO()
    reader = self.tp(raw)
    allocation_errors = (OverflowError, MemoryError, ValueError)
    self.assertRaises(allocation_errors, reader.__init__, raw, sys.maxsize)
847 def test_initialization(self
):
848 rawio
= self
.MockRawIO([b
"abc"])
849 bufio
= self
.tp(rawio
)
850 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=0)
851 self
.assertRaises(ValueError, bufio
.read
)
852 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-16)
853 self
.assertRaises(ValueError, bufio
.read
)
854 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-1)
855 self
.assertRaises(ValueError, bufio
.read
)
857 def test_misbehaved_io_read(self
):
858 rawio
= self
.MisbehavedRawIO((b
"abc", b
"d", b
"efg"))
859 bufio
= self
.tp(rawio
)
860 # _pyio.BufferedReader seems to implement reading different, so that
861 # checking this is not so easy.
862 self
.assertRaises(IOError, bufio
.read
, 10)
864 def test_garbage_collection(self
):
865 # C BufferedReader objects are collected.
866 # The Python version has __del__, so it ends into gc.garbage instead
867 rawio
= self
.FileIO(support
.TESTFN
, "w+b")
873 self
.assertTrue(wr() is None, wr
)
class PyBufferedReaderTest(BufferedReaderTest):
    # Binds the BufferedReaderTest cases to pyio.BufferedReader; pyio is
    # the _pyio module, imported in this file as the Python implementation
    # of io.
    tp = pyio.BufferedReader
879 class BufferedWriterTest(unittest
.TestCase
, CommonBufferedTests
):
882 def test_constructor(self
):
883 rawio
= self
.MockRawIO()
884 bufio
= self
.tp(rawio
)
885 bufio
.__init
__(rawio
)
886 bufio
.__init
__(rawio
, buffer_size
=1024)
887 bufio
.__init
__(rawio
, buffer_size
=16)
888 self
.assertEquals(3, bufio
.write(b
"abc"))
890 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=0)
891 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-16)
892 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-1)
893 bufio
.__init
__(rawio
)
894 self
.assertEquals(3, bufio
.write(b
"ghi"))
896 self
.assertEquals(b
"".join(rawio
._write
_stack
), b
"abcghi")
898 def test_detach_flush(self
):
899 raw
= self
.MockRawIO()
902 self
.assertFalse(raw
._write
_stack
)
904 self
.assertEqual(raw
._write
_stack
, [b
"howdy!"])
906 def test_write(self
):
907 # Write to the buffered IO but don't overflow the buffer.
908 writer
= self
.MockRawIO()
909 bufio
= self
.tp(writer
, 8)
911 self
.assertFalse(writer
._write
_stack
)
913 def test_write_overflow(self
):
914 writer
= self
.MockRawIO()
915 bufio
= self
.tp(writer
, 8)
916 contents
= b
"abcdefghijklmnop"
917 for n
in range(0, len(contents
), 3):
918 bufio
.write(contents
[n
:n
+3])
919 flushed
= b
"".join(writer
._write
_stack
)
920 # At least (total - 8) bytes were implicitly flushed, perhaps more
921 # depending on the implementation.
922 self
.assertTrue(flushed
.startswith(contents
[:-8]), flushed
)
924 def check_writes(self
, intermediate_func
):
925 # Lots of writes, test the flushed output is as expected.
926 contents
= bytes(range(256)) * 1000
928 writer
= self
.MockRawIO()
929 bufio
= self
.tp(writer
, 13)
930 # Generator of write sizes: repeat each N 15 times then proceed to N+1
932 for size
in count(1):
936 while n
< len(contents
):
937 size
= min(next(sizes
), len(contents
) - n
)
938 self
.assertEquals(bufio
.write(contents
[n
:n
+size
]), size
)
939 intermediate_func(bufio
)
942 self
.assertEquals(contents
,
943 b
"".join(writer
._write
_stack
))
945 def test_writes(self
):
946 self
.check_writes(lambda bufio
: None)
948 def test_writes_and_flushes(self
):
949 self
.check_writes(lambda bufio
: bufio
.flush())
951 def test_writes_and_seeks(self
):
954 bufio
.seek(pos
+ 1, 0)
955 bufio
.seek(pos
- 1, 0)
957 self
.check_writes(_seekabs
)
959 pos
= bufio
.seek(0, 1)
963 self
.check_writes(_seekrel
)
965 def test_writes_and_truncates(self
):
966 self
.check_writes(lambda bufio
: bufio
.truncate(bufio
.tell()))
968 def test_write_non_blocking(self
):
969 raw
= self
.MockNonBlockWriterIO()
970 bufio
= self
.tp(raw
, 8)
972 self
.assertEquals(bufio
.write(b
"abcd"), 4)
973 self
.assertEquals(bufio
.write(b
"efghi"), 5)
974 # 1 byte will be written, the rest will be buffered
976 self
.assertEquals(bufio
.write(b
"jklmn"), 5)
978 # 8 bytes will be written, 8 will be buffered and the rest will be lost
981 bufio
.write(b
"opqrwxyz0123456789")
982 except self
.BlockingIOError
as e
:
983 written
= e
.characters_written
985 self
.fail("BlockingIOError should have been raised")
986 self
.assertEquals(written
, 16)
987 self
.assertEquals(raw
.pop_written(),
988 b
"abcdefghijklmnopqrwxyz")
990 self
.assertEquals(bufio
.write(b
"ABCDEFGHI"), 9)
991 s
= raw
.pop_written()
992 # Previously buffered bytes were flushed
993 self
.assertTrue(s
.startswith(b
"01234567A"), s
)
995 def test_write_and_rewind(self
):
997 bufio
= self
.tp(raw
, 4)
998 self
.assertEqual(bufio
.write(b
"abcdef"), 6)
999 self
.assertEqual(bufio
.tell(), 6)
1001 self
.assertEqual(bufio
.write(b
"XY"), 2)
1003 self
.assertEqual(raw
.getvalue(), b
"XYcdef")
1004 self
.assertEqual(bufio
.write(b
"123456"), 6)
1006 self
.assertEqual(raw
.getvalue(), b
"XYcdef123456")
1008 def test_flush(self
):
1009 writer
= self
.MockRawIO()
1010 bufio
= self
.tp(writer
, 8)
1013 self
.assertEquals(b
"abc", writer
._write
_stack
[0])
1015 def test_destructor(self
):
1016 writer
= self
.MockRawIO()
1017 bufio
= self
.tp(writer
, 8)
1020 support
.gc_collect()
1021 self
.assertEquals(b
"abc", writer
._write
_stack
[0])
def test_truncate(self):
    # Truncate implicitly flushes the buffer.
    target = support.TESTFN
    with self.open(target, self.write_mode, buffering=0) as raw:
        writer = self.tp(raw, 8)
        writer.write(b"abcdef")
        new_size = writer.truncate(3)
        self.assertEqual(new_size, 3)
        # The reported position is still 6 after truncating to 3.
        position = writer.tell()
        self.assertEqual(position, 6)
    # Read the file back unbuffered to see what actually reached it.
    with self.open(target, "rb", buffering=0) as readback:
        self.assertEqual(readback.read(), b"abc")
1033 @unittest.skipUnless(threading
, 'Threading required for this test.')
1034 def test_threads(self
):
1036 # Write out many bytes from many threads and test they were
1039 contents
= bytes(range(256)) * N
1040 sizes
= cycle([1, 19])
1043 while n
< len(contents
):
1045 queue
.append(contents
[n
:n
+size
])
1048 # We use a real file object because it allows us to
1049 # exercise situations where the GIL is released before
1050 # writing the buffer to the raw streams. This is in addition
1051 # to concurrency issues due to switching threads in the middle
1053 with self
.open(support
.TESTFN
, self
.write_mode
, buffering
=0) as raw
:
1054 bufio
= self
.tp(raw
, 8)
1064 except Exception as e
:
1067 threads
= [threading
.Thread(target
=f
) for x
in range(20)]
1070 time
.sleep(0.02) # yield
1073 self
.assertFalse(errors
,
1074 "the following exceptions were caught: %r" % errors
)
1076 with self
.open(support
.TESTFN
, "rb") as f
:
1078 for i
in range(256):
1079 self
.assertEquals(s
.count(bytes([i
])), N
)
1081 support
.unlink(support
.TESTFN
)
1083 def test_misbehaved_io(self
):
1084 rawio
= self
.MisbehavedRawIO()
1085 bufio
= self
.tp(rawio
, 5)
1086 self
.assertRaises(IOError, bufio
.seek
, 0)
1087 self
.assertRaises(IOError, bufio
.tell
)
1088 self
.assertRaises(IOError, bufio
.write
, b
"abcdef")
def test_max_buffer_size_deprecation(self):
    # Constructing with a third positional argument must produce the
    # "max_buffer_size is deprecated" DeprecationWarning.
    expected_warning = ("max_buffer_size is deprecated", DeprecationWarning)
    with support.check_warnings(expected_warning):
        raw = self.MockRawIO()
        self.tp(raw, 8, 12)
1096 class CBufferedWriterTest(BufferedWriterTest
):
1097 tp
= io
.BufferedWriter
def test_constructor(self):
    # Run the shared BufferedWriterTest constructor checks first.
    BufferedWriterTest.test_constructor(self)
    # The allocation can succeed on 32-bit builds, e.g. with more
    # than 2GB RAM and a 64-bit kernel.
    if sys.maxsize <= 0x7FFFFFFF:
        return
    raw = self.MockRawIO()
    writer = self.tp(raw)
    allocation_errors = (OverflowError, MemoryError, ValueError)
    self.assertRaises(allocation_errors, writer.__init__, raw, sys.maxsize)
1109 def test_initialization(self
):
1110 rawio
= self
.MockRawIO()
1111 bufio
= self
.tp(rawio
)
1112 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=0)
1113 self
.assertRaises(ValueError, bufio
.write
, b
"def")
1114 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-16)
1115 self
.assertRaises(ValueError, bufio
.write
, b
"def")
1116 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-1)
1117 self
.assertRaises(ValueError, bufio
.write
, b
"def")
1119 def test_garbage_collection(self
):
1120 # C BufferedWriter objects are collected, and collecting them flushes
1122 # The Python version has __del__, so it ends into gc.garbage instead
1123 rawio
= self
.FileIO(support
.TESTFN
, "w+b")
1129 support
.gc_collect()
1130 self
.assertTrue(wr() is None, wr
)
1131 with self
.open(support
.TESTFN
, "rb") as f
:
1132 self
.assertEqual(f
.read(), b
"123xxx")
class PyBufferedWriterTest(BufferedWriterTest):
    # Binds the BufferedWriterTest cases to pyio.BufferedWriter; pyio is
    # the _pyio module, imported in this file as the Python implementation
    # of io.
    tp = pyio.BufferedWriter
1138 class BufferedRWPairTest(unittest
.TestCase
):
1140 def test_constructor(self
):
1141 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1142 self
.assertFalse(pair
.closed
)
1144 def test_detach(self
):
1145 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1146 self
.assertRaises(self
.UnsupportedOperation
, pair
.detach
)
def test_constructor_max_buffer_size_deprecation(self):
    # Constructing a pair with a fourth positional argument must produce
    # the "max_buffer_size is deprecated" DeprecationWarning.
    expected_warning = ("max_buffer_size is deprecated", DeprecationWarning)
    with support.check_warnings(expected_warning):
        reader_raw = self.MockRawIO()
        writer_raw = self.MockRawIO()
        self.tp(reader_raw, writer_raw, 8, 12)
1153 def test_constructor_with_not_readable(self
):
1154 class NotReadable(MockRawIO
):
1158 self
.assertRaises(IOError, self
.tp
, NotReadable(), self
.MockRawIO())
1160 def test_constructor_with_not_writeable(self
):
1161 class NotWriteable(MockRawIO
):
1165 self
.assertRaises(IOError, self
.tp
, self
.MockRawIO(), NotWriteable())
1167 def test_read(self
):
1168 pair
= self
.tp(self
.BytesIO(b
"abcdef"), self
.MockRawIO())
1170 self
.assertEqual(pair
.read(3), b
"abc")
1171 self
.assertEqual(pair
.read(1), b
"d")
1172 self
.assertEqual(pair
.read(), b
"ef")
1173 pair
= self
.tp(self
.BytesIO(b
"abc"), self
.MockRawIO())
1174 self
.assertEqual(pair
.read(None), b
"abc")
1176 def test_readlines(self
):
1177 pair
= lambda: self
.tp(self
.BytesIO(b
"abc\ndef\nh"), self
.MockRawIO())
1178 self
.assertEqual(pair().readlines(), [b
"abc\n", b
"def\n", b
"h"])
1179 self
.assertEqual(pair().readlines(), [b
"abc\n", b
"def\n", b
"h"])
1180 self
.assertEqual(pair().readlines(5), [b
"abc\n", b
"def\n"])
1182 def test_read1(self
):
1183 # .read1() is delegated to the underlying reader object, so this test
1185 pair
= self
.tp(self
.BytesIO(b
"abcdef"), self
.MockRawIO())
1187 self
.assertEqual(pair
.read1(3), b
"abc")
1189 def test_readinto(self
):
1190 pair
= self
.tp(self
.BytesIO(b
"abcdef"), self
.MockRawIO())
1193 self
.assertEqual(pair
.readinto(data
), 5)
1194 self
.assertEqual(data
, b
"abcde")
1196 def test_write(self
):
1197 w
= self
.MockRawIO()
1198 pair
= self
.tp(self
.MockRawIO(), w
)
1204 self
.assertEqual(w
._write
_stack
, [b
"abc", b
"def"])
1206 def test_peek(self
):
1207 pair
= self
.tp(self
.BytesIO(b
"abcdef"), self
.MockRawIO())
1209 self
.assertTrue(pair
.peek(3).startswith(b
"abc"))
1210 self
.assertEqual(pair
.read(3), b
"abc")
1212 def test_readable(self
):
1213 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1214 self
.assertTrue(pair
.readable())
1216 def test_writeable(self
):
1217 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1218 self
.assertTrue(pair
.writable())
1220 def test_seekable(self
):
1221 # BufferedRWPairs are never seekable, even if their readers and writers
1223 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1224 self
.assertFalse(pair
.seekable())
1226 # .flush() is delegated to the underlying writer object and has been
1227 # tested in the test_write method.
1229 def test_close_and_closed(self
):
1230 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1231 self
.assertFalse(pair
.closed
)
1233 self
.assertTrue(pair
.closed
)
1235 def test_isatty(self
):
1236 class SelectableIsAtty(MockRawIO
):
1237 def __init__(self
, isatty
):
1238 MockRawIO
.__init
__(self
)
1239 self
._isatty
= isatty
1244 pair
= self
.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1245 self
.assertFalse(pair
.isatty())
1247 pair
= self
.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1248 self
.assertTrue(pair
.isatty())
1250 pair
= self
.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1251 self
.assertTrue(pair
.isatty())
1253 pair
= self
.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1254 self
.assertTrue(pair
.isatty())
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against the C implementation in ``io``."""

    tp = io.BufferedRWPair
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against the pure-Python ``_pyio`` class."""

    tp = pyio.BufferedRWPair
1263 class BufferedRandomTest(BufferedReaderTest
, BufferedWriterTest
):
def test_constructor(self):
    # A random-access buffer must satisfy both the reader and the writer
    # constructor checks, run in that order.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_constructor(self)
1271 def test_read_and_write(self
):
1272 raw
= self
.MockRawIO((b
"asdf", b
"ghjk"))
1273 rw
= self
.tp(raw
, 8)
1275 self
.assertEqual(b
"as", rw
.read(2))
1278 self
.assertFalse(raw
._write
_stack
) # Buffer writes
1279 self
.assertEqual(b
"ghjk", rw
.read())
1280 self
.assertEquals(b
"dddeee", raw
._write
_stack
[0])
1282 def test_seek_and_tell(self
):
1283 raw
= self
.BytesIO(b
"asdfghjkl")
1286 self
.assertEquals(b
"as", rw
.read(2))
1287 self
.assertEquals(2, rw
.tell())
1289 self
.assertEquals(b
"asdf", rw
.read(4))
1293 self
.assertEquals(b
"asdfasdfl", rw
.read())
1294 self
.assertEquals(9, rw
.tell())
1296 self
.assertEquals(5, rw
.tell())
1298 self
.assertEquals(7, rw
.tell())
1299 self
.assertEquals(b
"fl", rw
.read(11))
1300 self
.assertRaises(TypeError, rw
.seek
, 0.0)
1302 def check_flush_and_read(self
, read_func
):
1303 raw
= self
.BytesIO(b
"abcdefghi")
1304 bufio
= self
.tp(raw
)
1306 self
.assertEquals(b
"ab", read_func(bufio
, 2))
1308 self
.assertEquals(b
"ef", read_func(bufio
, 2))
1309 self
.assertEquals(6, bufio
.tell())
1311 self
.assertEquals(6, bufio
.tell())
1312 self
.assertEquals(b
"ghi", read_func(bufio
))
1315 # flush() resets the read buffer
1318 self
.assertEquals(b
"XYZ", read_func(bufio
, 3))
1320 def test_flush_and_read(self
):
1321 self
.check_flush_and_read(lambda bufio
, *args
: bufio
.read(*args
))
1323 def test_flush_and_readinto(self
):
1324 def _readinto(bufio
, n
=-1):
1325 b
= bytearray(n
if n
>= 0 else 9999)
1326 n
= bufio
.readinto(b
)
1328 self
.check_flush_and_read(_readinto
)
1330 def test_flush_and_peek(self
):
1331 def _peek(bufio
, n
=-1):
1332 # This relies on the fact that the buffer can contain the whole
1333 # raw stream, otherwise peek() can return less.
1337 bufio
.seek(len(b
), 1)
1339 self
.check_flush_and_read(_peek
)
1341 def test_flush_and_write(self
):
1342 raw
= self
.BytesIO(b
"abcdefghi")
1343 bufio
= self
.tp(raw
)
1350 self
.assertEquals(b
"12345fghi", raw
.getvalue())
1351 self
.assertEquals(b
"12345fghi", bufio
.read())
def test_threads(self):
    # Run the reader threading test first, then the writer one.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_threads(self)
1357 def test_writes_and_peek(self
):
1360 self
.check_writes(_peek
)
1366 self
.check_writes(_peek
)
1368 def test_writes_and_reads(self
):
1372 self
.check_writes(_read
)
1374 def test_writes_and_read1s(self
):
1378 self
.check_writes(_read1
)
1380 def test_writes_and_readintos(self
):
1383 bufio
.readinto(bytearray(1))
1384 self
.check_writes(_read
)
1386 def test_write_after_readahead(self
):
1387 # Issue #6629: writing after the buffer was filled by readahead should
1388 # first rewind the raw stream.
1389 for overwrite_size
in [1, 5]:
1390 raw
= self
.BytesIO(b
"A" * 10)
1391 bufio
= self
.tp(raw
, 4)
1393 self
.assertEqual(bufio
.read(1), b
"A")
1394 self
.assertEqual(bufio
.tell(), 1)
1395 # Overwriting should rewind the raw stream if it needs so
1396 bufio
.write(b
"B" * overwrite_size
)
1397 self
.assertEqual(bufio
.tell(), overwrite_size
+ 1)
1398 # If the write size was smaller than the buffer size, flush() and
1399 # check that rewind happens.
1401 self
.assertEqual(bufio
.tell(), overwrite_size
+ 1)
1404 b
"A" + b
"B" * overwrite_size
+ b
"A" * (9 - overwrite_size
))
1406 def test_truncate_after_read_or_write(self
):
1407 raw
= self
.BytesIO(b
"A" * 10)
1408 bufio
= self
.tp(raw
, 100)
1409 self
.assertEqual(bufio
.read(2), b
"AA") # the read buffer gets filled
1410 self
.assertEqual(bufio
.truncate(), 2)
1411 self
.assertEqual(bufio
.write(b
"BB"), 2) # the write buffer increases
1412 self
.assertEqual(bufio
.truncate(), 4)
def test_misbehaved_io(self):
    # Run the reader misbehaving-raw-stream check first, then the writer one.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_misbehaved_io(self)
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
    """BufferedRandom tests bound to the C implementation in ``io``."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel, so the oversized-buffer check
        # only runs when sys.maxsize exceeds 0x7FFFFFFF.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors, buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Reader check first, then writer check.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against the pure-Python ``_pyio`` class."""

    tp = pyio.BufferedRandom
1439 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1441 # - A single output character can correspond to many bytes of input.
1442 # - The number of input bytes to complete the character can be
1443 # undetermined until the last input byte is received.
1444 # - The number of input bytes can vary depending on previous input.
1445 # - A single input byte can correspond to many characters of output.
1446 # - The number of output characters can be undetermined until the
1447 # last input byte is received.
1448 # - The number of output characters can vary depending on previous input.
1450 class StatefulIncrementalDecoder(codecs
.IncrementalDecoder
):
1452 For testing seek/tell behavior with a stateful, buffering decoder.
1454 Input is a sequence of words. Words may be fixed-length (length set
1455 by input) or variable-length (period-terminated). In variable-length
1456 mode, extra periods are ignored. Possible words are:
1457 - 'i' followed by a number sets the input length, I (maximum 99).
1458 When I is set to 0, words are space-terminated.
1459 - 'o' followed by a number sets the output length, O (maximum 99).
1460 - Any other word is converted into a word followed by a period on
1461 the output. The output word consists of the input word truncated
1462 or padded out with hyphens to make its length equal to O. If O
1463 is 0, the word is output verbatim without truncating or padding.
1464 I and O are initially set to 1. When I changes, any buffered input is
1465 re-scanned according to the new I. EOF also terminates the last word.
1468 def __init__(self
, errors
='strict'):
1469 codecs
.IncrementalDecoder
.__init
__(self
, errors
)
1473 return '<SID %x>' % id(self
)
1478 self
.buffer = bytearray()
1481 i
, o
= self
.i ^
1, self
.o ^
1 # so that flags = 0 after reset()
1482 return bytes(self
.buffer), i
*100 + o
1484 def setstate(self
, state
):
1486 self
.buffer = bytearray(buffer)
1487 i
, o
= divmod(io
, 100)
1488 self
.i
, self
.o
= i ^
1, o ^
1
1490 def decode(self
, input, final
=False):
1493 if self
.i
== 0: # variable-length, terminated with period
1496 output
+= self
.process_word()
1498 self
.buffer.append(b
)
1499 else: # fixed-length, terminate after self.i bytes
1500 self
.buffer.append(b
)
1501 if len(self
.buffer) == self
.i
:
1502 output
+= self
.process_word()
1503 if final
and self
.buffer: # EOF terminates the last word
1504 output
+= self
.process_word()
1507 def process_word(self
):
1509 if self
.buffer[0] == ord('i'):
1510 self
.i
= min(99, int(self
.buffer[1:] or 0)) # set input length
1511 elif self
.buffer[0] == ord('o'):
1512 self
.o
= min(99, int(self
.buffer[1:] or 0)) # set output length
1514 output
= self
.buffer.decode('ascii')
1515 if len(output
) < self
.o
:
1516 output
+= '-'*self
.o
# pad out with hyphens
1518 output
= output
[:self
.o
] # truncate to output length
1520 self
.buffer = bytearray()
1523 codecEnabled
= False
1526 def lookupTestDecoder(cls
, name
):
1527 if cls
.codecEnabled
and name
== 'test_decoder':
1528 latin1
= codecs
.lookup('latin-1')
1529 return codecs
.CodecInfo(
1530 name
='test_decoder', encode
=latin1
.encode
, decode
=None,
1531 incrementalencoder
=None,
1532 streamreader
=None, streamwriter
=None,
1533 incrementaldecoder
=cls
)
1535 # Register the previous decoder for testing.
1536 # Disabled by default, tests will enable it.
1537 codecs
.register(StatefulIncrementalDecoder
.lookupTestDecoder
)
1540 class StatefulIncrementalDecoderTest(unittest
.TestCase
):
1542 Make sure the StatefulIncrementalDecoder actually works.
1546 # I=1, O=1 (fixed-length input == fixed-length output)
1547 (b
'abcd', False, 'a.b.c.d.'),
1548 # I=0, O=0 (variable-length input, variable-length output)
1549 (b
'oiabcd', True, 'abcd.'),
1550 # I=0, O=0 (should ignore extra periods)
1551 (b
'oi...abcd...', True, 'abcd.'),
1552 # I=0, O=6 (variable-length input, fixed-length output)
1553 (b
'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1554 # I=2, O=6 (fixed-length input < fixed-length output)
1555 (b
'i.i2.o6xyz', True, 'xy----.z-----.'),
1556 # I=6, O=3 (fixed-length input > fixed-length output)
1557 (b
'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1558 # I=0, then 3; O=29, then 15 (with longer output)
1559 (b
'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1560 'a----------------------------.' +
1561 'b----------------------------.' +
1562 'cde--------------------------.' +
1563 'abcdefghijabcde.' +
1564 'a.b------------.' +
1565 '.c.------------.' +
1566 'd.e------------.' +
1567 'k--------------.' +
1568 'l--------------.' +
def test_decoder(self):
    # Try a few one-shot test cases.  Each entry of self.test_cases is an
    # (input bytes, final flag, expected text) triple.
    for data, at_eof, expected in self.test_cases:
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(data, at_eof), expected)

    # Also test an unfinished decode, followed by forcing EOF.
    decoder = StatefulIncrementalDecoder()
    self.assertEqual(decoder.decode(b'oiabcd'), '')
    self.assertEqual(decoder.decode(b'', True), 'abcd.')
1583 class TextIOWrapperTest(unittest
.TestCase
):
1586 self
.testdata
= b
"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1587 self
.normalized
= b
"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
1588 support
.unlink(support
.TESTFN
)
1591 support
.unlink(support
.TESTFN
)
1593 def test_constructor(self
):
1594 r
= self
.BytesIO(b
"\xc3\xa9\n\n")
1595 b
= self
.BufferedReader(r
, 1000)
1596 t
= self
.TextIOWrapper(b
)
1597 t
.__init
__(b
, encoding
="latin1", newline
="\r\n")
1598 self
.assertEquals(t
.encoding
, "latin1")
1599 self
.assertEquals(t
.line_buffering
, False)
1600 t
.__init
__(b
, encoding
="utf8", line_buffering
=True)
1601 self
.assertEquals(t
.encoding
, "utf8")
1602 self
.assertEquals(t
.line_buffering
, True)
1603 self
.assertEquals("\xe9\n", t
.readline())
1604 self
.assertRaises(TypeError, t
.__init
__, b
, newline
=42)
1605 self
.assertRaises(ValueError, t
.__init
__, b
, newline
='xyzzy')
1607 def test_detach(self
):
1609 b
= self
.BufferedWriter(r
)
1610 t
= self
.TextIOWrapper(b
)
1611 self
.assertIs(t
.detach(), b
)
1613 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1615 self
.assertFalse(r
.getvalue())
1617 self
.assertEqual(r
.getvalue(), b
"howdy")
1618 self
.assertRaises(ValueError, t
.detach
)
1620 def test_repr(self
):
1621 raw
= self
.BytesIO("hello".encode("utf-8"))
1622 b
= self
.BufferedReader(raw
)
1623 t
= self
.TextIOWrapper(b
, encoding
="utf-8")
1624 modname
= self
.TextIOWrapper
.__module
__
1625 self
.assertEqual(repr(t
),
1626 "<%s.TextIOWrapper encoding='utf-8'>" % modname
)
1628 self
.assertEqual(repr(t
),
1629 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname
)
1631 self
.assertEqual(repr(t
),
1632 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
)
1634 def test_line_buffering(self
):
1636 b
= self
.BufferedWriter(r
, 1000)
1637 t
= self
.TextIOWrapper(b
, newline
="\n", line_buffering
=True)
1639 self
.assertEquals(r
.getvalue(), b
"") # No flush happened
1641 self
.assertEquals(r
.getvalue(), b
"XY\nZ") # All got flushed
1643 self
.assertEquals(r
.getvalue(), b
"XY\nZA\rB")
1645 def test_encoding(self
):
1646 # Check the encoding attribute is always set, and valid
1648 t
= self
.TextIOWrapper(b
, encoding
="utf8")
1649 self
.assertEqual(t
.encoding
, "utf8")
1650 t
= self
.TextIOWrapper(b
)
1651 self
.assertTrue(t
.encoding
is not None)
1652 codecs
.lookup(t
.encoding
)
1654 def test_encoding_errors_reading(self
):
1656 b
= self
.BytesIO(b
"abc\n\xff\n")
1657 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1658 self
.assertRaises(UnicodeError, t
.read
)
1659 # (2) explicit strict
1660 b
= self
.BytesIO(b
"abc\n\xff\n")
1661 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="strict")
1662 self
.assertRaises(UnicodeError, t
.read
)
1664 b
= self
.BytesIO(b
"abc\n\xff\n")
1665 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="ignore")
1666 self
.assertEquals(t
.read(), "abc\n\n")
1668 b
= self
.BytesIO(b
"abc\n\xff\n")
1669 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="replace")
1670 self
.assertEquals(t
.read(), "abc\n\ufffd\n")
1672 def test_encoding_errors_writing(self
):
1675 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1676 self
.assertRaises(UnicodeError, t
.write
, "\xff")
1677 # (2) explicit strict
1679 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="strict")
1680 self
.assertRaises(UnicodeError, t
.write
, "\xff")
1683 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="ignore",
1685 t
.write("abc\xffdef\n")
1687 self
.assertEquals(b
.getvalue(), b
"abcdef\n")
1690 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="replace",
1692 t
.write("abc\xffdef\n")
1694 self
.assertEquals(b
.getvalue(), b
"abc?def\n")
1696 def test_newlines(self
):
1697 input_lines
= [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1700 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1701 [ '', input_lines
],
1702 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1703 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1704 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1708 'utf-16', 'utf-16-le', 'utf-16-be',
1709 'utf-32', 'utf-32-le', 'utf-32-be',
1712 # Try a range of buffer sizes to test the case where \r is the last
1713 # character in TextIOWrapper._pending_line.
1714 for encoding
in encodings
:
1715 # XXX: str.encode() should return bytes
1716 data
= bytes(''.join(input_lines
).encode(encoding
))
1717 for do_reads
in (False, True):
1718 for bufsize
in range(1, 10):
1719 for newline
, exp_lines
in tests
:
1720 bufio
= self
.BufferedReader(self
.BytesIO(data
), bufsize
)
1721 textio
= self
.TextIOWrapper(bufio
, newline
=newline
,
1729 self
.assertEquals(len(c2
), 2)
1730 got_lines
.append(c2
+ textio
.readline())
1732 got_lines
= list(textio
)
1734 for got_line
, exp_line
in zip(got_lines
, exp_lines
):
1735 self
.assertEquals(got_line
, exp_line
)
1736 self
.assertEquals(len(got_lines
), len(exp_lines
))
1738 def test_newlines_input(self
):
1739 testdata
= b
"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
1740 normalized
= testdata
.replace(b
"\r\n", b
"\n").replace(b
"\r", b
"\n")
1741 for newline
, expected
in [
1742 (None, normalized
.decode("ascii").splitlines(True)),
1743 ("", testdata
.decode("ascii").splitlines(True)),
1744 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1745 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1746 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
1748 buf
= self
.BytesIO(testdata
)
1749 txt
= self
.TextIOWrapper(buf
, encoding
="ascii", newline
=newline
)
1750 self
.assertEquals(txt
.readlines(), expected
)
1752 self
.assertEquals(txt
.read(), "".join(expected
))
1754 def test_newlines_output(self
):
1756 "": b
"AAA\nBBB\nCCC\nX\rY\r\nZ",
1757 "\n": b
"AAA\nBBB\nCCC\nX\rY\r\nZ",
1758 "\r": b
"AAA\rBBB\rCCC\rX\rY\r\rZ",
1759 "\r\n": b
"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1761 tests
= [(None, testdict
[os
.linesep
])] + sorted(testdict
.items())
1762 for newline
, expected
in tests
:
1763 buf
= self
.BytesIO()
1764 txt
= self
.TextIOWrapper(buf
, encoding
="ascii", newline
=newline
)
1766 txt
.write("BB\nCCC\n")
1767 txt
.write("X\rY\r\nZ")
1769 self
.assertEquals(buf
.closed
, False)
1770 self
.assertEquals(buf
.getvalue(), expected
)
1772 def test_destructor(self
):
1775 class MyBytesIO(base
):
1777 l
.append(self
.getvalue())
1780 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1783 support
.gc_collect()
1784 self
.assertEquals([b
"abc"], l
)
1786 def test_override_destructor(self
):
1788 class MyTextIO(self
.TextIOWrapper
):
1792 f
= super(MyTextIO
, self
).__del
__
1793 except AttributeError:
1799 super(MyTextIO
, self
).close()
1802 super(MyTextIO
, self
).flush()
1804 t
= MyTextIO(b
, encoding
="ascii")
1806 support
.gc_collect()
1807 self
.assertEqual(record
, [1, 2, 3])
1809 def test_error_through_destructor(self
):
1810 # Test that the exception state is not modified by a destructor,
1811 # even if close() fails.
1812 rawio
= self
.CloseFailureIO()
1814 self
.TextIOWrapper(rawio
).xyzzy
1815 with support
.captured_output("stderr") as s
:
1816 self
.assertRaises(AttributeError, f
)
1817 s
= s
.getvalue().strip()
1819 # The destructor *may* have printed an unraisable error, check it
1820 self
.assertEqual(len(s
.splitlines()), 1)
1821 self
.assertTrue(s
.startswith("Exception IOError: "), s
)
1822 self
.assertTrue(s
.endswith(" ignored"), s
)
1824 # Systematic tests of the text I/O API
1826 def test_basic_io(self
):
1827 for chunksize
in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1828 for enc
in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
1829 f
= self
.open(support
.TESTFN
, "w+", encoding
=enc
)
1830 f
._CHUNK
_SIZE
= chunksize
1831 self
.assertEquals(f
.write("abc"), 3)
1833 f
= self
.open(support
.TESTFN
, "r+", encoding
=enc
)
1834 f
._CHUNK
_SIZE
= chunksize
1835 self
.assertEquals(f
.tell(), 0)
1836 self
.assertEquals(f
.read(), "abc")
1838 self
.assertEquals(f
.seek(0), 0)
1839 self
.assertEquals(f
.read(None), "abc")
1841 self
.assertEquals(f
.read(2), "ab")
1842 self
.assertEquals(f
.read(1), "c")
1843 self
.assertEquals(f
.read(1), "")
1844 self
.assertEquals(f
.read(), "")
1845 self
.assertEquals(f
.tell(), cookie
)
1846 self
.assertEquals(f
.seek(0), 0)
1847 self
.assertEquals(f
.seek(0, 2), cookie
)
1848 self
.assertEquals(f
.write("def"), 3)
1849 self
.assertEquals(f
.seek(cookie
), cookie
)
1850 self
.assertEquals(f
.read(), "def")
1851 if enc
.startswith("utf"):
1852 self
.multi_line_test(f
, enc
)
1855 def multi_line_test(self
, f
, enc
):
1858 sample
= "s\xff\u0fff\uffff"
1860 for size
in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1862 for i
in range(size
):
1863 chars
.append(sample
[i
% len(sample
)])
1864 line
= "".join(chars
) + "\n"
1865 wlines
.append((f
.tell(), line
))
1874 rlines
.append((pos
, line
))
1875 self
.assertEquals(rlines
, wlines
)
1877 def test_telling(self
):
1878 f
= self
.open(support
.TESTFN
, "w+", encoding
="utf8")
1885 self
.assertEquals(f
.tell(), p0
)
1886 self
.assertEquals(f
.readline(), "\xff\n")
1887 self
.assertEquals(f
.tell(), p1
)
1888 self
.assertEquals(f
.readline(), "\xff\n")
1889 self
.assertEquals(f
.tell(), p2
)
1892 self
.assertEquals(line
, "\xff\n")
1893 self
.assertRaises(IOError, f
.tell
)
1894 self
.assertEquals(f
.tell(), p2
)
1897 def test_seeking(self
):
1898 chunk_size
= _default_chunk_size()
1899 prefix_size
= chunk_size
- 2
1900 u_prefix
= "a" * prefix_size
1901 prefix
= bytes(u_prefix
.encode("utf-8"))
1902 self
.assertEquals(len(u_prefix
), len(prefix
))
1903 u_suffix
= "\u8888\n"
1904 suffix
= bytes(u_suffix
.encode("utf-8"))
1905 line
= prefix
+ suffix
1906 f
= self
.open(support
.TESTFN
, "wb")
1909 f
= self
.open(support
.TESTFN
, "r", encoding
="utf-8")
1910 s
= f
.read(prefix_size
)
1911 self
.assertEquals(s
, prefix
.decode("ascii"))
1912 self
.assertEquals(f
.tell(), prefix_size
)
1913 self
.assertEquals(f
.readline(), u_suffix
)
1915 def test_seeking_too(self
):
1916 # Regression test for a specific bug
1917 data
= b
'\xe0\xbf\xbf\n'
1918 f
= self
.open(support
.TESTFN
, "wb")
1921 f
= self
.open(support
.TESTFN
, "r", encoding
="utf-8")
1922 f
._CHUNK
_SIZE
# Just test that it exists
1927 def test_seek_and_tell(self
):
1928 #Test seek/tell using the StatefulIncrementalDecoder.
1929 # Make test faster by doing smaller seeks
1932 def test_seek_and_tell_with_data(data
, min_pos
=0):
1933 """Tell/seek to various points within a data stream and ensure
1934 that the decoded data returned by read() is consistent."""
1935 f
= self
.open(support
.TESTFN
, 'wb')
1938 f
= self
.open(support
.TESTFN
, encoding
='test_decoder')
1939 f
._CHUNK
_SIZE
= CHUNK_SIZE
1943 for i
in range(min_pos
, len(decoded
) + 1): # seek positions
1944 for j
in [1, 5, len(decoded
) - i
]: # read lengths
1945 f
= self
.open(support
.TESTFN
, encoding
='test_decoder')
1946 self
.assertEquals(f
.read(i
), decoded
[:i
])
1948 self
.assertEquals(f
.read(j
), decoded
[i
:i
+ j
])
1950 self
.assertEquals(f
.read(), decoded
[i
:])
1953 # Enable the test decoder.
1954 StatefulIncrementalDecoder
.codecEnabled
= 1
1958 # Try each test case.
1959 for input, _
, _
in StatefulIncrementalDecoderTest
.test_cases
:
1960 test_seek_and_tell_with_data(input)
1962 # Position each test case so that it crosses a chunk boundary.
1963 for input, _
, _
in StatefulIncrementalDecoderTest
.test_cases
:
1964 offset
= CHUNK_SIZE
- len(input)//2
1965 prefix
= b
'.'*offset
1966 # Don't bother seeking into the prefix (takes too long).
1968 test_seek_and_tell_with_data(prefix
+ input, min_pos
)
1970 # Ensure our test decoder won't interfere with subsequent tests.
1972 StatefulIncrementalDecoder
.codecEnabled
= 0
1974 def test_encoded_writes(self
):
1982 for encoding
in tests
:
1983 buf
= self
.BytesIO()
1984 f
= self
.TextIOWrapper(buf
, encoding
=encoding
)
1985 # Check if the BOM is written only once (see issue1753).
1989 self
.assertEquals(f
.read(), data
* 2)
1991 self
.assertEquals(f
.read(), data
* 2)
1992 self
.assertEquals(buf
.getvalue(), (data
* 2).encode(encoding
))
1994 def test_unreadable(self
):
1995 class UnReadable(self
.BytesIO
):
1998 txt
= self
.TextIOWrapper(UnReadable())
1999 self
.assertRaises(IOError, txt
.read
)
2001 def test_read_one_by_one(self
):
2002 txt
= self
.TextIOWrapper(self
.BytesIO(b
"AA\r\nBB"))
2009 self
.assertEquals(reads
, "AA\nBB")
2011 def test_readlines(self
):
2012 txt
= self
.TextIOWrapper(self
.BytesIO(b
"AA\nBB\nCC"))
2013 self
.assertEqual(txt
.readlines(), ["AA\n", "BB\n", "CC"])
2015 self
.assertEqual(txt
.readlines(None), ["AA\n", "BB\n", "CC"])
2017 self
.assertEqual(txt
.readlines(5), ["AA\n", "BB\n"])
2019 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
2020 def test_read_by_chunk(self
):
2021 # make sure "\r\n" straddles 128 char boundary.
2022 txt
= self
.TextIOWrapper(self
.BytesIO(b
"A" * 127 + b
"\r\nB"))
2029 self
.assertEquals(reads
, "A"*127+"\nB")
2031 def test_issue1395_1(self
):
2032 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2034 # read one char at a time
2041 self
.assertEquals(reads
, self
.normalized
)
2043 def test_issue1395_2(self
):
2044 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2053 self
.assertEquals(reads
, self
.normalized
)
2055 def test_issue1395_3(self
):
2056 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2060 reads
+= txt
.read(4)
2061 reads
+= txt
.readline()
2062 reads
+= txt
.readline()
2063 reads
+= txt
.readline()
2064 self
.assertEquals(reads
, self
.normalized
)
2066 def test_issue1395_4(self
):
2067 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2072 self
.assertEquals(reads
, self
.normalized
)
2074 def test_issue1395_5(self
):
2075 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2082 self
.assertEquals(txt
.read(4), "BBB\n")
2084 def test_issue2282(self
):
2085 buffer = self
.BytesIO(self
.testdata
)
2086 txt
= self
.TextIOWrapper(buffer, encoding
="ascii")
2088 self
.assertEqual(buffer.seekable(), txt
.seekable())
2090 @unittest.skip("Issue #6213 with incremental encoders")
2091 def test_append_bom(self
):
2092 # The BOM is not written again when appending to a non-empty file
2093 filename
= support
.TESTFN
2094 for charset
in ('utf-8-sig', 'utf-16', 'utf-32'):
2095 with self
.open(filename
, 'w', encoding
=charset
) as f
:
2098 with self
.open(filename
, 'rb') as f
:
2099 self
.assertEquals(f
.read(), 'aaa'.encode(charset
))
2101 with self
.open(filename
, 'a', encoding
=charset
) as f
:
2103 with self
.open(filename
, 'rb') as f
:
2104 self
.assertEquals(f
.read(), 'aaaxxx'.encode(charset
))
2106 @unittest.skip("Issue #6213 with incremental encoders")
2107 def test_seek_bom(self
):
2108 # Same test, but when seeking manually
2109 filename
= support
.TESTFN
2110 for charset
in ('utf-8-sig', 'utf-16', 'utf-32'):
2111 with self
.open(filename
, 'w', encoding
=charset
) as f
:
2114 with self
.open(filename
, 'r+', encoding
=charset
) as f
:
2119 with self
.open(filename
, 'rb') as f
:
2120 self
.assertEquals(f
.read(), 'bbbzzz'.encode(charset
))
def test_errors_property(self):
    # The errors attribute must read "strict" when open() is given no
    # handler, and otherwise echo the handler that was passed in.
    target = support.TESTFN
    with self.open(target, "w") as default_stream:
        self.assertEqual(default_stream.errors, "strict")
    with self.open(target, "w", errors="replace") as replacing_stream:
        self.assertEqual(replacing_stream.errors, "replace")
2128 @unittest.skipUnless(threading
, 'Threading required for this test.')
2129 def test_threads_write(self
):
2130 # Issue6750: concurrent writes could duplicate data
2131 event
= threading
.Event()
2132 with self
.open(support
.TESTFN
, "w", buffering
=1) as f
:
2134 text
= "Thread%03d\n" % n
2137 threads
= [threading
.Thread(target
=lambda n
=x
: run(n
))
2145 with self
.open(support
.TESTFN
) as f
:
2148 self
.assertEquals(content
.count("Thread%03d\n" % n
), 1)
2150 def test_flush_error_on_close(self
):
2151 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2154 txt
.flush
= bad_flush
2155 self
.assertRaises(IOError, txt
.close
) # exception not swallowed
2157 def test_multi_close(self
):
2158 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2162 self
.assertRaises(ValueError, txt
.flush
)
2164 class CTextIOWrapperTest(TextIOWrapperTest
):
2166 def test_initialization(self
):
2167 r
= self
.BytesIO(b
"\xc3\xa9\n\n")
2168 b
= self
.BufferedReader(r
, 1000)
2169 t
= self
.TextIOWrapper(b
)
2170 self
.assertRaises(TypeError, t
.__init
__, b
, newline
=42)
2171 self
.assertRaises(ValueError, t
.read
)
2172 self
.assertRaises(ValueError, t
.__init
__, b
, newline
='xyzzy')
2173 self
.assertRaises(ValueError, t
.read
)
2175 def test_garbage_collection(self
):
2176 # C TextIOWrapper objects are collected, and collecting them flushes
2178 # The Python version has __del__, so it ends in gc.garbage instead.
2179 rawio
= io
.FileIO(support
.TESTFN
, "wb")
2180 b
= self
.BufferedWriter(rawio
)
2181 t
= self
.TextIOWrapper(b
, encoding
="ascii")
2186 support
.gc_collect()
2187 self
.assertTrue(wr() is None, wr
)
2188 with self
.open(support
.TESTFN
, "rb") as f
:
2189 self
.assertEqual(f
.read(), b
"456def")
2191 class PyTextIOWrapperTest(TextIOWrapperTest
):
2195 class IncrementalNewlineDecoderTest(unittest
.TestCase
):
2197 def check_newline_decoding_utf8(self
, decoder
):
2198 # UTF-8 specific tests for a newline decoder
2199 def _check_decode(b
, s
, **kwargs
):
2200 # We exercise getstate() / setstate() as well as decode()
2201 state
= decoder
.getstate()
2202 self
.assertEquals(decoder
.decode(b
, **kwargs
), s
)
2203 decoder
.setstate(state
)
2204 self
.assertEquals(decoder
.decode(b
, **kwargs
), s
)
2206 _check_decode(b
'\xe8\xa2\x88', "\u8888")
2208 _check_decode(b
'\xe8', "")
2209 _check_decode(b
'\xa2', "")
2210 _check_decode(b
'\x88', "\u8888")
2212 _check_decode(b
'\xe8', "")
2213 _check_decode(b
'\xa2', "")
2214 _check_decode(b
'\x88', "\u8888")
2216 _check_decode(b
'\xe8', "")
2217 self
.assertRaises(UnicodeDecodeError, decoder
.decode
, b
'', final
=True)
2220 _check_decode(b
'\n', "\n")
2221 _check_decode(b
'\r', "")
2222 _check_decode(b
'', "\n", final
=True)
2223 _check_decode(b
'\r', "\n", final
=True)
2225 _check_decode(b
'\r', "")
2226 _check_decode(b
'a', "\na")
2228 _check_decode(b
'\r\r\n', "\n\n")
2229 _check_decode(b
'\r', "")
2230 _check_decode(b
'\r', "\n")
2231 _check_decode(b
'\na', "\na")
2233 _check_decode(b
'\xe8\xa2\x88\r\n', "\u8888\n")
2234 _check_decode(b
'\xe8\xa2\x88', "\u8888")
2235 _check_decode(b
'\n', "\n")
2236 _check_decode(b
'\xe8\xa2\x88\r', "\u8888")
2237 _check_decode(b
'\n', "\n")
def check_newline_decoding(self, decoder, encoding):
    """Drive *decoder* through a fixed script, one unit at a time.

    decoder  -- newline decoder under test; it must provide decode(),
                reset() and a ``newlines`` attribute.
    encoding -- name of the codec used to encode the text before it is
                handed to the decoder, or None to feed the text as is.

    The checks cover the value of ``decoder.newlines`` after each piece of
    input, the concatenated output, and that reset() clears the record of
    newlines seen.
    """
    result = []
    if encoding is None:
        encoder = None
        to_units = lambda s: s          # feed one char at a time
    else:
        encoder = codecs.getincrementalencoder(encoding)()
        to_units = encoder.encode       # feed one byte at a time

    def _decode_bytewise(s):
        for unit in to_units(s):
            result.append(decoder.decode(unit))

    self.assertEqual(decoder.newlines, None)
    # (input, newline kinds the decoder must report afterwards)
    script = [
        ("abc\n\r", '\n'),
        ("\nabc", ('\n', '\r\n')),
        ("abc\r", ('\n', '\r\n')),
        ("abc", ('\r', '\n', '\r\n')),
    ]
    for text, seen in script:
        _decode_bytewise(text)
        self.assertEqual(decoder.newlines, seen)
    _decode_bytewise("abc\r")
    self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")

    # Start over: both sides must forget earlier input before the
    # final check that a reset decoder reports no newlines.
    decoder.reset()
    data = "abc"
    if encoder is not None:
        encoder.reset()
        data = encoder.encode(data)
    self.assertEqual(decoder.decode(data), "abc")
    self.assertEqual(decoder.newlines, None)
2272 def test_newline_decoder(self
):
2274 # None meaning the IncrementalNewlineDecoder takes unicode input
2275 # rather than bytes input
2276 None, 'utf-8', 'latin-1',
2277 'utf-16', 'utf-16-le', 'utf-16-be',
2278 'utf-32', 'utf-32-le', 'utf-32-be',
2280 for enc
in encodings
:
2281 decoder
= enc
and codecs
.getincrementaldecoder(enc
)()
2282 decoder
= self
.IncrementalNewlineDecoder(decoder
, translate
=True)
2283 self
.check_newline_decoding(decoder
, enc
)
2284 decoder
= codecs
.getincrementaldecoder("utf-8")()
2285 decoder
= self
.IncrementalNewlineDecoder(decoder
, translate
=True)
2286 self
.check_newline_decoding_utf8(decoder
)
2288 def test_newline_bytes(self
):
2289 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2291 self
.assertEquals(dec
.newlines
, None)
2292 self
.assertEquals(dec
.decode("\u0D00"), "\u0D00")
2293 self
.assertEquals(dec
.newlines
, None)
2294 self
.assertEquals(dec
.decode("\u0A00"), "\u0A00")
2295 self
.assertEquals(dec
.newlines
, None)
2296 dec
= self
.IncrementalNewlineDecoder(None, translate
=False)
2298 dec
= self
.IncrementalNewlineDecoder(None, translate
=True)
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoder tests bound to the C ``io`` module.

    The class adds nothing itself; the "C" name prefix is what makes
    test_main() attach the ``io`` namespace to it.
    """
    pass
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoder tests bound to the Python ``_pyio`` module.

    The class adds nothing itself; the "Py" name prefix is what makes
    test_main() attach the ``_pyio`` namespace to it.
    """
    pass
2308 # XXX Tests for open()
class MiscIOTest(unittest.TestCase):
    """Assorted checks of the public surface of an io implementation.

    The implementation under test is reached only through attributes of
    the test case (self.io, self.open, self.IOBase, ...), so subclasses can
    point the same tests at different modules.
    """

    def tearDown(self):
        # Remove the scratch file the tests create.
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every exported name must resolve, and be of the kind its name
        # suggests.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                # open is a function, so issubclass() does not apply to it
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # mode/name must be reported consistently at every layer of the
        # stack.  Files are closed by the with blocks even when an
        # assertion fails.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second object on the same descriptor reports the
            # descriptor number as its name.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Once a file is closed, every I/O method must raise ValueError,
        # whatever the mode and buffering it was opened with.
        for kwargs in [
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek, read1 and readinto only exist on some layers
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # write() needs an argument of the type matching the mode
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        import weakref

        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)

        # Build a reference cycle through the exception's message and
        # check that the garbage collector can reclaim it.
        class C(unicode):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # abcmodule is any object exposing IOBase, RawIOBase,
        # BufferedIOBase and TextIOBase.  Each kind of file returned by
        # open() must be an instance of IOBase and of exactly one of the
        # three more specific ABCs.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        # (the test case itself carries the ABCs as attributes).
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
class CMiscIOTest(MiscIOTest):
    """MiscIOTest run against the C implementation."""
    # test___all__ reads self.io; test_main() only injects the module's
    # members, not the module itself, so it has to be provided here.
    io = io
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest run against the Python implementation."""
    # test___all__ reads self.io; test_main() only injects the module's
    # members, not the module itself, so it has to be provided here.
    io = pyio
def test_main():
    # Bind every test class to the io implementation its name prefix
    # selects ("C" -> io, "Py" -> _pyio), then run the whole lot.
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = dict((name, getattr(io, name)) for name in all_members)
    py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
    # Each mock is looked up in this module under a "C"/"Py"-prefixed name.
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    # "C" is tried first; test classes with neither prefix are left alone.
    namespaces = (("C", c_io_ns), ("Py", py_io_ns))
    for test in tests:
        for prefix, namespace in namespaces:
            if test.__name__.startswith(prefix):
                for name, obj in namespace.items():
                    setattr(test, name, obj)
                break

    support.run_unittest(*tests)

if __name__ == "__main__":
    test_main()