1 """Unit tests for the io module."""
3 # Tests of io are scattered over the test suite:
4 # * test_bufio - tests file buffering
5 # * test_memoryio - tests BytesIO and StringIO
6 # * test_fileio - tests FileIO
7 # * test_file - tests the file interface
8 # * test_io - tests everything else in the io module
9 # * test_univnewlines - tests universal newline support
10 # * test_largefile - tests operations on a file greater than 2**32 bytes
11 # (only enabled with -ulargefile)
13 ################################################################################
14 # ATTENTION TEST WRITERS!!!
15 ################################################################################
16 # When writing tests for io, it's important to test both the C and Python
17 # implementations. This is usually done by writing a base test that refers to
18 # the type it is testing as a attribute. Then it provides custom subclasses to
19 # test both implementations. This file has lots of examples.
20 ################################################################################
22 from __future__
import print_function
23 from __future__
import unicode_literals
36 from itertools
import chain
, cycle
, count
37 from collections
import deque
38 from test
import test_support
as support
41 import io
# C implementation of io
42 import _pyio
as pyio
# Python implementation of io
45 bytes
= support
.py3k_bytes
47 def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
49 with io
.open(__file__
, "r", encoding
="latin1") as f
:
55 def __init__(self
, read_stack
=()):
56 self
._read
_stack
= list(read_stack
)
57 self
._write
_stack
= []
60 def read(self
, n
=None):
63 return self
._read
_stack
.pop(0)
68 self
._write
_stack
.append(bytes(b
))
83 def seek(self
, pos
, whence
):
84 return 0 # wrong but we gotta return something
87 return 0 # same comment as above
89 def readinto(self
, buf
):
93 data
= self
._read
_stack
[0]
97 del self
._read
_stack
[0]
100 if len(data
) <= max_len
:
101 del self
._read
_stack
[0]
105 buf
[:] = data
[:max_len
]
106 self
._read
_stack
[0] = data
[max_len
:]
109 def truncate(self
, pos
=None):
112 class CMockRawIO(MockRawIO
, io
.RawIOBase
):
115 class PyMockRawIO(MockRawIO
, pyio
.RawIOBase
):
119 class MisbehavedRawIO(MockRawIO
):
121 return MockRawIO
.write(self
, b
) * 2
123 def read(self
, n
=None):
124 return MockRawIO
.read(self
, n
) * 2
126 def seek(self
, pos
, whence
):
132 def readinto(self
, buf
):
133 MockRawIO
.readinto(self
, buf
)
136 class CMisbehavedRawIO(MisbehavedRawIO
, io
.RawIOBase
):
139 class PyMisbehavedRawIO(MisbehavedRawIO
, pyio
.RawIOBase
):
143 class CloseFailureIO(MockRawIO
):
151 class CCloseFailureIO(CloseFailureIO
, io
.RawIOBase
):
154 class PyCloseFailureIO(CloseFailureIO
, pyio
.RawIOBase
):
160 def __init__(self
, data
):
161 self
.read_history
= []
162 super(MockFileIO
, self
).__init
__(data
)
164 def read(self
, n
=None):
165 res
= super(MockFileIO
, self
).read(n
)
166 self
.read_history
.append(None if res
is None else len(res
))
169 def readinto(self
, b
):
170 res
= super(MockFileIO
, self
).readinto(b
)
171 self
.read_history
.append(res
)
174 class CMockFileIO(MockFileIO
, io
.BytesIO
):
177 class PyMockFileIO(MockFileIO
, pyio
.BytesIO
):
181 class MockNonBlockWriterIO
:
184 self
._write
_stack
= []
185 self
._blocker
_char
= None
187 def pop_written(self
):
188 s
= b
"".join(self
._write
_stack
)
189 self
._write
_stack
[:] = []
192 def block_on(self
, char
):
193 """Block when a given char is encountered."""
194 self
._blocker
_char
= char
208 if self
._blocker
_char
:
210 n
= b
.index(self
._blocker
_char
)
214 self
._blocker
_char
= None
215 self
._write
_stack
.append(b
[:n
])
216 raise self
.BlockingIOError(0, "test blocking", n
)
217 self
._write
_stack
.append(b
)
220 class CMockNonBlockWriterIO(MockNonBlockWriterIO
, io
.RawIOBase
):
221 BlockingIOError
= io
.BlockingIOError
223 class PyMockNonBlockWriterIO(MockNonBlockWriterIO
, pyio
.RawIOBase
):
224 BlockingIOError
= pyio
.BlockingIOError
227 class IOTest(unittest
.TestCase
):
230 support
.unlink(support
.TESTFN
)
233 support
.unlink(support
.TESTFN
)
235 def write_ops(self
, f
):
236 self
.assertEqual(f
.write(b
"blah."), 5)
237 self
.assertEqual(f
.seek(0), 0)
238 self
.assertEqual(f
.write(b
"Hello."), 6)
239 self
.assertEqual(f
.tell(), 6)
240 self
.assertEqual(f
.seek(-1, 1), 5)
241 self
.assertEqual(f
.tell(), 5)
242 self
.assertEqual(f
.write(bytearray(b
" world\n\n\n")), 9)
243 self
.assertEqual(f
.seek(0), 0)
244 self
.assertEqual(f
.write(b
"h"), 1)
245 self
.assertEqual(f
.seek(-1, 2), 13)
246 self
.assertEqual(f
.tell(), 13)
247 self
.assertEqual(f
.truncate(12), 12)
248 self
.assertEqual(f
.tell(), 12)
249 self
.assertRaises(TypeError, f
.seek
, 0.0)
251 def read_ops(self
, f
, buffered
=False):
253 self
.assertEqual(data
, b
"hello")
254 data
= bytearray(data
)
255 self
.assertEqual(f
.readinto(data
), 5)
256 self
.assertEqual(data
, b
" worl")
257 self
.assertEqual(f
.readinto(data
), 2)
258 self
.assertEqual(len(data
), 5)
259 self
.assertEqual(data
[:2], b
"d\n")
260 self
.assertEqual(f
.seek(0), 0)
261 self
.assertEqual(f
.read(20), b
"hello world\n")
262 self
.assertEqual(f
.read(1), b
"")
263 self
.assertEqual(f
.readinto(bytearray(b
"x")), 0)
264 self
.assertEqual(f
.seek(-6, 2), 6)
265 self
.assertEqual(f
.read(5), b
"world")
266 self
.assertEqual(f
.read(0), b
"")
267 self
.assertEqual(f
.readinto(bytearray()), 0)
268 self
.assertEqual(f
.seek(-6, 1), 5)
269 self
.assertEqual(f
.read(5), b
" worl")
270 self
.assertEqual(f
.tell(), 10)
271 self
.assertRaises(TypeError, f
.seek
, 0.0)
274 self
.assertEqual(f
.read(), b
"hello world\n")
276 self
.assertEqual(f
.read(), b
"world\n")
277 self
.assertEqual(f
.read(), b
"")
281 def large_file_ops(self
, f
):
284 self
.assertEqual(f
.seek(self
.LARGE
), self
.LARGE
)
285 self
.assertEqual(f
.tell(), self
.LARGE
)
286 self
.assertEqual(f
.write(b
"xxx"), 3)
287 self
.assertEqual(f
.tell(), self
.LARGE
+ 3)
288 self
.assertEqual(f
.seek(-1, 1), self
.LARGE
+ 2)
289 self
.assertEqual(f
.truncate(), self
.LARGE
+ 2)
290 self
.assertEqual(f
.tell(), self
.LARGE
+ 2)
291 self
.assertEqual(f
.seek(0, 2), self
.LARGE
+ 2)
292 self
.assertEqual(f
.truncate(self
.LARGE
+ 1), self
.LARGE
+ 1)
293 self
.assertEqual(f
.tell(), self
.LARGE
+ 1)
294 self
.assertEqual(f
.seek(0, 2), self
.LARGE
+ 1)
295 self
.assertEqual(f
.seek(-1, 2), self
.LARGE
)
296 self
.assertEqual(f
.read(2), b
"x")
298 def test_invalid_operations(self
):
299 # Try writing on a file opened in read mode and vice-versa.
300 for mode
in ("w", "wb"):
301 with self
.open(support
.TESTFN
, mode
) as fp
:
302 self
.assertRaises(IOError, fp
.read
)
303 self
.assertRaises(IOError, fp
.readline
)
304 with self
.open(support
.TESTFN
, "rb") as fp
:
305 self
.assertRaises(IOError, fp
.write
, b
"blah")
306 self
.assertRaises(IOError, fp
.writelines
, [b
"blah\n"])
307 with self
.open(support
.TESTFN
, "r") as fp
:
308 self
.assertRaises(IOError, fp
.write
, "blah")
309 self
.assertRaises(IOError, fp
.writelines
, ["blah\n"])
311 def test_raw_file_io(self
):
312 with self
.open(support
.TESTFN
, "wb", buffering
=0) as f
:
313 self
.assertEqual(f
.readable(), False)
314 self
.assertEqual(f
.writable(), True)
315 self
.assertEqual(f
.seekable(), True)
317 with self
.open(support
.TESTFN
, "rb", buffering
=0) as f
:
318 self
.assertEqual(f
.readable(), True)
319 self
.assertEqual(f
.writable(), False)
320 self
.assertEqual(f
.seekable(), True)
323 def test_buffered_file_io(self
):
324 with self
.open(support
.TESTFN
, "wb") as f
:
325 self
.assertEqual(f
.readable(), False)
326 self
.assertEqual(f
.writable(), True)
327 self
.assertEqual(f
.seekable(), True)
329 with self
.open(support
.TESTFN
, "rb") as f
:
330 self
.assertEqual(f
.readable(), True)
331 self
.assertEqual(f
.writable(), False)
332 self
.assertEqual(f
.seekable(), True)
333 self
.read_ops(f
, True)
335 def test_readline(self
):
336 with self
.open(support
.TESTFN
, "wb") as f
:
337 f
.write(b
"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
338 with self
.open(support
.TESTFN
, "rb") as f
:
339 self
.assertEqual(f
.readline(), b
"abc\n")
340 self
.assertEqual(f
.readline(10), b
"def\n")
341 self
.assertEqual(f
.readline(2), b
"xy")
342 self
.assertEqual(f
.readline(4), b
"zzy\n")
343 self
.assertEqual(f
.readline(), b
"foo\x00bar\n")
344 self
.assertEqual(f
.readline(), b
"another line")
345 self
.assertRaises(TypeError, f
.readline
, 5.3)
346 with self
.open(support
.TESTFN
, "r") as f
:
347 self
.assertRaises(TypeError, f
.readline
, 5.3)
349 def test_raw_bytes_io(self
):
353 self
.assertEqual(data
, b
"hello world\n")
354 f
= self
.BytesIO(data
)
355 self
.read_ops(f
, True)
357 def test_large_file_ops(self
):
358 # On Windows and Mac OSX this test comsumes large resources; It takes
359 # a long time to build the >2GB file and takes >2GB of disk space
360 # therefore the resource must be enabled to run this test.
361 if sys
.platform
[:3] == 'win' or sys
.platform
== 'darwin':
362 if not support
.is_resource_enabled("largefile"):
363 print("\nTesting large file ops skipped on %s." % sys
.platform
,
365 print("It requires %d bytes and a long time." % self
.LARGE
,
367 print("Use 'regrtest.py -u largefile test_io' to run it.",
370 with self
.open(support
.TESTFN
, "w+b", 0) as f
:
371 self
.large_file_ops(f
)
372 with self
.open(support
.TESTFN
, "w+b") as f
:
373 self
.large_file_ops(f
)
375 def test_with_open(self
):
376 for bufsize
in (0, 1, 100):
378 with self
.open(support
.TESTFN
, "wb", bufsize
) as f
:
380 self
.assertEqual(f
.closed
, True)
383 with self
.open(support
.TESTFN
, "wb", bufsize
) as f
:
385 except ZeroDivisionError:
386 self
.assertEqual(f
.closed
, True)
388 self
.fail("1/0 didn't raise an exception")
391 def test_append_mode_tell(self
):
392 with self
.open(support
.TESTFN
, "wb") as f
:
394 with self
.open(support
.TESTFN
, "ab", buffering
=0) as f
:
395 self
.assertEqual(f
.tell(), 3)
396 with self
.open(support
.TESTFN
, "ab") as f
:
397 self
.assertEqual(f
.tell(), 3)
398 with self
.open(support
.TESTFN
, "a") as f
:
399 self
.assertTrue(f
.tell() > 0)
401 def test_destructor(self
):
403 class MyFileIO(self
.FileIO
):
407 f
= super(MyFileIO
, self
).__del
__
408 except AttributeError:
414 super(MyFileIO
, self
).close()
417 super(MyFileIO
, self
).flush()
418 f
= MyFileIO(support
.TESTFN
, "wb")
422 self
.assertEqual(record
, [1, 2, 3])
423 with self
.open(support
.TESTFN
, "rb") as f
:
424 self
.assertEqual(f
.read(), b
"xxx")
426 def _check_base_destructor(self
, base
):
430 # This exercises the availability of attributes on object
432 # (in the C version, close() is called by the tp_dealloc
433 # function, not by __del__)
438 record
.append(self
.on_del
)
440 f
= super(MyIO
, self
).__del
__
441 except AttributeError:
446 record
.append(self
.on_close
)
447 super(MyIO
, self
).close()
449 record
.append(self
.on_flush
)
450 super(MyIO
, self
).flush()
454 self
.assertEqual(record
, [1, 2, 3])
456 def test_IOBase_destructor(self
):
457 self
._check
_base
_destructor
(self
.IOBase
)
459 def test_RawIOBase_destructor(self
):
460 self
._check
_base
_destructor
(self
.RawIOBase
)
462 def test_BufferedIOBase_destructor(self
):
463 self
._check
_base
_destructor
(self
.BufferedIOBase
)
465 def test_TextIOBase_destructor(self
):
466 self
._check
_base
_destructor
(self
.TextIOBase
)
468 def test_close_flushes(self
):
469 with self
.open(support
.TESTFN
, "wb") as f
:
471 with self
.open(support
.TESTFN
, "rb") as f
:
472 self
.assertEqual(f
.read(), b
"xxx")
474 def test_array_writes(self
):
475 a
= array
.array(b
'i', range(10))
476 n
= len(a
.tostring())
477 with self
.open(support
.TESTFN
, "wb", 0) as f
:
478 self
.assertEqual(f
.write(a
), n
)
479 with self
.open(support
.TESTFN
, "wb") as f
:
480 self
.assertEqual(f
.write(a
), n
)
482 def test_closefd(self
):
483 self
.assertRaises(ValueError, self
.open, support
.TESTFN
, 'w',
486 def test_read_closed(self
):
487 with self
.open(support
.TESTFN
, "w") as f
:
489 with self
.open(support
.TESTFN
, "r") as f
:
490 file = self
.open(f
.fileno(), "r", closefd
=False)
491 self
.assertEqual(file.read(), "egg\n")
494 self
.assertRaises(ValueError, file.read
)
496 def test_no_closefd_with_filename(self
):
497 # can't use closefd in combination with a file name
498 self
.assertRaises(ValueError, self
.open, support
.TESTFN
, "r", closefd
=False)
500 def test_closefd_attr(self
):
501 with self
.open(support
.TESTFN
, "wb") as f
:
503 with self
.open(support
.TESTFN
, "r") as f
:
504 self
.assertEqual(f
.buffer.raw
.closefd
, True)
505 file = self
.open(f
.fileno(), "r", closefd
=False)
506 self
.assertEqual(file.buffer.raw
.closefd
, False)
508 def test_garbage_collection(self
):
509 # FileIO objects are collected, and collecting them flushes
511 f
= self
.FileIO(support
.TESTFN
, "wb")
517 self
.assertTrue(wr() is None, wr
)
518 with self
.open(support
.TESTFN
, "rb") as f
:
519 self
.assertEqual(f
.read(), b
"abcxxx")
521 def test_unbounded_file(self
):
522 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
524 if not os
.path
.exists(zero
):
525 self
.skipTest("{0} does not exist".format(zero
))
526 if sys
.maxsize
> 0x7FFFFFFF:
527 self
.skipTest("test can only run in a 32-bit address space")
528 if support
.real_max_memuse
< support
._2G
:
529 self
.skipTest("test requires at least 2GB of memory")
530 with self
.open(zero
, "rb", buffering
=0) as f
:
531 self
.assertRaises(OverflowError, f
.read
)
532 with self
.open(zero
, "rb") as f
:
533 self
.assertRaises(OverflowError, f
.read
)
534 with self
.open(zero
, "r") as f
:
535 self
.assertRaises(OverflowError, f
.read
)
537 class CIOTest(IOTest
):
540 class PyIOTest(IOTest
):
541 test_array_writes
= unittest
.skip(
542 "len(array.array) returns number of elements rather than bytelength"
543 )(IOTest
.test_array_writes
)
546 class CommonBufferedTests
:
547 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
549 def test_detach(self
):
550 raw
= self
.MockRawIO()
552 self
.assertIs(buf
.detach(), raw
)
553 self
.assertRaises(ValueError, buf
.detach
)
555 def test_fileno(self
):
556 rawio
= self
.MockRawIO()
557 bufio
= self
.tp(rawio
)
559 self
.assertEquals(42, bufio
.fileno())
561 def test_no_fileno(self
):
562 # XXX will we always have fileno() function? If so, kill
563 # this test. Else, write it.
566 def test_invalid_args(self
):
567 rawio
= self
.MockRawIO()
568 bufio
= self
.tp(rawio
)
570 self
.assertRaises(ValueError, bufio
.seek
, 0, -1)
571 self
.assertRaises(ValueError, bufio
.seek
, 0, 3)
573 def test_override_destructor(self
):
576 class MyBufferedIO(tp
):
580 f
= super(MyBufferedIO
, self
).__del
__
581 except AttributeError:
587 super(MyBufferedIO
, self
).close()
590 super(MyBufferedIO
, self
).flush()
591 rawio
= self
.MockRawIO()
592 bufio
= MyBufferedIO(rawio
)
593 writable
= bufio
.writable()
597 self
.assertEqual(record
, [1, 2, 3])
599 self
.assertEqual(record
, [1, 2])
601 def test_context_manager(self
):
602 # Test usability as a context manager
603 rawio
= self
.MockRawIO()
604 bufio
= self
.tp(rawio
)
609 # bufio should now be closed, and using it a second time should raise
611 self
.assertRaises(ValueError, _with
)
613 def test_error_through_destructor(self
):
614 # Test that the exception state is not modified by a destructor,
615 # even if close() fails.
616 rawio
= self
.CloseFailureIO()
619 with support
.captured_output("stderr") as s
:
620 self
.assertRaises(AttributeError, f
)
621 s
= s
.getvalue().strip()
623 # The destructor *may* have printed an unraisable error, check it
624 self
.assertEqual(len(s
.splitlines()), 1)
625 self
.assertTrue(s
.startswith("Exception IOError: "), s
)
626 self
.assertTrue(s
.endswith(" ignored"), s
)
629 raw
= self
.MockRawIO()
631 clsname
= "%s.%s" % (self
.tp
.__module
__, self
.tp
.__name
__)
632 self
.assertEqual(repr(b
), "<%s>" % clsname
)
634 self
.assertEqual(repr(b
), "<%s name=u'dummy'>" % clsname
)
636 self
.assertEqual(repr(b
), "<%s name='dummy'>" % clsname
)
639 class BufferedReaderTest(unittest
.TestCase
, CommonBufferedTests
):
642 def test_constructor(self
):
643 rawio
= self
.MockRawIO([b
"abc"])
644 bufio
= self
.tp(rawio
)
645 bufio
.__init
__(rawio
)
646 bufio
.__init
__(rawio
, buffer_size
=1024)
647 bufio
.__init
__(rawio
, buffer_size
=16)
648 self
.assertEquals(b
"abc", bufio
.read())
649 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=0)
650 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-16)
651 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-1)
652 rawio
= self
.MockRawIO([b
"abc"])
653 bufio
.__init
__(rawio
)
654 self
.assertEquals(b
"abc", bufio
.read())
657 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
658 bufio
= self
.tp(rawio
)
659 self
.assertEquals(b
"abcdef", bufio
.read(6))
661 self
.assertRaises(ValueError, bufio
.read
, -2)
663 def test_read1(self
):
664 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
665 bufio
= self
.tp(rawio
)
666 self
.assertEquals(b
"a", bufio
.read(1))
667 self
.assertEquals(b
"b", bufio
.read1(1))
668 self
.assertEquals(rawio
._reads
, 1)
669 self
.assertEquals(b
"c", bufio
.read1(100))
670 self
.assertEquals(rawio
._reads
, 1)
671 self
.assertEquals(b
"d", bufio
.read1(100))
672 self
.assertEquals(rawio
._reads
, 2)
673 self
.assertEquals(b
"efg", bufio
.read1(100))
674 self
.assertEquals(rawio
._reads
, 3)
675 self
.assertEquals(b
"", bufio
.read1(100))
677 self
.assertRaises(ValueError, bufio
.read1
, -1)
679 def test_readinto(self
):
680 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
681 bufio
= self
.tp(rawio
)
683 self
.assertEquals(bufio
.readinto(b
), 2)
684 self
.assertEquals(b
, b
"ab")
685 self
.assertEquals(bufio
.readinto(b
), 2)
686 self
.assertEquals(b
, b
"cd")
687 self
.assertEquals(bufio
.readinto(b
), 2)
688 self
.assertEquals(b
, b
"ef")
689 self
.assertEquals(bufio
.readinto(b
), 1)
690 self
.assertEquals(b
, b
"gf")
691 self
.assertEquals(bufio
.readinto(b
), 0)
692 self
.assertEquals(b
, b
"gf")
694 def test_buffering(self
):
699 [ 100, [ 3, 1, 4, 8 ], [ dlen
, 0 ] ],
700 [ 100, [ 3, 3, 3], [ dlen
] ],
701 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
704 for bufsize
, buf_read_sizes
, raw_read_sizes
in tests
:
705 rawio
= self
.MockFileIO(data
)
706 bufio
= self
.tp(rawio
, buffer_size
=bufsize
)
708 for nbytes
in buf_read_sizes
:
709 self
.assertEquals(bufio
.read(nbytes
), data
[pos
:pos
+nbytes
])
711 # this is mildly implementation-dependent
712 self
.assertEquals(rawio
.read_history
, raw_read_sizes
)
714 def test_read_non_blocking(self
):
715 # Inject some None's in there to simulate EWOULDBLOCK
716 rawio
= self
.MockRawIO((b
"abc", b
"d", None, b
"efg", None, None, None))
717 bufio
= self
.tp(rawio
)
719 self
.assertEquals(b
"abcd", bufio
.read(6))
720 self
.assertEquals(b
"e", bufio
.read(1))
721 self
.assertEquals(b
"fg", bufio
.read())
722 self
.assertEquals(b
"", bufio
.peek(1))
723 self
.assertTrue(None is bufio
.read())
724 self
.assertEquals(b
"", bufio
.read())
726 def test_read_past_eof(self
):
727 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
728 bufio
= self
.tp(rawio
)
730 self
.assertEquals(b
"abcdefg", bufio
.read(9000))
732 def test_read_all(self
):
733 rawio
= self
.MockRawIO((b
"abc", b
"d", b
"efg"))
734 bufio
= self
.tp(rawio
)
736 self
.assertEquals(b
"abcdefg", bufio
.read())
738 def test_threads(self
):
740 # Write out many bytes with exactly the same number of 0's,
741 # 1's... 255's. This will help us check that concurrent reading
742 # doesn't duplicate or forget contents.
744 l
= list(range(256)) * N
746 s
= bytes(bytearray(l
))
747 with self
.open(support
.TESTFN
, "wb") as f
:
749 with self
.open(support
.TESTFN
, self
.read_mode
, buffering
=0) as raw
:
750 bufio
= self
.tp(raw
, 8)
755 # Intra-buffer read then buffer-flushing read
756 for n
in cycle([1, 19]):
760 # list.append() is atomic
762 except Exception as e
:
765 threads
= [threading
.Thread(target
=f
) for x
in range(20)]
768 time
.sleep(0.02) # yield
771 self
.assertFalse(errors
,
772 "the following exceptions were caught: %r" % errors
)
773 s
= b
''.join(results
)
775 c
= bytes(bytearray([i
]))
776 self
.assertEqual(s
.count(c
), N
)
778 support
.unlink(support
.TESTFN
)
780 def test_misbehaved_io(self
):
781 rawio
= self
.MisbehavedRawIO((b
"abc", b
"d", b
"efg"))
782 bufio
= self
.tp(rawio
)
783 self
.assertRaises(IOError, bufio
.seek
, 0)
784 self
.assertRaises(IOError, bufio
.tell
)
786 class CBufferedReaderTest(BufferedReaderTest
):
787 tp
= io
.BufferedReader
789 def test_constructor(self
):
790 BufferedReaderTest
.test_constructor(self
)
791 # The allocation can succeed on 32-bit builds, e.g. with more
792 # than 2GB RAM and a 64-bit kernel.
793 if sys
.maxsize
> 0x7FFFFFFF:
794 rawio
= self
.MockRawIO()
795 bufio
= self
.tp(rawio
)
796 self
.assertRaises((OverflowError, MemoryError, ValueError),
797 bufio
.__init
__, rawio
, sys
.maxsize
)
799 def test_initialization(self
):
800 rawio
= self
.MockRawIO([b
"abc"])
801 bufio
= self
.tp(rawio
)
802 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=0)
803 self
.assertRaises(ValueError, bufio
.read
)
804 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-16)
805 self
.assertRaises(ValueError, bufio
.read
)
806 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-1)
807 self
.assertRaises(ValueError, bufio
.read
)
809 def test_misbehaved_io_read(self
):
810 rawio
= self
.MisbehavedRawIO((b
"abc", b
"d", b
"efg"))
811 bufio
= self
.tp(rawio
)
812 # _pyio.BufferedReader seems to implement reading different, so that
813 # checking this is not so easy.
814 self
.assertRaises(IOError, bufio
.read
, 10)
816 def test_garbage_collection(self
):
817 # C BufferedReader objects are collected.
818 # The Python version has __del__, so it ends into gc.garbage instead
819 rawio
= self
.FileIO(support
.TESTFN
, "w+b")
825 self
.assertTrue(wr() is None, wr
)
827 class PyBufferedReaderTest(BufferedReaderTest
):
828 tp
= pyio
.BufferedReader
831 class BufferedWriterTest(unittest
.TestCase
, CommonBufferedTests
):
834 def test_constructor(self
):
835 rawio
= self
.MockRawIO()
836 bufio
= self
.tp(rawio
)
837 bufio
.__init
__(rawio
)
838 bufio
.__init
__(rawio
, buffer_size
=1024)
839 bufio
.__init
__(rawio
, buffer_size
=16)
840 self
.assertEquals(3, bufio
.write(b
"abc"))
842 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=0)
843 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-16)
844 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-1)
845 bufio
.__init
__(rawio
)
846 self
.assertEquals(3, bufio
.write(b
"ghi"))
848 self
.assertEquals(b
"".join(rawio
._write
_stack
), b
"abcghi")
850 def test_detach_flush(self
):
851 raw
= self
.MockRawIO()
854 self
.assertFalse(raw
._write
_stack
)
856 self
.assertEqual(raw
._write
_stack
, [b
"howdy!"])
858 def test_write(self
):
859 # Write to the buffered IO but don't overflow the buffer.
860 writer
= self
.MockRawIO()
861 bufio
= self
.tp(writer
, 8)
863 self
.assertFalse(writer
._write
_stack
)
865 def test_write_overflow(self
):
866 writer
= self
.MockRawIO()
867 bufio
= self
.tp(writer
, 8)
868 contents
= b
"abcdefghijklmnop"
869 for n
in range(0, len(contents
), 3):
870 bufio
.write(contents
[n
:n
+3])
871 flushed
= b
"".join(writer
._write
_stack
)
872 # At least (total - 8) bytes were implicitly flushed, perhaps more
873 # depending on the implementation.
874 self
.assertTrue(flushed
.startswith(contents
[:-8]), flushed
)
876 def check_writes(self
, intermediate_func
):
877 # Lots of writes, test the flushed output is as expected.
878 contents
= bytes(range(256)) * 1000
880 writer
= self
.MockRawIO()
881 bufio
= self
.tp(writer
, 13)
882 # Generator of write sizes: repeat each N 15 times then proceed to N+1
884 for size
in count(1):
888 while n
< len(contents
):
889 size
= min(next(sizes
), len(contents
) - n
)
890 self
.assertEquals(bufio
.write(contents
[n
:n
+size
]), size
)
891 intermediate_func(bufio
)
894 self
.assertEquals(contents
,
895 b
"".join(writer
._write
_stack
))
897 def test_writes(self
):
898 self
.check_writes(lambda bufio
: None)
900 def test_writes_and_flushes(self
):
901 self
.check_writes(lambda bufio
: bufio
.flush())
903 def test_writes_and_seeks(self
):
906 bufio
.seek(pos
+ 1, 0)
907 bufio
.seek(pos
- 1, 0)
909 self
.check_writes(_seekabs
)
911 pos
= bufio
.seek(0, 1)
915 self
.check_writes(_seekrel
)
917 def test_writes_and_truncates(self
):
918 self
.check_writes(lambda bufio
: bufio
.truncate(bufio
.tell()))
920 def test_write_non_blocking(self
):
921 raw
= self
.MockNonBlockWriterIO()
922 bufio
= self
.tp(raw
, 8)
924 self
.assertEquals(bufio
.write(b
"abcd"), 4)
925 self
.assertEquals(bufio
.write(b
"efghi"), 5)
926 # 1 byte will be written, the rest will be buffered
928 self
.assertEquals(bufio
.write(b
"jklmn"), 5)
930 # 8 bytes will be written, 8 will be buffered and the rest will be lost
933 bufio
.write(b
"opqrwxyz0123456789")
934 except self
.BlockingIOError
as e
:
935 written
= e
.characters_written
937 self
.fail("BlockingIOError should have been raised")
938 self
.assertEquals(written
, 16)
939 self
.assertEquals(raw
.pop_written(),
940 b
"abcdefghijklmnopqrwxyz")
942 self
.assertEquals(bufio
.write(b
"ABCDEFGHI"), 9)
943 s
= raw
.pop_written()
944 # Previously buffered bytes were flushed
945 self
.assertTrue(s
.startswith(b
"01234567A"), s
)
947 def test_write_and_rewind(self
):
949 bufio
= self
.tp(raw
, 4)
950 self
.assertEqual(bufio
.write(b
"abcdef"), 6)
951 self
.assertEqual(bufio
.tell(), 6)
953 self
.assertEqual(bufio
.write(b
"XY"), 2)
955 self
.assertEqual(raw
.getvalue(), b
"XYcdef")
956 self
.assertEqual(bufio
.write(b
"123456"), 6)
958 self
.assertEqual(raw
.getvalue(), b
"XYcdef123456")
960 def test_flush(self
):
961 writer
= self
.MockRawIO()
962 bufio
= self
.tp(writer
, 8)
965 self
.assertEquals(b
"abc", writer
._write
_stack
[0])
967 def test_destructor(self
):
968 writer
= self
.MockRawIO()
969 bufio
= self
.tp(writer
, 8)
973 self
.assertEquals(b
"abc", writer
._write
_stack
[0])
975 def test_truncate(self
):
976 # Truncate implicitly flushes the buffer.
977 with self
.open(support
.TESTFN
, self
.write_mode
, buffering
=0) as raw
:
978 bufio
= self
.tp(raw
, 8)
979 bufio
.write(b
"abcdef")
980 self
.assertEqual(bufio
.truncate(3), 3)
981 self
.assertEqual(bufio
.tell(), 3)
982 with self
.open(support
.TESTFN
, "rb", buffering
=0) as f
:
983 self
.assertEqual(f
.read(), b
"abc")
985 def test_threads(self
):
987 # Write out many bytes from many threads and test they were
990 contents
= bytes(range(256)) * N
991 sizes
= cycle([1, 19])
994 while n
< len(contents
):
996 queue
.append(contents
[n
:n
+size
])
999 # We use a real file object because it allows us to
1000 # exercise situations where the GIL is released before
1001 # writing the buffer to the raw streams. This is in addition
1002 # to concurrency issues due to switching threads in the middle
1004 with self
.open(support
.TESTFN
, self
.write_mode
, buffering
=0) as raw
:
1005 bufio
= self
.tp(raw
, 8)
1015 except Exception as e
:
1018 threads
= [threading
.Thread(target
=f
) for x
in range(20)]
1021 time
.sleep(0.02) # yield
1024 self
.assertFalse(errors
,
1025 "the following exceptions were caught: %r" % errors
)
1027 with self
.open(support
.TESTFN
, "rb") as f
:
1029 for i
in range(256):
1030 self
.assertEquals(s
.count(bytes([i
])), N
)
1032 support
.unlink(support
.TESTFN
)
1034 def test_misbehaved_io(self
):
1035 rawio
= self
.MisbehavedRawIO()
1036 bufio
= self
.tp(rawio
, 5)
1037 self
.assertRaises(IOError, bufio
.seek
, 0)
1038 self
.assertRaises(IOError, bufio
.tell
)
1039 self
.assertRaises(IOError, bufio
.write
, b
"abcdef")
1041 def test_max_buffer_size_deprecation(self
):
1042 with support
.check_warnings() as w
:
1043 warnings
.simplefilter("always", DeprecationWarning)
1044 self
.tp(self
.MockRawIO(), 8, 12)
1045 self
.assertEqual(len(w
.warnings
), 1)
1046 warning
= w
.warnings
[0]
1047 self
.assertTrue(warning
.category
is DeprecationWarning)
1048 self
.assertEqual(str(warning
.message
),
1049 "max_buffer_size is deprecated")
1052 class CBufferedWriterTest(BufferedWriterTest
):
1053 tp
= io
.BufferedWriter
1055 def test_constructor(self
):
1056 BufferedWriterTest
.test_constructor(self
)
1057 # The allocation can succeed on 32-bit builds, e.g. with more
1058 # than 2GB RAM and a 64-bit kernel.
1059 if sys
.maxsize
> 0x7FFFFFFF:
1060 rawio
= self
.MockRawIO()
1061 bufio
= self
.tp(rawio
)
1062 self
.assertRaises((OverflowError, MemoryError, ValueError),
1063 bufio
.__init
__, rawio
, sys
.maxsize
)
1065 def test_initialization(self
):
1066 rawio
= self
.MockRawIO()
1067 bufio
= self
.tp(rawio
)
1068 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=0)
1069 self
.assertRaises(ValueError, bufio
.write
, b
"def")
1070 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-16)
1071 self
.assertRaises(ValueError, bufio
.write
, b
"def")
1072 self
.assertRaises(ValueError, bufio
.__init
__, rawio
, buffer_size
=-1)
1073 self
.assertRaises(ValueError, bufio
.write
, b
"def")
1075 def test_garbage_collection(self
):
1076 # C BufferedWriter objects are collected, and collecting them flushes
1078 # The Python version has __del__, so it ends into gc.garbage instead
1079 rawio
= self
.FileIO(support
.TESTFN
, "w+b")
1085 support
.gc_collect()
1086 self
.assertTrue(wr() is None, wr
)
1087 with self
.open(support
.TESTFN
, "rb") as f
:
1088 self
.assertEqual(f
.read(), b
"123xxx")
1091 class PyBufferedWriterTest(BufferedWriterTest
):
1092 tp
= pyio
.BufferedWriter
1094 class BufferedRWPairTest(unittest
.TestCase
):
1096 def test_constructor(self
):
1097 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1098 self
.assertFalse(pair
.closed
)
1100 def test_detach(self
):
1101 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1102 self
.assertRaises(self
.UnsupportedOperation
, pair
.detach
)
1104 def test_constructor_max_buffer_size_deprecation(self
):
1105 with support
.check_warnings() as w
:
1106 warnings
.simplefilter("always", DeprecationWarning)
1107 self
.tp(self
.MockRawIO(), self
.MockRawIO(), 8, 12)
1108 self
.assertEqual(len(w
.warnings
), 1)
1109 warning
= w
.warnings
[0]
1110 self
.assertTrue(warning
.category
is DeprecationWarning)
1111 self
.assertEqual(str(warning
.message
),
1112 "max_buffer_size is deprecated")
1114 def test_constructor_with_not_readable(self
):
1115 class NotReadable(MockRawIO
):
1119 self
.assertRaises(IOError, self
.tp
, NotReadable(), self
.MockRawIO())
1121 def test_constructor_with_not_writeable(self
):
1122 class NotWriteable(MockRawIO
):
1126 self
.assertRaises(IOError, self
.tp
, self
.MockRawIO(), NotWriteable())
1128 def test_read(self
):
1129 pair
= self
.tp(self
.BytesIO(b
"abcdef"), self
.MockRawIO())
1131 self
.assertEqual(pair
.read(3), b
"abc")
1132 self
.assertEqual(pair
.read(1), b
"d")
1133 self
.assertEqual(pair
.read(), b
"ef")
1135 def test_read1(self
):
1136 # .read1() is delegated to the underlying reader object, so this test
1138 pair
= self
.tp(self
.BytesIO(b
"abcdef"), self
.MockRawIO())
1140 self
.assertEqual(pair
.read1(3), b
"abc")
1142 def test_readinto(self
):
1143 pair
= self
.tp(self
.BytesIO(b
"abcdef"), self
.MockRawIO())
1146 self
.assertEqual(pair
.readinto(data
), 5)
1147 self
.assertEqual(data
, b
"abcde")
1149 def test_write(self
):
1150 w
= self
.MockRawIO()
1151 pair
= self
.tp(self
.MockRawIO(), w
)
1157 self
.assertEqual(w
._write
_stack
, [b
"abc", b
"def"])
1159 def test_peek(self
):
1160 pair
= self
.tp(self
.BytesIO(b
"abcdef"), self
.MockRawIO())
1162 self
.assertTrue(pair
.peek(3).startswith(b
"abc"))
1163 self
.assertEqual(pair
.read(3), b
"abc")
1165 def test_readable(self
):
1166 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1167 self
.assertTrue(pair
.readable())
1169 def test_writeable(self
):
1170 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1171 self
.assertTrue(pair
.writable())
1173 def test_seekable(self
):
1174 # BufferedRWPairs are never seekable, even if their readers and writers
1176 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1177 self
.assertFalse(pair
.seekable())
1179 # .flush() is delegated to the underlying writer object and has been
1180 # tested in the test_write method.
1182 def test_close_and_closed(self
):
1183 pair
= self
.tp(self
.MockRawIO(), self
.MockRawIO())
1184 self
.assertFalse(pair
.closed
)
1186 self
.assertTrue(pair
.closed
)
1188 def test_isatty(self
):
1189 class SelectableIsAtty(MockRawIO
):
1190 def __init__(self
, isatty
):
1191 MockRawIO
.__init
__(self
)
1192 self
._isatty
= isatty
1197 pair
= self
.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1198 self
.assertFalse(pair
.isatty())
1200 pair
= self
.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1201 self
.assertTrue(pair
.isatty())
1203 pair
= self
.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1204 self
.assertTrue(pair
.isatty())
1206 pair
= self
.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1207 self
.assertTrue(pair
.isatty())
1209 class CBufferedRWPairTest(BufferedRWPairTest
):
1210 tp
= io
.BufferedRWPair
1212 class PyBufferedRWPairTest(BufferedRWPairTest
):
1213 tp
= pyio
.BufferedRWPair
1216 class BufferedRandomTest(BufferedReaderTest
, BufferedWriterTest
):
1220 def test_constructor(self
):
1221 BufferedReaderTest
.test_constructor(self
)
1222 BufferedWriterTest
.test_constructor(self
)
1224 def test_read_and_write(self
):
1225 raw
= self
.MockRawIO((b
"asdf", b
"ghjk"))
1226 rw
= self
.tp(raw
, 8)
1228 self
.assertEqual(b
"as", rw
.read(2))
1231 self
.assertFalse(raw
._write
_stack
) # Buffer writes
1232 self
.assertEqual(b
"ghjk", rw
.read())
1233 self
.assertEquals(b
"dddeee", raw
._write
_stack
[0])
1235 def test_seek_and_tell(self
):
1236 raw
= self
.BytesIO(b
"asdfghjkl")
1239 self
.assertEquals(b
"as", rw
.read(2))
1240 self
.assertEquals(2, rw
.tell())
1242 self
.assertEquals(b
"asdf", rw
.read(4))
1246 self
.assertEquals(b
"asdfasdfl", rw
.read())
1247 self
.assertEquals(9, rw
.tell())
1249 self
.assertEquals(5, rw
.tell())
1251 self
.assertEquals(7, rw
.tell())
1252 self
.assertEquals(b
"fl", rw
.read(11))
1253 self
.assertRaises(TypeError, rw
.seek
, 0.0)
1255 def check_flush_and_read(self
, read_func
):
1256 raw
= self
.BytesIO(b
"abcdefghi")
1257 bufio
= self
.tp(raw
)
1259 self
.assertEquals(b
"ab", read_func(bufio
, 2))
1261 self
.assertEquals(b
"ef", read_func(bufio
, 2))
1262 self
.assertEquals(6, bufio
.tell())
1264 self
.assertEquals(6, bufio
.tell())
1265 self
.assertEquals(b
"ghi", read_func(bufio
))
1268 # flush() resets the read buffer
1271 self
.assertEquals(b
"XYZ", read_func(bufio
, 3))
1273 def test_flush_and_read(self
):
1274 self
.check_flush_and_read(lambda bufio
, *args
: bufio
.read(*args
))
1276 def test_flush_and_readinto(self
):
1277 def _readinto(bufio
, n
=-1):
1278 b
= bytearray(n
if n
>= 0 else 9999)
1279 n
= bufio
.readinto(b
)
1281 self
.check_flush_and_read(_readinto
)
1283 def test_flush_and_peek(self
):
1284 def _peek(bufio
, n
=-1):
1285 # This relies on the fact that the buffer can contain the whole
1286 # raw stream, otherwise peek() can return less.
1290 bufio
.seek(len(b
), 1)
1292 self
.check_flush_and_read(_peek
)
1294 def test_flush_and_write(self
):
1295 raw
= self
.BytesIO(b
"abcdefghi")
1296 bufio
= self
.tp(raw
)
1303 self
.assertEquals(b
"12345fghi", raw
.getvalue())
1304 self
.assertEquals(b
"12345fghi", bufio
.read())
1306 def test_threads(self
):
1307 BufferedReaderTest
.test_threads(self
)
1308 BufferedWriterTest
.test_threads(self
)
1310 def test_writes_and_peek(self
):
1313 self
.check_writes(_peek
)
1319 self
.check_writes(_peek
)
1321 def test_writes_and_reads(self
):
1325 self
.check_writes(_read
)
1327 def test_writes_and_read1s(self
):
1331 self
.check_writes(_read1
)
1333 def test_writes_and_readintos(self
):
1336 bufio
.readinto(bytearray(1))
1337 self
.check_writes(_read
)
1339 def test_write_after_readahead(self
):
1340 # Issue #6629: writing after the buffer was filled by readahead should
1341 # first rewind the raw stream.
1342 for overwrite_size
in [1, 5]:
1343 raw
= self
.BytesIO(b
"A" * 10)
1344 bufio
= self
.tp(raw
, 4)
1346 self
.assertEqual(bufio
.read(1), b
"A")
1347 self
.assertEqual(bufio
.tell(), 1)
1348 # Overwriting should rewind the raw stream if it needs so
1349 bufio
.write(b
"B" * overwrite_size
)
1350 self
.assertEqual(bufio
.tell(), overwrite_size
+ 1)
1351 # If the write size was smaller than the buffer size, flush() and
1352 # check that rewind happens.
1354 self
.assertEqual(bufio
.tell(), overwrite_size
+ 1)
1357 b
"A" + b
"B" * overwrite_size
+ b
"A" * (9 - overwrite_size
))
1359 def test_misbehaved_io(self
):
1360 BufferedReaderTest
.test_misbehaved_io(self
)
1361 BufferedWriterTest
.test_misbehaved_io(self
)
1363 class CBufferedRandomTest(CBufferedReaderTest
, CBufferedWriterTest
, BufferedRandomTest
):
1364 tp
= io
.BufferedRandom
1366 def test_constructor(self
):
1367 BufferedRandomTest
.test_constructor(self
)
1368 # The allocation can succeed on 32-bit builds, e.g. with more
1369 # than 2GB RAM and a 64-bit kernel.
1370 if sys
.maxsize
> 0x7FFFFFFF:
1371 rawio
= self
.MockRawIO()
1372 bufio
= self
.tp(rawio
)
1373 self
.assertRaises((OverflowError, MemoryError, ValueError),
1374 bufio
.__init
__, rawio
, sys
.maxsize
)
1376 def test_garbage_collection(self
):
1377 CBufferedReaderTest
.test_garbage_collection(self
)
1378 CBufferedWriterTest
.test_garbage_collection(self
)
1380 class PyBufferedRandomTest(BufferedRandomTest
):
1381 tp
= pyio
.BufferedRandom
1384 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1386 # - A single output character can correspond to many bytes of input.
1387 # - The number of input bytes to complete the character can be
1388 # undetermined until the last input byte is received.
1389 # - The number of input bytes can vary depending on previous input.
1390 # - A single input byte can correspond to many characters of output.
1391 # - The number of output characters can be undetermined until the
1392 # last input byte is received.
1393 # - The number of output characters can vary depending on previous input.
1395 class StatefulIncrementalDecoder(codecs
.IncrementalDecoder
):
1397 For testing seek/tell behavior with a stateful, buffering decoder.
1399 Input is a sequence of words. Words may be fixed-length (length set
1400 by input) or variable-length (period-terminated). In variable-length
1401 mode, extra periods are ignored. Possible words are:
1402 - 'i' followed by a number sets the input length, I (maximum 99).
1403 When I is set to 0, words are space-terminated.
1404 - 'o' followed by a number sets the output length, O (maximum 99).
1405 - Any other word is converted into a word followed by a period on
1406 the output. The output word consists of the input word truncated
1407 or padded out with hyphens to make its length equal to O. If O
1408 is 0, the word is output verbatim without truncating or padding.
1409 I and O are initially set to 1. When I changes, any buffered input is
1410 re-scanned according to the new I. EOF also terminates the last word.
1413 def __init__(self
, errors
='strict'):
1414 codecs
.IncrementalDecoder
.__init
__(self
, errors
)
1418 return '<SID %x>' % id(self
)
1423 self
.buffer = bytearray()
1426 i
, o
= self
.i ^
1, self
.o ^
1 # so that flags = 0 after reset()
1427 return bytes(self
.buffer), i
*100 + o
1429 def setstate(self
, state
):
1431 self
.buffer = bytearray(buffer)
1432 i
, o
= divmod(io
, 100)
1433 self
.i
, self
.o
= i ^
1, o ^
1
1435 def decode(self
, input, final
=False):
1438 if self
.i
== 0: # variable-length, terminated with period
1441 output
+= self
.process_word()
1443 self
.buffer.append(b
)
1444 else: # fixed-length, terminate after self.i bytes
1445 self
.buffer.append(b
)
1446 if len(self
.buffer) == self
.i
:
1447 output
+= self
.process_word()
1448 if final
and self
.buffer: # EOF terminates the last word
1449 output
+= self
.process_word()
1452 def process_word(self
):
1454 if self
.buffer[0] == ord('i'):
1455 self
.i
= min(99, int(self
.buffer[1:] or 0)) # set input length
1456 elif self
.buffer[0] == ord('o'):
1457 self
.o
= min(99, int(self
.buffer[1:] or 0)) # set output length
1459 output
= self
.buffer.decode('ascii')
1460 if len(output
) < self
.o
:
1461 output
+= '-'*self
.o
# pad out with hyphens
1463 output
= output
[:self
.o
] # truncate to output length
1465 self
.buffer = bytearray()
1468 codecEnabled
= False
1471 def lookupTestDecoder(cls
, name
):
1472 if cls
.codecEnabled
and name
== 'test_decoder':
1473 latin1
= codecs
.lookup('latin-1')
1474 return codecs
.CodecInfo(
1475 name
='test_decoder', encode
=latin1
.encode
, decode
=None,
1476 incrementalencoder
=None,
1477 streamreader
=None, streamwriter
=None,
1478 incrementaldecoder
=cls
)
1480 # Register the previous decoder for testing.
1481 # Disabled by default, tests will enable it.
1482 codecs
.register(StatefulIncrementalDecoder
.lookupTestDecoder
)
1485 class StatefulIncrementalDecoderTest(unittest
.TestCase
):
1487 Make sure the StatefulIncrementalDecoder actually works.
1491 # I=1, O=1 (fixed-length input == fixed-length output)
1492 (b
'abcd', False, 'a.b.c.d.'),
1493 # I=0, O=0 (variable-length input, variable-length output)
1494 (b
'oiabcd', True, 'abcd.'),
1495 # I=0, O=0 (should ignore extra periods)
1496 (b
'oi...abcd...', True, 'abcd.'),
1497 # I=0, O=6 (variable-length input, fixed-length output)
1498 (b
'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1499 # I=2, O=6 (fixed-length input < fixed-length output)
1500 (b
'i.i2.o6xyz', True, 'xy----.z-----.'),
1501 # I=6, O=3 (fixed-length input > fixed-length output)
1502 (b
'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1503 # I=0, then 3; O=29, then 15 (with longer output)
1504 (b
'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1505 'a----------------------------.' +
1506 'b----------------------------.' +
1507 'cde--------------------------.' +
1508 'abcdefghijabcde.' +
1509 'a.b------------.' +
1510 '.c.------------.' +
1511 'd.e------------.' +
1512 'k--------------.' +
1513 'l--------------.' +
1517 def test_decoder(self
):
1518 # Try a few one-shot test cases.
1519 for input, eof
, output
in self
.test_cases
:
1520 d
= StatefulIncrementalDecoder()
1521 self
.assertEquals(d
.decode(input, eof
), output
)
1523 # Also test an unfinished decode, followed by forcing EOF.
1524 d
= StatefulIncrementalDecoder()
1525 self
.assertEquals(d
.decode(b
'oiabcd'), '')
1526 self
.assertEquals(d
.decode(b
'', 1), 'abcd.')
1528 class TextIOWrapperTest(unittest
.TestCase
):
1531 self
.testdata
= b
"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1532 self
.normalized
= b
"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
1533 support
.unlink(support
.TESTFN
)
1536 support
.unlink(support
.TESTFN
)
1538 def test_constructor(self
):
1539 r
= self
.BytesIO(b
"\xc3\xa9\n\n")
1540 b
= self
.BufferedReader(r
, 1000)
1541 t
= self
.TextIOWrapper(b
)
1542 t
.__init
__(b
, encoding
="latin1", newline
="\r\n")
1543 self
.assertEquals(t
.encoding
, "latin1")
1544 self
.assertEquals(t
.line_buffering
, False)
1545 t
.__init
__(b
, encoding
="utf8", line_buffering
=True)
1546 self
.assertEquals(t
.encoding
, "utf8")
1547 self
.assertEquals(t
.line_buffering
, True)
1548 self
.assertEquals("\xe9\n", t
.readline())
1549 self
.assertRaises(TypeError, t
.__init
__, b
, newline
=42)
1550 self
.assertRaises(ValueError, t
.__init
__, b
, newline
='xyzzy')
1552 def test_detach(self
):
1554 b
= self
.BufferedWriter(r
)
1555 t
= self
.TextIOWrapper(b
)
1556 self
.assertIs(t
.detach(), b
)
1558 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1560 self
.assertFalse(r
.getvalue())
1562 self
.assertEqual(r
.getvalue(), b
"howdy")
1563 self
.assertRaises(ValueError, t
.detach
)
1565 def test_repr(self
):
1566 raw
= self
.BytesIO("hello".encode("utf-8"))
1567 b
= self
.BufferedReader(raw
)
1568 t
= self
.TextIOWrapper(b
, encoding
="utf-8")
1569 modname
= self
.TextIOWrapper
.__module
__
1570 self
.assertEqual(repr(t
),
1571 "<%s.TextIOWrapper encoding='utf-8'>" % modname
)
1573 self
.assertEqual(repr(t
),
1574 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname
)
1576 self
.assertEqual(repr(t
),
1577 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
)
1579 def test_line_buffering(self
):
1581 b
= self
.BufferedWriter(r
, 1000)
1582 t
= self
.TextIOWrapper(b
, newline
="\n", line_buffering
=True)
1584 self
.assertEquals(r
.getvalue(), b
"") # No flush happened
1586 self
.assertEquals(r
.getvalue(), b
"XY\nZ") # All got flushed
1588 self
.assertEquals(r
.getvalue(), b
"XY\nZA\rB")
1590 def test_encoding(self
):
1591 # Check the encoding attribute is always set, and valid
1593 t
= self
.TextIOWrapper(b
, encoding
="utf8")
1594 self
.assertEqual(t
.encoding
, "utf8")
1595 t
= self
.TextIOWrapper(b
)
1596 self
.assertTrue(t
.encoding
is not None)
1597 codecs
.lookup(t
.encoding
)
1599 def test_encoding_errors_reading(self
):
1601 b
= self
.BytesIO(b
"abc\n\xff\n")
1602 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1603 self
.assertRaises(UnicodeError, t
.read
)
1604 # (2) explicit strict
1605 b
= self
.BytesIO(b
"abc\n\xff\n")
1606 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="strict")
1607 self
.assertRaises(UnicodeError, t
.read
)
1609 b
= self
.BytesIO(b
"abc\n\xff\n")
1610 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="ignore")
1611 self
.assertEquals(t
.read(), "abc\n\n")
1613 b
= self
.BytesIO(b
"abc\n\xff\n")
1614 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="replace")
1615 self
.assertEquals(t
.read(), "abc\n\ufffd\n")
1617 def test_encoding_errors_writing(self
):
1620 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1621 self
.assertRaises(UnicodeError, t
.write
, "\xff")
1622 # (2) explicit strict
1624 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="strict")
1625 self
.assertRaises(UnicodeError, t
.write
, "\xff")
1628 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="ignore",
1630 t
.write("abc\xffdef\n")
1632 self
.assertEquals(b
.getvalue(), b
"abcdef\n")
1635 t
= self
.TextIOWrapper(b
, encoding
="ascii", errors
="replace",
1637 t
.write("abc\xffdef\n")
1639 self
.assertEquals(b
.getvalue(), b
"abc?def\n")
1641 def test_newlines(self
):
1642 input_lines
= [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1645 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1646 [ '', input_lines
],
1647 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1648 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1649 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1653 'utf-16', 'utf-16-le', 'utf-16-be',
1654 'utf-32', 'utf-32-le', 'utf-32-be',
1657 # Try a range of buffer sizes to test the case where \r is the last
1658 # character in TextIOWrapper._pending_line.
1659 for encoding
in encodings
:
1660 # XXX: str.encode() should return bytes
1661 data
= bytes(''.join(input_lines
).encode(encoding
))
1662 for do_reads
in (False, True):
1663 for bufsize
in range(1, 10):
1664 for newline
, exp_lines
in tests
:
1665 bufio
= self
.BufferedReader(self
.BytesIO(data
), bufsize
)
1666 textio
= self
.TextIOWrapper(bufio
, newline
=newline
,
1674 self
.assertEquals(len(c2
), 2)
1675 got_lines
.append(c2
+ textio
.readline())
1677 got_lines
= list(textio
)
1679 for got_line
, exp_line
in zip(got_lines
, exp_lines
):
1680 self
.assertEquals(got_line
, exp_line
)
1681 self
.assertEquals(len(got_lines
), len(exp_lines
))
1683 def test_newlines_input(self
):
1684 testdata
= b
"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
1685 normalized
= testdata
.replace(b
"\r\n", b
"\n").replace(b
"\r", b
"\n")
1686 for newline
, expected
in [
1687 (None, normalized
.decode("ascii").splitlines(True)),
1688 ("", testdata
.decode("ascii").splitlines(True)),
1689 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1690 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1691 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
1693 buf
= self
.BytesIO(testdata
)
1694 txt
= self
.TextIOWrapper(buf
, encoding
="ascii", newline
=newline
)
1695 self
.assertEquals(txt
.readlines(), expected
)
1697 self
.assertEquals(txt
.read(), "".join(expected
))
1699 def test_newlines_output(self
):
1701 "": b
"AAA\nBBB\nCCC\nX\rY\r\nZ",
1702 "\n": b
"AAA\nBBB\nCCC\nX\rY\r\nZ",
1703 "\r": b
"AAA\rBBB\rCCC\rX\rY\r\rZ",
1704 "\r\n": b
"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1706 tests
= [(None, testdict
[os
.linesep
])] + sorted(testdict
.items())
1707 for newline
, expected
in tests
:
1708 buf
= self
.BytesIO()
1709 txt
= self
.TextIOWrapper(buf
, encoding
="ascii", newline
=newline
)
1711 txt
.write("BB\nCCC\n")
1712 txt
.write("X\rY\r\nZ")
1714 self
.assertEquals(buf
.closed
, False)
1715 self
.assertEquals(buf
.getvalue(), expected
)
1717 def test_destructor(self
):
1720 class MyBytesIO(base
):
1722 l
.append(self
.getvalue())
1725 t
= self
.TextIOWrapper(b
, encoding
="ascii")
1728 support
.gc_collect()
1729 self
.assertEquals([b
"abc"], l
)
1731 def test_override_destructor(self
):
1733 class MyTextIO(self
.TextIOWrapper
):
1737 f
= super(MyTextIO
, self
).__del
__
1738 except AttributeError:
1744 super(MyTextIO
, self
).close()
1747 super(MyTextIO
, self
).flush()
1749 t
= MyTextIO(b
, encoding
="ascii")
1751 support
.gc_collect()
1752 self
.assertEqual(record
, [1, 2, 3])
1754 def test_error_through_destructor(self
):
1755 # Test that the exception state is not modified by a destructor,
1756 # even if close() fails.
1757 rawio
= self
.CloseFailureIO()
1759 self
.TextIOWrapper(rawio
).xyzzy
1760 with support
.captured_output("stderr") as s
:
1761 self
.assertRaises(AttributeError, f
)
1762 s
= s
.getvalue().strip()
1764 # The destructor *may* have printed an unraisable error, check it
1765 self
.assertEqual(len(s
.splitlines()), 1)
1766 self
.assertTrue(s
.startswith("Exception IOError: "), s
)
1767 self
.assertTrue(s
.endswith(" ignored"), s
)
1769 # Systematic tests of the text I/O API
1771 def test_basic_io(self
):
1772 for chunksize
in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1773 for enc
in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
1774 f
= self
.open(support
.TESTFN
, "w+", encoding
=enc
)
1775 f
._CHUNK
_SIZE
= chunksize
1776 self
.assertEquals(f
.write("abc"), 3)
1778 f
= self
.open(support
.TESTFN
, "r+", encoding
=enc
)
1779 f
._CHUNK
_SIZE
= chunksize
1780 self
.assertEquals(f
.tell(), 0)
1781 self
.assertEquals(f
.read(), "abc")
1783 self
.assertEquals(f
.seek(0), 0)
1784 self
.assertEquals(f
.read(2), "ab")
1785 self
.assertEquals(f
.read(1), "c")
1786 self
.assertEquals(f
.read(1), "")
1787 self
.assertEquals(f
.read(), "")
1788 self
.assertEquals(f
.tell(), cookie
)
1789 self
.assertEquals(f
.seek(0), 0)
1790 self
.assertEquals(f
.seek(0, 2), cookie
)
1791 self
.assertEquals(f
.write("def"), 3)
1792 self
.assertEquals(f
.seek(cookie
), cookie
)
1793 self
.assertEquals(f
.read(), "def")
1794 if enc
.startswith("utf"):
1795 self
.multi_line_test(f
, enc
)
1798 def multi_line_test(self
, f
, enc
):
1801 sample
= "s\xff\u0fff\uffff"
1803 for size
in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1805 for i
in range(size
):
1806 chars
.append(sample
[i
% len(sample
)])
1807 line
= "".join(chars
) + "\n"
1808 wlines
.append((f
.tell(), line
))
1817 rlines
.append((pos
, line
))
1818 self
.assertEquals(rlines
, wlines
)
1820 def test_telling(self
):
1821 f
= self
.open(support
.TESTFN
, "w+", encoding
="utf8")
1828 self
.assertEquals(f
.tell(), p0
)
1829 self
.assertEquals(f
.readline(), "\xff\n")
1830 self
.assertEquals(f
.tell(), p1
)
1831 self
.assertEquals(f
.readline(), "\xff\n")
1832 self
.assertEquals(f
.tell(), p2
)
1835 self
.assertEquals(line
, "\xff\n")
1836 self
.assertRaises(IOError, f
.tell
)
1837 self
.assertEquals(f
.tell(), p2
)
1840 def test_seeking(self
):
1841 chunk_size
= _default_chunk_size()
1842 prefix_size
= chunk_size
- 2
1843 u_prefix
= "a" * prefix_size
1844 prefix
= bytes(u_prefix
.encode("utf-8"))
1845 self
.assertEquals(len(u_prefix
), len(prefix
))
1846 u_suffix
= "\u8888\n"
1847 suffix
= bytes(u_suffix
.encode("utf-8"))
1848 line
= prefix
+ suffix
1849 f
= self
.open(support
.TESTFN
, "wb")
1852 f
= self
.open(support
.TESTFN
, "r", encoding
="utf-8")
1853 s
= f
.read(prefix_size
)
1854 self
.assertEquals(s
, prefix
.decode("ascii"))
1855 self
.assertEquals(f
.tell(), prefix_size
)
1856 self
.assertEquals(f
.readline(), u_suffix
)
1858 def test_seeking_too(self
):
1859 # Regression test for a specific bug
1860 data
= b
'\xe0\xbf\xbf\n'
1861 f
= self
.open(support
.TESTFN
, "wb")
1864 f
= self
.open(support
.TESTFN
, "r", encoding
="utf-8")
1865 f
._CHUNK
_SIZE
# Just test that it exists
1870 def test_seek_and_tell(self
):
1871 #Test seek/tell using the StatefulIncrementalDecoder.
1872 # Make test faster by doing smaller seeks
1875 def test_seek_and_tell_with_data(data
, min_pos
=0):
1876 """Tell/seek to various points within a data stream and ensure
1877 that the decoded data returned by read() is consistent."""
1878 f
= self
.open(support
.TESTFN
, 'wb')
1881 f
= self
.open(support
.TESTFN
, encoding
='test_decoder')
1882 f
._CHUNK
_SIZE
= CHUNK_SIZE
1886 for i
in range(min_pos
, len(decoded
) + 1): # seek positions
1887 for j
in [1, 5, len(decoded
) - i
]: # read lengths
1888 f
= self
.open(support
.TESTFN
, encoding
='test_decoder')
1889 self
.assertEquals(f
.read(i
), decoded
[:i
])
1891 self
.assertEquals(f
.read(j
), decoded
[i
:i
+ j
])
1893 self
.assertEquals(f
.read(), decoded
[i
:])
1896 # Enable the test decoder.
1897 StatefulIncrementalDecoder
.codecEnabled
= 1
1901 # Try each test case.
1902 for input, _
, _
in StatefulIncrementalDecoderTest
.test_cases
:
1903 test_seek_and_tell_with_data(input)
1905 # Position each test case so that it crosses a chunk boundary.
1906 for input, _
, _
in StatefulIncrementalDecoderTest
.test_cases
:
1907 offset
= CHUNK_SIZE
- len(input)//2
1908 prefix
= b
'.'*offset
1909 # Don't bother seeking into the prefix (takes too long).
1911 test_seek_and_tell_with_data(prefix
+ input, min_pos
)
1913 # Ensure our test decoder won't interfere with subsequent tests.
1915 StatefulIncrementalDecoder
.codecEnabled
= 0
1917 def test_encoded_writes(self
):
1925 for encoding
in tests
:
1926 buf
= self
.BytesIO()
1927 f
= self
.TextIOWrapper(buf
, encoding
=encoding
)
1928 # Check if the BOM is written only once (see issue1753).
1932 self
.assertEquals(f
.read(), data
* 2)
1934 self
.assertEquals(f
.read(), data
* 2)
1935 self
.assertEquals(buf
.getvalue(), (data
* 2).encode(encoding
))
1937 def test_unreadable(self
):
1938 class UnReadable(self
.BytesIO
):
1941 txt
= self
.TextIOWrapper(UnReadable())
1942 self
.assertRaises(IOError, txt
.read
)
1944 def test_read_one_by_one(self
):
1945 txt
= self
.TextIOWrapper(self
.BytesIO(b
"AA\r\nBB"))
1952 self
.assertEquals(reads
, "AA\nBB")
1954 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1955 def test_read_by_chunk(self
):
1956 # make sure "\r\n" straddles 128 char boundary.
1957 txt
= self
.TextIOWrapper(self
.BytesIO(b
"A" * 127 + b
"\r\nB"))
1964 self
.assertEquals(reads
, "A"*127+"\nB")
1966 def test_issue1395_1(self
):
1967 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
1969 # read one char at a time
1976 self
.assertEquals(reads
, self
.normalized
)
1978 def test_issue1395_2(self
):
1979 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
1988 self
.assertEquals(reads
, self
.normalized
)
1990 def test_issue1395_3(self
):
1991 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
1995 reads
+= txt
.read(4)
1996 reads
+= txt
.readline()
1997 reads
+= txt
.readline()
1998 reads
+= txt
.readline()
1999 self
.assertEquals(reads
, self
.normalized
)
2001 def test_issue1395_4(self
):
2002 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2007 self
.assertEquals(reads
, self
.normalized
)
2009 def test_issue1395_5(self
):
2010 txt
= self
.TextIOWrapper(self
.BytesIO(self
.testdata
), encoding
="ascii")
2017 self
.assertEquals(txt
.read(4), "BBB\n")
2019 def test_issue2282(self
):
2020 buffer = self
.BytesIO(self
.testdata
)
2021 txt
= self
.TextIOWrapper(buffer, encoding
="ascii")
2023 self
.assertEqual(buffer.seekable(), txt
.seekable())
2025 @unittest.skip("Issue #6213 with incremental encoders")
2026 def test_append_bom(self
):
2027 # The BOM is not written again when appending to a non-empty file
2028 filename
= support
.TESTFN
2029 for charset
in ('utf-8-sig', 'utf-16', 'utf-32'):
2030 with self
.open(filename
, 'w', encoding
=charset
) as f
:
2033 with self
.open(filename
, 'rb') as f
:
2034 self
.assertEquals(f
.read(), 'aaa'.encode(charset
))
2036 with self
.open(filename
, 'a', encoding
=charset
) as f
:
2038 with self
.open(filename
, 'rb') as f
:
2039 self
.assertEquals(f
.read(), 'aaaxxx'.encode(charset
))
2041 @unittest.skip("Issue #6213 with incremental encoders")
2042 def test_seek_bom(self
):
2043 # Same test, but when seeking manually
2044 filename
= support
.TESTFN
2045 for charset
in ('utf-8-sig', 'utf-16', 'utf-32'):
2046 with self
.open(filename
, 'w', encoding
=charset
) as f
:
2049 with self
.open(filename
, 'r+', encoding
=charset
) as f
:
2054 with self
.open(filename
, 'rb') as f
:
2055 self
.assertEquals(f
.read(), 'bbbzzz'.encode(charset
))
2057 def test_errors_property(self
):
2058 with self
.open(support
.TESTFN
, "w") as f
:
2059 self
.assertEqual(f
.errors
, "strict")
2060 with self
.open(support
.TESTFN
, "w", errors
="replace") as f
:
2061 self
.assertEqual(f
.errors
, "replace")
2064 def test_threads_write(self
):
2065 # Issue6750: concurrent writes could duplicate data
2066 event
= threading
.Event()
2067 with self
.open(support
.TESTFN
, "w", buffering
=1) as f
:
2069 text
= "Thread%03d\n" % n
2072 threads
= [threading
.Thread(target
=lambda n
=x
: run(n
))
2080 with self
.open(support
.TESTFN
) as f
:
2083 self
.assertEquals(content
.count("Thread%03d\n" % n
), 1)
2085 class CTextIOWrapperTest(TextIOWrapperTest
):
2087 def test_initialization(self
):
2088 r
= self
.BytesIO(b
"\xc3\xa9\n\n")
2089 b
= self
.BufferedReader(r
, 1000)
2090 t
= self
.TextIOWrapper(b
)
2091 self
.assertRaises(TypeError, t
.__init
__, b
, newline
=42)
2092 self
.assertRaises(ValueError, t
.read
)
2093 self
.assertRaises(ValueError, t
.__init
__, b
, newline
='xyzzy')
2094 self
.assertRaises(ValueError, t
.read
)
2096 def test_garbage_collection(self
):
2097 # C TextIOWrapper objects are collected, and collecting them flushes
2099 # The Python version has __del__, so it ends in gc.garbage instead.
2100 rawio
= io
.FileIO(support
.TESTFN
, "wb")
2101 b
= self
.BufferedWriter(rawio
)
2102 t
= self
.TextIOWrapper(b
, encoding
="ascii")
2107 support
.gc_collect()
2108 self
.assertTrue(wr() is None, wr
)
2109 with self
.open(support
.TESTFN
, "rb") as f
:
2110 self
.assertEqual(f
.read(), b
"456def")
2112 class PyTextIOWrapperTest(TextIOWrapperTest
):
2116 class IncrementalNewlineDecoderTest(unittest
.TestCase
):
2118 def check_newline_decoding_utf8(self
, decoder
):
2119 # UTF-8 specific tests for a newline decoder
2120 def _check_decode(b
, s
, **kwargs
):
2121 # We exercise getstate() / setstate() as well as decode()
2122 state
= decoder
.getstate()
2123 self
.assertEquals(decoder
.decode(b
, **kwargs
), s
)
2124 decoder
.setstate(state
)
2125 self
.assertEquals(decoder
.decode(b
, **kwargs
), s
)
2127 _check_decode(b
'\xe8\xa2\x88', "\u8888")
2129 _check_decode(b
'\xe8', "")
2130 _check_decode(b
'\xa2', "")
2131 _check_decode(b
'\x88', "\u8888")
2133 _check_decode(b
'\xe8', "")
2134 _check_decode(b
'\xa2', "")
2135 _check_decode(b
'\x88', "\u8888")
2137 _check_decode(b
'\xe8', "")
2138 self
.assertRaises(UnicodeDecodeError, decoder
.decode
, b
'', final
=True)
2141 _check_decode(b
'\n', "\n")
2142 _check_decode(b
'\r', "")
2143 _check_decode(b
'', "\n", final
=True)
2144 _check_decode(b
'\r', "\n", final
=True)
2146 _check_decode(b
'\r', "")
2147 _check_decode(b
'a', "\na")
2149 _check_decode(b
'\r\r\n', "\n\n")
2150 _check_decode(b
'\r', "")
2151 _check_decode(b
'\r', "\n")
2152 _check_decode(b
'\na', "\na")
2154 _check_decode(b
'\xe8\xa2\x88\r\n', "\u8888\n")
2155 _check_decode(b
'\xe8\xa2\x88', "\u8888")
2156 _check_decode(b
'\n', "\n")
2157 _check_decode(b
'\xe8\xa2\x88\r', "\u8888")
2158 _check_decode(b
'\n', "\n")
2160 def check_newline_decoding(self
, decoder
, encoding
):
2162 if encoding
is not None:
2163 encoder
= codecs
.getincrementalencoder(encoding
)()
2164 def _decode_bytewise(s
):
2165 # Decode one byte at a time
2166 for b
in encoder
.encode(s
):
2167 result
.append(decoder
.decode(b
))
2170 def _decode_bytewise(s
):
2171 # Decode one char at a time
2173 result
.append(decoder
.decode(c
))
2174 self
.assertEquals(decoder
.newlines
, None)
2175 _decode_bytewise("abc\n\r")
2176 self
.assertEquals(decoder
.newlines
, '\n')
2177 _decode_bytewise("\nabc")
2178 self
.assertEquals(decoder
.newlines
, ('\n', '\r\n'))
2179 _decode_bytewise("abc\r")
2180 self
.assertEquals(decoder
.newlines
, ('\n', '\r\n'))
2181 _decode_bytewise("abc")
2182 self
.assertEquals(decoder
.newlines
, ('\r', '\n', '\r\n'))
2183 _decode_bytewise("abc\r")
2184 self
.assertEquals("".join(result
), "abc\n\nabcabc\nabcabc")
2187 if encoder
is not None:
2189 input = encoder
.encode(input)
2190 self
.assertEquals(decoder
.decode(input), "abc")
2191 self
.assertEquals(decoder
.newlines
, None)
2193 def test_newline_decoder(self
):
2195 # None meaning the IncrementalNewlineDecoder takes unicode input
2196 # rather than bytes input
2197 None, 'utf-8', 'latin-1',
2198 'utf-16', 'utf-16-le', 'utf-16-be',
2199 'utf-32', 'utf-32-le', 'utf-32-be',
2201 for enc
in encodings
:
2202 decoder
= enc
and codecs
.getincrementaldecoder(enc
)()
2203 decoder
= self
.IncrementalNewlineDecoder(decoder
, translate
=True)
2204 self
.check_newline_decoding(decoder
, enc
)
2205 decoder
= codecs
.getincrementaldecoder("utf-8")()
2206 decoder
= self
.IncrementalNewlineDecoder(decoder
, translate
=True)
2207 self
.check_newline_decoding_utf8(decoder
)
2209 def test_newline_bytes(self
):
2210 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2212 self
.assertEquals(dec
.newlines
, None)
2213 self
.assertEquals(dec
.decode("\u0D00"), "\u0D00")
2214 self
.assertEquals(dec
.newlines
, None)
2215 self
.assertEquals(dec
.decode("\u0A00"), "\u0A00")
2216 self
.assertEquals(dec
.newlines
, None)
2217 dec
= self
.IncrementalNewlineDecoder(None, translate
=False)
2219 dec
= self
.IncrementalNewlineDecoder(None, translate
=True)
2222 class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest
):
2225 class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest
):
2229 # XXX Tests for open()
2231 class MiscIOTest(unittest
.TestCase
):
2234 support
.unlink(support
.TESTFN
)
2236 def test___all__(self
):
2237 for name
in self
.io
.__all
__:
2238 obj
= getattr(self
.io
, name
, None)
2239 self
.assertTrue(obj
is not None, name
)
2242 elif "error" in name
.lower() or name
== "UnsupportedOperation":
2243 self
.assertTrue(issubclass(obj
, Exception), name
)
2244 elif not name
.startswith("SEEK_"):
2245 self
.assertTrue(issubclass(obj
, self
.IOBase
))
2247 def test_attributes(self
):
2248 f
= self
.open(support
.TESTFN
, "wb", buffering
=0)
2249 self
.assertEquals(f
.mode
, "wb")
2252 f
= self
.open(support
.TESTFN
, "U")
2253 self
.assertEquals(f
.name
, support
.TESTFN
)
2254 self
.assertEquals(f
.buffer.name
, support
.TESTFN
)
2255 self
.assertEquals(f
.buffer.raw
.name
, support
.TESTFN
)
2256 self
.assertEquals(f
.mode
, "U")
2257 self
.assertEquals(f
.buffer.mode
, "rb")
2258 self
.assertEquals(f
.buffer.raw
.mode
, "rb")
2261 f
= self
.open(support
.TESTFN
, "w+")
2262 self
.assertEquals(f
.mode
, "w+")
2263 self
.assertEquals(f
.buffer.mode
, "rb+") # Does it really matter?
2264 self
.assertEquals(f
.buffer.raw
.mode
, "rb+")
2266 g
= self
.open(f
.fileno(), "wb", closefd
=False)
2267 self
.assertEquals(g
.mode
, "wb")
2268 self
.assertEquals(g
.raw
.mode
, "wb")
2269 self
.assertEquals(g
.name
, f
.fileno())
2270 self
.assertEquals(g
.raw
.name
, f
.fileno())
2274 def test_io_after_close(self
):
2278 {"mode": "w", "buffering": 1},
2279 {"mode": "w", "buffering": 2},
2280 {"mode": "wb", "buffering": 0},
2283 {"mode": "r", "buffering": 1},
2284 {"mode": "r", "buffering": 2},
2285 {"mode": "rb", "buffering": 0},
2288 {"mode": "w+", "buffering": 1},
2289 {"mode": "w+", "buffering": 2},
2290 {"mode": "w+b", "buffering": 0},
2292 f
= self
.open(support
.TESTFN
, **kwargs
)
2294 self
.assertRaises(ValueError, f
.flush
)
2295 self
.assertRaises(ValueError, f
.fileno
)
2296 self
.assertRaises(ValueError, f
.isatty
)
2297 self
.assertRaises(ValueError, f
.__iter
__)
2298 if hasattr(f
, "peek"):
2299 self
.assertRaises(ValueError, f
.peek
, 1)
2300 self
.assertRaises(ValueError, f
.read
)
2301 if hasattr(f
, "read1"):
2302 self
.assertRaises(ValueError, f
.read1
, 1024)
2303 if hasattr(f
, "readinto"):
2304 self
.assertRaises(ValueError, f
.readinto
, bytearray(1024))
2305 self
.assertRaises(ValueError, f
.readline
)
2306 self
.assertRaises(ValueError, f
.readlines
)
2307 self
.assertRaises(ValueError, f
.seek
, 0)
2308 self
.assertRaises(ValueError, f
.tell
)
2309 self
.assertRaises(ValueError, f
.truncate
)
2310 self
.assertRaises(ValueError, f
.write
,
2311 b
"" if "b" in kwargs
['mode'] else "")
2312 self
.assertRaises(ValueError, f
.writelines
, [])
2313 self
.assertRaises(ValueError, next
, f
)
2315 def test_blockingioerror(self
):
2316 # Various BlockingIOError issues
2317 self
.assertRaises(TypeError, self
.BlockingIOError
)
2318 self
.assertRaises(TypeError, self
.BlockingIOError
, 1)
2319 self
.assertRaises(TypeError, self
.BlockingIOError
, 1, 2, 3, 4)
2320 self
.assertRaises(TypeError, self
.BlockingIOError
, 1, "", None)
2321 b
= self
.BlockingIOError(1, "")
2322 self
.assertEqual(b
.characters_written
, 0)
2326 b
= self
.BlockingIOError(1, c
)
2331 support
.gc_collect()
2332 self
.assertTrue(wr() is None, wr
)
2334 def test_abcs(self
):
2335 # Test the visible base classes are ABCs.
2336 self
.assertTrue(isinstance(self
.IOBase
, abc
.ABCMeta
))
2337 self
.assertTrue(isinstance(self
.RawIOBase
, abc
.ABCMeta
))
2338 self
.assertTrue(isinstance(self
.BufferedIOBase
, abc
.ABCMeta
))
2339 self
.assertTrue(isinstance(self
.TextIOBase
, abc
.ABCMeta
))
2341 def _check_abc_inheritance(self
, abcmodule
):
2342 with self
.open(support
.TESTFN
, "wb", buffering
=0) as f
:
2343 self
.assertTrue(isinstance(f
, abcmodule
.IOBase
))
2344 self
.assertTrue(isinstance(f
, abcmodule
.RawIOBase
))
2345 self
.assertFalse(isinstance(f
, abcmodule
.BufferedIOBase
))
2346 self
.assertFalse(isinstance(f
, abcmodule
.TextIOBase
))
2347 with self
.open(support
.TESTFN
, "wb") as f
:
2348 self
.assertTrue(isinstance(f
, abcmodule
.IOBase
))
2349 self
.assertFalse(isinstance(f
, abcmodule
.RawIOBase
))
2350 self
.assertTrue(isinstance(f
, abcmodule
.BufferedIOBase
))
2351 self
.assertFalse(isinstance(f
, abcmodule
.TextIOBase
))
2352 with self
.open(support
.TESTFN
, "w") as f
:
2353 self
.assertTrue(isinstance(f
, abcmodule
.IOBase
))
2354 self
.assertFalse(isinstance(f
, abcmodule
.RawIOBase
))
2355 self
.assertFalse(isinstance(f
, abcmodule
.BufferedIOBase
))
2356 self
.assertTrue(isinstance(f
, abcmodule
.TextIOBase
))
2358 def test_abc_inheritance(self
):
2359 # Test implementations inherit from their respective ABCs
2360 self
._check
_abc
_inheritance
(self
)
2362 def test_abc_inheritance_official(self
):
2363 # Test implementations inherit from the official ABCs of the
2364 # baseline "io" module.
2365 self
._check
_abc
_inheritance
(io
)
2367 class CMiscIOTest(MiscIOTest
):
2370 class PyMiscIOTest(MiscIOTest
):
2374 tests
= (CIOTest
, PyIOTest
,
2375 CBufferedReaderTest
, PyBufferedReaderTest
,
2376 CBufferedWriterTest
, PyBufferedWriterTest
,
2377 CBufferedRWPairTest
, PyBufferedRWPairTest
,
2378 CBufferedRandomTest
, PyBufferedRandomTest
,
2379 StatefulIncrementalDecoderTest
,
2380 CIncrementalNewlineDecoderTest
, PyIncrementalNewlineDecoderTest
,
2381 CTextIOWrapperTest
, PyTextIOWrapperTest
,
2382 CMiscIOTest
, PyMiscIOTest
,
2385 # Put the namespaces of the IO module we are testing and some useful mock
2386 # classes in the __dict__ of each test.
2387 mocks
= (MockRawIO
, MisbehavedRawIO
, MockFileIO
, CloseFailureIO
,
2388 MockNonBlockWriterIO
)
2389 all_members
= io
.__all
__ + ["IncrementalNewlineDecoder"]
2390 c_io_ns
= dict((name
, getattr(io
, name
)) for name
in all_members
)
2391 py_io_ns
= dict((name
, getattr(pyio
, name
)) for name
in all_members
)
2393 c_io_ns
.update((x
.__name
__, globs
["C" + x
.__name
__]) for x
in mocks
)
2394 py_io_ns
.update((x
.__name
__, globs
["Py" + x
.__name
__]) for x
in mocks
)
2395 # Avoid turning open into a bound method.
2396 py_io_ns
["open"] = pyio
.OpenWrapper
2398 if test
.__name
__.startswith("C"):
2399 for name
, obj
in c_io_ns
.items():
2400 setattr(test
, name
, obj
)
2401 elif test
.__name
__.startswith("Py"):
2402 for name
, obj
in py_io_ns
.items():
2403 setattr(test
, name
, obj
)
2405 support
.run_unittest(*tests
)
2407 if __name__
== "__main__":