Issue #7051: Clarify behaviour of 'g' and 'G'-style formatting.
[python.git] / Lib / test / test_io.py
blob4288f1ba05cdf24840415e21657010d0ce0804f2
1 """Unit tests for the io module."""
3 # Tests of io are scattered over the test suite:
4 # * test_bufio - tests file buffering
5 # * test_memoryio - tests BytesIO and StringIO
6 # * test_fileio - tests FileIO
7 # * test_file - tests the file interface
8 # * test_io - tests everything else in the io module
9 # * test_univnewlines - tests universal newline support
10 # * test_largefile - tests operations on a file greater than 2**32 bytes
11 # (only enabled with -ulargefile)
13 ################################################################################
14 # ATTENTION TEST WRITERS!!!
15 ################################################################################
16 # When writing tests for io, it's important to test both the C and Python
17 # implementations. This is usually done by writing a base test that refers to
18 # the type it is testing as an attribute. Then it provides custom subclasses to
19 # test both implementations. This file has lots of examples.
20 ################################################################################
22 from __future__ import print_function
23 from __future__ import unicode_literals
25 import os
26 import sys
27 import time
28 import array
29 import threading
30 import random
31 import unittest
32 import warnings
33 import weakref
34 import gc
35 import abc
36 from itertools import chain, cycle, count
37 from collections import deque
38 from test import test_support as support
40 import codecs
41 import io # C implementation of io
42 import _pyio as pyio # Python implementation of io
44 __metaclass__ = type
45 bytes = support.py3k_bytes
47 def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
49 with io.open(__file__, "r", encoding="latin1") as f:
50 return f._CHUNK_SIZE
class MockRawIO:
    """Scripted raw stream used to drive the buffered I/O tests.

    Reads are served from a list of canned chunks (``read_stack``), writes
    are recorded in ``_write_stack`` and ``_reads`` counts how many times a
    read-like method was invoked.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0

    def read(self, n=None):
        """Return the next canned chunk, or b"" once the script is exhausted.

        *n* is ignored: a whole chunk is always handed out.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty script should be reported as EOF; anything else
            # is a genuine error and must not be swallowed.
            return b""

    def write(self, b):
        """Record a bytes copy of *b* and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy (part of) the next canned chunk into *buf*.

        Returns the number of bytes copied, 0 when the script is exhausted,
        or None when the next scripted item is None.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if len(data) <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            # Only a prefix fits: keep the remainder for the next call.
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        return pos


class CMockRawIO(MockRawIO, io.RawIOBase):
    pass


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
class MisbehavedRawIO(MockRawIO):
    """Raw stream mock whose methods return deliberately bogus values.

    write() and read() double what MockRawIO would return, seek() and tell()
    return negative positions, and readinto() reports five times the buffer
    length.
    """

    def write(self, b):
        return MockRawIO.write(self, b) * 2

    def read(self, n=None):
        return MockRawIO.read(self, n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Still perform the real copy, but lie about the byte count.
        MockRawIO.readinto(self, buf)
        return len(buf) * 5


class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
class CloseFailureIO(MockRawIO):
    """Raw stream mock whose first close() call raises IOError."""

    closed = 0

    def close(self):
        # Mark the stream closed *before* raising so later calls are no-ops.
        if not self.closed:
            self.closed = 1
            raise IOError


class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass
class MockFileIO:
    """Mixin recording the result size of every read()/readinto() call.

    Meant to be combined with a BytesIO implementation; the sizes are
    appended to ``read_history``.
    """

    def __init__(self, data):
        # Set up the history first, then let the BytesIO base initialise.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        res = super(MockFileIO, self).read(n)
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super(MockFileIO, self).readinto(b)
        self.read_history.append(res)
        return res


class CMockFileIO(MockFileIO, io.BytesIO):
    pass


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
class MockNonBlockWriterIO:
    """Writable raw stream mock that can simulate a blocking write.

    After block_on(char), the next write() whose data contains *char*
    records only the bytes before it and raises BlockingIOError.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and reset the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, or a prefix of it followed by BlockingIOError."""
        b = bytes(b)
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # Blocker not in this chunk: fall through to a full write.
                pass
            else:
                # One-shot: disarm, keep the prefix, report a partial write.
                self._blocker_char = None
                self._write_stack.append(b[:n])
                raise self.BlockingIOError(0, "test blocking", n)
        self._write_stack.append(b)
        return len(b)


class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
class IOTest(unittest.TestCase):
    # Implementation-independent tests of open() and the raw/buffered/text
    # layers.  Concrete subclasses are expected to provide the
    # implementation under test as attributes (self.open, self.FileIO,
    # self.BytesIO, self.IOBase, ...).

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Helper: exercise write/seek/tell/truncate on a writable binary
        # stream; the stream ends up containing b"hello world\n".
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Helper: exercise read/readinto/seek/tell on a readable binary
        # stream containing b"hello world\n".  The extra read-to-EOF checks
        # only run when *buffered* is true.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Helper: seek/write/truncate around the 2**31 offset.
        # Use unittest assertions rather than bare ``assert`` so the checks
        # still run under ``python -O``.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        # Unbuffered (buffering=0) binary file: write, then read back.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Default-buffered binary file: write, then read back.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        # readline() with and without a size limit; non-integer limits are
        # rejected in both binary and text mode.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        # Same write/read scenario as the file tests, on an in-memory stream.
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with-block, both on normal
        # exit and when the body raises.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 / 0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertTrue(f.tell() > 0)

    def test_destructor(self):
        # Collecting a FileIO subclass is expected to record __del__, then
        # close(), then flush(), and the written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    # The base class may not define __del__ at all.
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Helper: same destructor scenario as test_destructor, for an
        # abstract base class taken from the implementation under test.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written before close (via the with-block) must be readable.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() of an array.array must report the byte length written.
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        # Closing a closefd=False wrapper makes further reads on it fail.
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Local renamed from ``file`` to avoid shadowing the builtin.
            fd_file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fd_file.read(), "egg\n")
            fd_file.seek(0)
            fd_file.close()
            self.assertRaises(ValueError, fd_file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        # The raw layer exposes the closefd flag it was opened with.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            fd_file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fd_file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        f.f = f   # reference cycle, so only the cyclic GC can free it
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)
class CIOTest(IOTest):
    # IOTest variant for the C implementation; adds no tests of its own.
    pass
class PyIOTest(IOTest):
    # IOTest variant for the Python implementation.  The array-write test
    # is replaced by a skipped version for the reason given below.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: the concrete test class supplies ``tp`` (the buffered type under
    # test) plus the mock raw stream classes (``MockRawIO``,
    # ``CloseFailureIO``) as attributes.

    def test_detach(self):
        # detach() returns the raw stream; a second detach() is an error.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        # fileno() is forwarded to the raw stream (the mock returns 42).
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Overridden __del__/close/flush must be invoked on collection;
        # flush is only expected to be recorded for writable objects.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyBufferedIO, self).__del__
                except AttributeError:
                    # The base class may not define __del__ at all.
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the qualified class name and, once the raw stream has
        # one, its name (unicode and bytes names are rendered differently).
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered reader type supplied by subclasses as ``tp``.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again with valid sizes; invalid sizes raise
        # ValueError; a later valid re-initialisation works.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcdef", bufio.read(6))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() must not trigger more raw reads than the counts checked.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # raw read sizes expected in rawio.read_history.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertTrue(None is bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # Bogus positions reported by the raw stream must surface as IOError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
class CBufferedReaderTest(BufferedReaderTest):
    # BufferedReaderTest bound to the C implementation, plus C-only checks.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation, read() must raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.f = f   # reference cycle, so only the cyclic GC can free it
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
class PyBufferedReaderTest(BufferedReaderTest):
    # BufferedReaderTest bound to the pure-Python implementation.
    tp = pyio.BufferedReader
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered writer type supplied by subclasses as ``tp``.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again with valid sizes; invalid sizes raise
        # ValueError; a later valid re-initialisation works.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push pending buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Interleave writes with absolute, then relative, seeks.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Overwrite the start after seeking back, then append at the end.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Collecting the writer must flush buffered data to the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 3)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # Bogus values reported by the raw stream must surface as IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing the third positional argument must emit exactly one
        # DeprecationWarning with the expected message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertTrue(warning.category is DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")
class CBufferedWriterTest(BufferedWriterTest):
    # BufferedWriterTest bound to the C implementation, plus C-only checks.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation, write() must raise ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        f.x = f   # reference cycle, so only the cyclic GC can free it
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest cases against the BufferedWriter
    # from _pyio (the Python implementation of io).
    tp = pyio.BufferedWriter
class BufferedRWPairTest(unittest.TestCase):
    # Test cases for a reader/writer pair object; the class under test is
    # taken from the ``tp`` attribute.

    def test_constructor(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)

    def test_detach(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, rw_pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Two extra positional arguments after reader and writer must emit
        # exactly one DeprecationWarning.
        with support.check_warnings() as recorder:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
            self.assertEqual(len(recorder.warnings), 1)
            first = recorder.warnings[0]
            self.assertTrue(first.category is DeprecationWarning)
            expected_message = "max_buffer_size is deprecated"
            self.assertEqual(str(first.message), expected_message)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        reader, writer = NotReadable(), self.MockRawIO()
        self.assertRaises(IOError, self.tp, reader, writer)

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        reader, writer = self.MockRawIO(), NotWriteable()
        self.assertRaises(IOError, self.tp, reader, writer)

    def test_read(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # Sized reads first, then an unsized read for the remainder.
        steps = (((3,), b"abc"), ((1,), b"d"), ((), b"ef"))
        for read_args, expected in steps:
            self.assertEqual(rw_pair.read(*read_args), expected)

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read1(3), b"abc")

    def test_readinto(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        self.assertEqual(rw_pair.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rw_pair = self.tp(self.MockRawIO(), sink)

        # Flush after every write so each chunk reaches the raw writer
        # as its own entry.
        for chunk in (b"abc", b"def"):
            rw_pair.write(chunk)
            rw_pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = rw_pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        self.assertEqual(rw_pair.read(3), b"abc")

    def test_readable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.readable())

    def test_writeable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)
        rw_pair.close()
        self.assertTrue(rw_pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, assertion for pair.isatty())
        scenarios = (
            (False, False, self.assertFalse),
            (True, False, self.assertTrue),
            (False, True, self.assertTrue),
            (True, True, self.assertTrue),
        )
        for reader_tty, writer_tty, check in scenarios:
            rw_pair = self.tp(SelectableIsAtty(reader_tty),
                              SelectableIsAtty(writer_tty))
            check(rw_pair.isatty())
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPairTest cases against io.BufferedRWPair
    # (the C implementation of io).
    tp = io.BufferedRWPair
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPairTest cases against the BufferedRWPair from
    # _pyio (the Python implementation of io).
    tp = pyio.BufferedRWPair
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Test cases for a buffered object that is both readable and writable.
    # Inherits the reader and the writer cases and adds tests that mix
    # reads, writes, seeks and flushes on the same object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests below.
        # read_func(bufio[, n]) must consume and return up to n bytes
        # (or everything available when n is omitted).
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # First variant: peek at the current position between writes.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, then restore the position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
    # Runs the BufferedRandom cases against io.BufferedRandom
    # (the C implementation of io).
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        if sys.maxsize <= 0x7FFFFFFF:
            # The allocation can succeed on 32-bit builds, e.g. with more
            # than 2GB RAM and a 64-bit kernel, so there is nothing to check.
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors, buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader variant first, then the writer variant.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandomTest cases against the BufferedRandom from
    # _pyio (the Python implementation of io).
    tp = pyio.BufferedRandom
1384 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1385 # properties:
1386 # - A single output character can correspond to many bytes of input.
1387 # - The number of input bytes to complete the character can be
1388 # undetermined until the last input byte is received.
1389 # - The number of input bytes can vary depending on previous input.
1390 # - A single input byte can correspond to many characters of output.
1391 # - The number of output characters can be undetermined until the
1392 # last input byte is received.
1393 # - The number of output characters can vary depending on previous input.
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I or O takes effect for the
    input that follows the word setting it.  EOF also terminates the
    last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) where flags packs I and O."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Decode *input* and return the words completed so far.

        Bytes belonging to an unfinished word stay buffered until a later
        call completes the word, or until *final* is true.
        """
        output = ''
        period = ord('.')
        # Go through bytearray() so that every element is an int no matter
        # whether iterating the input itself yields ints or 1-char strings;
        # otherwise the period test below could never match for int items.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i' and 'o' words only update the decoder settings and produce an
        empty string.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Switch consulted by lookupTestDecoder(); while it is false the lookup
    # function never returns a codec.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the name 'test_decoder'.

        Returns a CodecInfo whose incremental decoder is this class when
        the codec is enabled and *name* matches; otherwise returns None.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
# Register StatefulIncrementalDecoder's lookup function as a codec search
# function, for testing.
# Disabled by default (codecEnabled is False), tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected
    # decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')
1528 class TextIOWrapperTest(unittest.TestCase):
1530 def setUp(self):
1531 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1532 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
1533 support.unlink(support.TESTFN)
1535 def tearDown(self):
1536 support.unlink(support.TESTFN)
1538 def test_constructor(self):
1539 r = self.BytesIO(b"\xc3\xa9\n\n")
1540 b = self.BufferedReader(r, 1000)
1541 t = self.TextIOWrapper(b)
1542 t.__init__(b, encoding="latin1", newline="\r\n")
1543 self.assertEquals(t.encoding, "latin1")
1544 self.assertEquals(t.line_buffering, False)
1545 t.__init__(b, encoding="utf8", line_buffering=True)
1546 self.assertEquals(t.encoding, "utf8")
1547 self.assertEquals(t.line_buffering, True)
1548 self.assertEquals("\xe9\n", t.readline())
1549 self.assertRaises(TypeError, t.__init__, b, newline=42)
1550 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1552 def test_detach(self):
1553 r = self.BytesIO()
1554 b = self.BufferedWriter(r)
1555 t = self.TextIOWrapper(b)
1556 self.assertIs(t.detach(), b)
1558 t = self.TextIOWrapper(b, encoding="ascii")
1559 t.write("howdy")
1560 self.assertFalse(r.getvalue())
1561 t.detach()
1562 self.assertEqual(r.getvalue(), b"howdy")
1563 self.assertRaises(ValueError, t.detach)
1565 def test_repr(self):
1566 raw = self.BytesIO("hello".encode("utf-8"))
1567 b = self.BufferedReader(raw)
1568 t = self.TextIOWrapper(b, encoding="utf-8")
1569 modname = self.TextIOWrapper.__module__
1570 self.assertEqual(repr(t),
1571 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1572 raw.name = "dummy"
1573 self.assertEqual(repr(t),
1574 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1575 raw.name = b"dummy"
1576 self.assertEqual(repr(t),
1577 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1579 def test_line_buffering(self):
1580 r = self.BytesIO()
1581 b = self.BufferedWriter(r, 1000)
1582 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1583 t.write("X")
1584 self.assertEquals(r.getvalue(), b"") # No flush happened
1585 t.write("Y\nZ")
1586 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1587 t.write("A\rB")
1588 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1590 def test_encoding(self):
1591 # Check the encoding attribute is always set, and valid
1592 b = self.BytesIO()
1593 t = self.TextIOWrapper(b, encoding="utf8")
1594 self.assertEqual(t.encoding, "utf8")
1595 t = self.TextIOWrapper(b)
1596 self.assertTrue(t.encoding is not None)
1597 codecs.lookup(t.encoding)
1599 def test_encoding_errors_reading(self):
1600 # (1) default
1601 b = self.BytesIO(b"abc\n\xff\n")
1602 t = self.TextIOWrapper(b, encoding="ascii")
1603 self.assertRaises(UnicodeError, t.read)
1604 # (2) explicit strict
1605 b = self.BytesIO(b"abc\n\xff\n")
1606 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1607 self.assertRaises(UnicodeError, t.read)
1608 # (3) ignore
1609 b = self.BytesIO(b"abc\n\xff\n")
1610 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
1611 self.assertEquals(t.read(), "abc\n\n")
1612 # (4) replace
1613 b = self.BytesIO(b"abc\n\xff\n")
1614 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1615 self.assertEquals(t.read(), "abc\n\ufffd\n")
1617 def test_encoding_errors_writing(self):
1618 # (1) default
1619 b = self.BytesIO()
1620 t = self.TextIOWrapper(b, encoding="ascii")
1621 self.assertRaises(UnicodeError, t.write, "\xff")
1622 # (2) explicit strict
1623 b = self.BytesIO()
1624 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1625 self.assertRaises(UnicodeError, t.write, "\xff")
1626 # (3) ignore
1627 b = self.BytesIO()
1628 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
1629 newline="\n")
1630 t.write("abc\xffdef\n")
1631 t.flush()
1632 self.assertEquals(b.getvalue(), b"abcdef\n")
1633 # (4) replace
1634 b = self.BytesIO()
1635 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
1636 newline="\n")
1637 t.write("abc\xffdef\n")
1638 t.flush()
1639 self.assertEquals(b.getvalue(), b"abc?def\n")
1641 def test_newlines(self):
1642 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1644 tests = [
1645 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1646 [ '', input_lines ],
1647 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1648 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1649 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1651 encodings = (
1652 'utf-8', 'latin-1',
1653 'utf-16', 'utf-16-le', 'utf-16-be',
1654 'utf-32', 'utf-32-le', 'utf-32-be',
1657 # Try a range of buffer sizes to test the case where \r is the last
1658 # character in TextIOWrapper._pending_line.
1659 for encoding in encodings:
1660 # XXX: str.encode() should return bytes
1661 data = bytes(''.join(input_lines).encode(encoding))
1662 for do_reads in (False, True):
1663 for bufsize in range(1, 10):
1664 for newline, exp_lines in tests:
1665 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1666 textio = self.TextIOWrapper(bufio, newline=newline,
1667 encoding=encoding)
1668 if do_reads:
1669 got_lines = []
1670 while True:
1671 c2 = textio.read(2)
1672 if c2 == '':
1673 break
1674 self.assertEquals(len(c2), 2)
1675 got_lines.append(c2 + textio.readline())
1676 else:
1677 got_lines = list(textio)
1679 for got_line, exp_line in zip(got_lines, exp_lines):
1680 self.assertEquals(got_line, exp_line)
1681 self.assertEquals(len(got_lines), len(exp_lines))
1683 def test_newlines_input(self):
1684 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
1685 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1686 for newline, expected in [
1687 (None, normalized.decode("ascii").splitlines(True)),
1688 ("", testdata.decode("ascii").splitlines(True)),
1689 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1690 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1691 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
1693 buf = self.BytesIO(testdata)
1694 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1695 self.assertEquals(txt.readlines(), expected)
1696 txt.seek(0)
1697 self.assertEquals(txt.read(), "".join(expected))
1699 def test_newlines_output(self):
1700 testdict = {
1701 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1702 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1703 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1704 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1706 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1707 for newline, expected in tests:
1708 buf = self.BytesIO()
1709 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1710 txt.write("AAA\nB")
1711 txt.write("BB\nCCC\n")
1712 txt.write("X\rY\r\nZ")
1713 txt.flush()
1714 self.assertEquals(buf.closed, False)
1715 self.assertEquals(buf.getvalue(), expected)
1717 def test_destructor(self):
1718 l = []
1719 base = self.BytesIO
1720 class MyBytesIO(base):
1721 def close(self):
1722 l.append(self.getvalue())
1723 base.close(self)
1724 b = MyBytesIO()
1725 t = self.TextIOWrapper(b, encoding="ascii")
1726 t.write("abc")
1727 del t
1728 support.gc_collect()
1729 self.assertEquals([b"abc"], l)
1731 def test_override_destructor(self):
1732 record = []
1733 class MyTextIO(self.TextIOWrapper):
1734 def __del__(self):
1735 record.append(1)
1736 try:
1737 f = super(MyTextIO, self).__del__
1738 except AttributeError:
1739 pass
1740 else:
1742 def close(self):
1743 record.append(2)
1744 super(MyTextIO, self).close()
1745 def flush(self):
1746 record.append(3)
1747 super(MyTextIO, self).flush()
1748 b = self.BytesIO()
1749 t = MyTextIO(b, encoding="ascii")
1750 del t
1751 support.gc_collect()
1752 self.assertEqual(record, [1, 2, 3])
1754 def test_error_through_destructor(self):
1755 # Test that the exception state is not modified by a destructor,
1756 # even if close() fails.
1757 rawio = self.CloseFailureIO()
1758 def f():
1759 self.TextIOWrapper(rawio).xyzzy
1760 with support.captured_output("stderr") as s:
1761 self.assertRaises(AttributeError, f)
1762 s = s.getvalue().strip()
1763 if s:
1764 # The destructor *may* have printed an unraisable error, check it
1765 self.assertEqual(len(s.splitlines()), 1)
1766 self.assertTrue(s.startswith("Exception IOError: "), s)
1767 self.assertTrue(s.endswith(" ignored"), s)
1769 # Systematic tests of the text I/O API
1771 def test_basic_io(self):
1772 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1773 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
1774 f = self.open(support.TESTFN, "w+", encoding=enc)
1775 f._CHUNK_SIZE = chunksize
1776 self.assertEquals(f.write("abc"), 3)
1777 f.close()
1778 f = self.open(support.TESTFN, "r+", encoding=enc)
1779 f._CHUNK_SIZE = chunksize
1780 self.assertEquals(f.tell(), 0)
1781 self.assertEquals(f.read(), "abc")
1782 cookie = f.tell()
1783 self.assertEquals(f.seek(0), 0)
1784 self.assertEquals(f.read(2), "ab")
1785 self.assertEquals(f.read(1), "c")
1786 self.assertEquals(f.read(1), "")
1787 self.assertEquals(f.read(), "")
1788 self.assertEquals(f.tell(), cookie)
1789 self.assertEquals(f.seek(0), 0)
1790 self.assertEquals(f.seek(0, 2), cookie)
1791 self.assertEquals(f.write("def"), 3)
1792 self.assertEquals(f.seek(cookie), cookie)
1793 self.assertEquals(f.read(), "def")
1794 if enc.startswith("utf"):
1795 self.multi_line_test(f, enc)
1796 f.close()
1798 def multi_line_test(self, f, enc):
1799 f.seek(0)
1800 f.truncate()
1801 sample = "s\xff\u0fff\uffff"
1802 wlines = []
1803 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1804 chars = []
1805 for i in range(size):
1806 chars.append(sample[i % len(sample)])
1807 line = "".join(chars) + "\n"
1808 wlines.append((f.tell(), line))
1809 f.write(line)
1810 f.seek(0)
1811 rlines = []
1812 while True:
1813 pos = f.tell()
1814 line = f.readline()
1815 if not line:
1816 break
1817 rlines.append((pos, line))
1818 self.assertEquals(rlines, wlines)
1820 def test_telling(self):
1821 f = self.open(support.TESTFN, "w+", encoding="utf8")
1822 p0 = f.tell()
1823 f.write("\xff\n")
1824 p1 = f.tell()
1825 f.write("\xff\n")
1826 p2 = f.tell()
1827 f.seek(0)
1828 self.assertEquals(f.tell(), p0)
1829 self.assertEquals(f.readline(), "\xff\n")
1830 self.assertEquals(f.tell(), p1)
1831 self.assertEquals(f.readline(), "\xff\n")
1832 self.assertEquals(f.tell(), p2)
1833 f.seek(0)
1834 for line in f:
1835 self.assertEquals(line, "\xff\n")
1836 self.assertRaises(IOError, f.tell)
1837 self.assertEquals(f.tell(), p2)
1838 f.close()
1840 def test_seeking(self):
1841 chunk_size = _default_chunk_size()
1842 prefix_size = chunk_size - 2
1843 u_prefix = "a" * prefix_size
1844 prefix = bytes(u_prefix.encode("utf-8"))
1845 self.assertEquals(len(u_prefix), len(prefix))
1846 u_suffix = "\u8888\n"
1847 suffix = bytes(u_suffix.encode("utf-8"))
1848 line = prefix + suffix
1849 f = self.open(support.TESTFN, "wb")
1850 f.write(line*2)
1851 f.close()
1852 f = self.open(support.TESTFN, "r", encoding="utf-8")
1853 s = f.read(prefix_size)
1854 self.assertEquals(s, prefix.decode("ascii"))
1855 self.assertEquals(f.tell(), prefix_size)
1856 self.assertEquals(f.readline(), u_suffix)
1858 def test_seeking_too(self):
1859 # Regression test for a specific bug
1860 data = b'\xe0\xbf\xbf\n'
1861 f = self.open(support.TESTFN, "wb")
1862 f.write(data)
1863 f.close()
1864 f = self.open(support.TESTFN, "r", encoding="utf-8")
1865 f._CHUNK_SIZE # Just test that it exists
1866 f._CHUNK_SIZE = 2
1867 f.readline()
1868 f.tell()
1870 def test_seek_and_tell(self):
1871 #Test seek/tell using the StatefulIncrementalDecoder.
1872 # Make test faster by doing smaller seeks
1873 CHUNK_SIZE = 128
1875 def test_seek_and_tell_with_data(data, min_pos=0):
1876 """Tell/seek to various points within a data stream and ensure
1877 that the decoded data returned by read() is consistent."""
1878 f = self.open(support.TESTFN, 'wb')
1879 f.write(data)
1880 f.close()
1881 f = self.open(support.TESTFN, encoding='test_decoder')
1882 f._CHUNK_SIZE = CHUNK_SIZE
1883 decoded = f.read()
1884 f.close()
1886 for i in range(min_pos, len(decoded) + 1): # seek positions
1887 for j in [1, 5, len(decoded) - i]: # read lengths
1888 f = self.open(support.TESTFN, encoding='test_decoder')
1889 self.assertEquals(f.read(i), decoded[:i])
1890 cookie = f.tell()
1891 self.assertEquals(f.read(j), decoded[i:i + j])
1892 f.seek(cookie)
1893 self.assertEquals(f.read(), decoded[i:])
1894 f.close()
1896 # Enable the test decoder.
1897 StatefulIncrementalDecoder.codecEnabled = 1
1899 # Run the tests.
1900 try:
1901 # Try each test case.
1902 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1903 test_seek_and_tell_with_data(input)
1905 # Position each test case so that it crosses a chunk boundary.
1906 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1907 offset = CHUNK_SIZE - len(input)//2
1908 prefix = b'.'*offset
1909 # Don't bother seeking into the prefix (takes too long).
1910 min_pos = offset*2
1911 test_seek_and_tell_with_data(prefix + input, min_pos)
1913 # Ensure our test decoder won't interfere with subsequent tests.
1914 finally:
1915 StatefulIncrementalDecoder.codecEnabled = 0
1917 def test_encoded_writes(self):
1918 data = "1234567890"
1919 tests = ("utf-16",
1920 "utf-16-le",
1921 "utf-16-be",
1922 "utf-32",
1923 "utf-32-le",
1924 "utf-32-be")
1925 for encoding in tests:
1926 buf = self.BytesIO()
1927 f = self.TextIOWrapper(buf, encoding=encoding)
1928 # Check if the BOM is written only once (see issue1753).
1929 f.write(data)
1930 f.write(data)
1931 f.seek(0)
1932 self.assertEquals(f.read(), data * 2)
1933 f.seek(0)
1934 self.assertEquals(f.read(), data * 2)
1935 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1937 def test_unreadable(self):
1938 class UnReadable(self.BytesIO):
1939 def readable(self):
1940 return False
1941 txt = self.TextIOWrapper(UnReadable())
1942 self.assertRaises(IOError, txt.read)
1944 def test_read_one_by_one(self):
1945 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
1946 reads = ""
1947 while True:
1948 c = txt.read(1)
1949 if not c:
1950 break
1951 reads += c
1952 self.assertEquals(reads, "AA\nBB")
1954 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1955 def test_read_by_chunk(self):
1956 # make sure "\r\n" straddles 128 char boundary.
1957 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
1958 reads = ""
1959 while True:
1960 c = txt.read(128)
1961 if not c:
1962 break
1963 reads += c
1964 self.assertEquals(reads, "A"*127+"\nB")
1966 def test_issue1395_1(self):
1967 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
1969 # read one char at a time
1970 reads = ""
1971 while True:
1972 c = txt.read(1)
1973 if not c:
1974 break
1975 reads += c
1976 self.assertEquals(reads, self.normalized)
1978 def test_issue1395_2(self):
1979 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
1980 txt._CHUNK_SIZE = 4
1982 reads = ""
1983 while True:
1984 c = txt.read(4)
1985 if not c:
1986 break
1987 reads += c
1988 self.assertEquals(reads, self.normalized)
1990 def test_issue1395_3(self):
1991 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
1992 txt._CHUNK_SIZE = 4
1994 reads = txt.read(4)
1995 reads += txt.read(4)
1996 reads += txt.readline()
1997 reads += txt.readline()
1998 reads += txt.readline()
1999 self.assertEquals(reads, self.normalized)
2001 def test_issue1395_4(self):
2002 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2003 txt._CHUNK_SIZE = 4
2005 reads = txt.read(4)
2006 reads += txt.read()
2007 self.assertEquals(reads, self.normalized)
2009 def test_issue1395_5(self):
2010 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2011 txt._CHUNK_SIZE = 4
2013 reads = txt.read(4)
2014 pos = txt.tell()
2015 txt.seek(0)
2016 txt.seek(pos)
2017 self.assertEquals(txt.read(4), "BBB\n")
2019 def test_issue2282(self):
2020 buffer = self.BytesIO(self.testdata)
2021 txt = self.TextIOWrapper(buffer, encoding="ascii")
2023 self.assertEqual(buffer.seekable(), txt.seekable())
@unittest.skip("Issue #6213 with incremental encoders")
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The position itself is not needed; the call is kept so the
            # test performs the same sequence of operations on the file.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            # Encoding the whole string at once yields exactly one BOM.
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
@unittest.skip("Issue #6213 with incremental encoders")
def test_seek_bom(self):
    # The BOM is not written again when the file is reopened and written
    # to after seeking manually.
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # Remember the end-of-data position for the rewrite below.
            pos = f.tell()
        with self.open(filename, 'r+', encoding=charset) as f:
            # Write at the old end of data first, then overwrite the start.
            f.seek(pos)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(filename, 'rb') as f:
            # Encoding the whole string at once yields exactly one BOM.
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
def test_errors_property(self):
    # Without an explicit errors= argument the property reads "strict";
    # an explicit value is reported back unchanged.
    with self.open(support.TESTFN, "w") as default_file:
        self.assertEqual(default_file.errors, "strict")
    with self.open(support.TESTFN, "w", errors="replace") as replacing_file:
        self.assertEqual(replacing_file.errors, "replace")
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20
    event = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Hold every thread here until the event is set, so that the
            # writes below are released together.
            event.wait()
            f.write(text)
        threads = [threading.Thread(target=run, args=(x,))
                   for x in range(thread_count)]
        for t in threads:
            t.start()
        # Give the threads a moment to reach event.wait() before
        # releasing them.
        time.sleep(0.02)
        event.set()
        for t in threads:
            t.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
    # Each thread's line must appear exactly once in the file.
    for n in range(thread_count):
        self.assertEqual(content.count("Thread%03d\n" % n), 1)
class CTextIOWrapperTest(TextIOWrapperTest):
    # Extra TextIOWrapper tests on top of the shared TextIOWrapperTest
    # suite.

    def test_initialization(self):
        # After each rejected __init__() call the wrapper must refuse to
        # read: a non-string newline raises TypeError, an unknown newline
        # string raises ValueError.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        rejected = ((TypeError, 42), (ValueError, 'xyzzy'))
        for expected_error, bad_newline in rejected:
            self.assertRaises(expected_error, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Put the wrapper in a reference cycle with itself.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Run the shared TextIOWrapperTest suite with no extra tests."""
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared tests for IncrementalNewlineDecoder.  The class under test is
    # read from self.IncrementalNewlineDecoder, so subclasses (or the test
    # runner) must provide that attribute.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decoding again from the saved state must give the same result.
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # The same character fed one byte at a time.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated sequence is an error once the input is final.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        # A lone "\r" is held back until the decoder knows what follows.
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to `decoder` one unit at a time and check both the
        # translated output and the `newlines` attribute along the way.
        # `encoding` is a codec name, or None when the decoder takes text
        # input directly.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time.  Slicing (rather than
                # iterating) always yields a length-1 bytes object, even
                # where iterating over bytes produces integers.
                encoded = encoder.encode(s)
                for i in range(len(encoded)):
                    result.append(decoder.decode(encoded[i:i + 1]))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # reset() must also forget which newline kinds were seen.
        decoder.reset()
        data = "abc"
        if encoder is not None:
            encoder.reset()
            data = encoder.encode(data)
        self.assertEqual(decoder.decode(data), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not
            # be recorded as newlines.
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            _check(dec)
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the shared IncrementalNewlineDecoderTest suite with no extra tests."""
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the shared IncrementalNewlineDecoderTest suite with no extra tests."""
2229 # XXX Tests for open()
class MiscIOTest(unittest.TestCase):
    # Miscellaneous tests of the io module surface.  The implementation
    # under test is reached through self.io (set by the subclasses) and
    # through attributes such as self.open and self.IOBase that
    # test_main() copies onto each test class.

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every exported name must exist; exception-like names must be
        # Exception subclasses, and everything else except open() and the
        # SEEK_* constants must derive from IOBase.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase), name)

    def test_attributes(self):
        # The `with` blocks close every file even when an assertion fails.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object on the same descriptor; closefd=False
            # leaves the descriptor owned by `f`.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed,
        # whatever the mode and buffering it was opened with.
        open_variants = [
            {"mode": "w"},
            {"mode": "wb"},
            {"mode": "w", "buffering": 1},
            {"mode": "w", "buffering": 2},
            {"mode": "wb", "buffering": 0},
            {"mode": "r"},
            {"mode": "rb"},
            {"mode": "r", "buffering": 1},
            {"mode": "r", "buffering": 2},
            {"mode": "rb", "buffering": 0},
            {"mode": "w+"},
            {"mode": "w+b"},
            {"mode": "w+", "buffering": 1},
            {"mode": "w+", "buffering": 2},
            {"mode": "w+b", "buffering": 0},
        ]
        for kwargs in open_variants:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek, read1 and readinto are only checked on objects that
            # have them.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Write bytes to binary files and text to text files.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(unicode):
            pass
        # Build a reference cycle between the exception and its message
        # argument, then check that it gets collected.
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # Check which of abcmodule's ABCs each kind of opened file is an
        # instance of: unbuffered binary, buffered binary, and text.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest suite against the C implementation (module "io").
    io = io
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest suite against the Python implementation
    # (module "_pyio", imported as pyio).
    io = pyio
def test_main():
    """Attach the io namespaces to the test classes, then run them all.

    Classes whose name starts with "C" receive the names of the C ``io``
    module; classes whose name starts with "Py" receive those of ``pyio``.
    Any other class is run without extra attributes.
    """
    tests = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
    )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {member: getattr(io, member) for member in all_members}
    py_io_ns = {member: getattr(pyio, member) for member in all_members}
    # Each mock has a "C"- and a "Py"-prefixed variant at module level; bind
    # the matching variant under the unprefixed name.
    module_globals = globals()
    c_io_ns.update(
        {mock.__name__: module_globals["C" + mock.__name__] for mock in mocks})
    py_io_ns.update(
        {mock.__name__: module_globals["Py" + mock.__name__] for mock in mocks})
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        class_name = test.__name__
        if class_name.startswith("C"):
            namespace = c_io_ns
        elif class_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent test: nothing to inject.
            continue
        for attr_name, value in namespace.items():
            setattr(test, attr_name, value)

    support.run_unittest(*tests)
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()