Change a variable type to avoid signed overflow; replace repeated '19999' constant...
[python.git] / Lib / test / test_io.py
bloba90dd504a388284e29a51245bdf7985c0ff77f00
1 """Unit tests for the io module."""
3 # Tests of io are scattered over the test suite:
4 # * test_bufio - tests file buffering
5 # * test_memoryio - tests BytesIO and StringIO
6 # * test_fileio - tests FileIO
7 # * test_file - tests the file interface
8 # * test_io - tests everything else in the io module
9 # * test_univnewlines - tests universal newline support
10 # * test_largefile - tests operations on a file greater than 2**32 bytes
11 # (only enabled with -ulargefile)
13 ################################################################################
14 # ATTENTION TEST WRITERS!!!
15 ################################################################################
16 # When writing tests for io, it's important to test both the C and Python
17 # implementations. This is usually done by writing a base test that refers to
18 # the type it is testing as an attribute. Then it provides custom subclasses to
19 # test both implementations. This file has lots of examples.
20 ################################################################################
22 from __future__ import print_function
23 from __future__ import unicode_literals
25 import os
26 import sys
27 import time
28 import array
29 import threading
30 import random
31 import unittest
32 import warnings
33 import weakref
34 import gc
35 import abc
36 from itertools import chain, cycle, count
37 from collections import deque
38 from test import test_support as support
40 import codecs
41 import io # C implementation of io
42 import _pyio as pyio # Python implementation of io
# Under Python 2 this makes every class statement in the module new-style.
__metaclass__ = type
# Rebind ``bytes`` to the helper exported by test_support
# (presumably a Python 3 style bytes constructor -- confirm in test_support).
bytes = support.py3k_bytes
47 def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
49 with io.open(__file__, "r", encoding="latin1") as f:
50 return f._CHUNK_SIZE
class MockRawIO:
    """Scripted in-memory stand-in for a raw stream.

    Reads are served, in order, from the chunks given as ``read_stack``;
    every write is recorded in ``_write_stack``.  ``_reads`` counts the calls
    made to ``read()`` and ``readinto()`` so tests can check how often a
    buffered wrapper hit the raw layer.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0

    def read(self, n=None):
        """Return the next scripted chunk, or b"" once the script is empty.

        ``n`` is accepted for interface compatibility and otherwise ignored.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; any other exception is a
            # genuine error and must not be masked as end-of-file.
            return b""

    def write(self, b):
        """Record a bytes copy of ``b`` and return its length."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the head of the script into ``buf``.

        Returns the number of bytes copied, 0 when the script is empty, or
        None when the head entry is None (that entry is consumed).  A chunk
        larger than ``buf`` is split and its remainder stays at the head.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Chunk does not fit: hand out what fits and keep the rest queued.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered on the C implementation's RawIOBase."""
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered on the pure-Python (_pyio) RawIOBase."""
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods return inconsistent results.

    write() and read() double what MockRawIO produced, seek()/tell() return
    negative positions, and readinto() reports five times the buffer length.
    """

    def write(self, b):
        reported = MockRawIO.write(self, b)
        return reported * 2

    def read(self, n=None):
        chunk = MockRawIO.read(self, n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real copy, then report a count that cannot be right.
        MockRawIO.readinto(self, buf)
        return 5 * len(buf)
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered on the C implementation's RawIOBase."""
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered on the pure-Python (_pyio) RawIOBase."""
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises IOError.

    Later close() calls are silent no-ops.
    """

    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise IOError
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered on the C implementation's RawIOBase."""
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered on the pure-Python (_pyio) RawIOBase."""
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result.

    Intended to be combined with a concrete stream class (the next class in
    the MRO receives ``data``); sizes are appended to ``read_history``, with
    None logged when read() itself returned None.
    """

    def __init__(self, data):
        # The history list must exist before the base initializer runs.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C implementation's BytesIO."""
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the pure-Python (_pyio) BytesIO."""
class MockNonBlockWriterIO:
    """Write-recording raw stream that can simulate a blocking write.

    After ``block_on(char)``, the next write containing ``char`` records only
    the bytes before it and raises ``self.BlockingIOError`` (supplied by the
    concrete subclass) reporting that many bytes as written.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and clear the record."""
        joined = b"".join(self._write_stack)
        del self._write_stack[:]
        return joined

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = payload.find(blocker)
            if cut != -1:
                # One-shot: disarm the blocker, keep the accepted prefix and
                # report a partial write.
                self._blocker_char = None
                self._write_stack.append(payload[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(payload)
        return len(payload)
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO bound to the C implementation of io."""

    # Exception type raised by write() when the blocker byte is hit.
    BlockingIOError = io.BlockingIOError
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO bound to the pure-Python (_pyio) implementation."""

    # Exception type raised by write() when the blocker byte is hit.
    BlockingIOError = pyio.BlockingIOError
227 class IOTest(unittest.TestCase):
229 def setUp(self):
230 support.unlink(support.TESTFN)
232 def tearDown(self):
233 support.unlink(support.TESTFN)
235 def write_ops(self, f):
236 self.assertEqual(f.write(b"blah."), 5)
237 self.assertEqual(f.seek(0), 0)
238 self.assertEqual(f.write(b"Hello."), 6)
239 self.assertEqual(f.tell(), 6)
240 self.assertEqual(f.seek(-1, 1), 5)
241 self.assertEqual(f.tell(), 5)
242 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
243 self.assertEqual(f.seek(0), 0)
244 self.assertEqual(f.write(b"h"), 1)
245 self.assertEqual(f.seek(-1, 2), 13)
246 self.assertEqual(f.tell(), 13)
247 self.assertEqual(f.truncate(12), 12)
248 self.assertEqual(f.tell(), 12)
249 self.assertRaises(TypeError, f.seek, 0.0)
251 def read_ops(self, f, buffered=False):
252 data = f.read(5)
253 self.assertEqual(data, b"hello")
254 data = bytearray(data)
255 self.assertEqual(f.readinto(data), 5)
256 self.assertEqual(data, b" worl")
257 self.assertEqual(f.readinto(data), 2)
258 self.assertEqual(len(data), 5)
259 self.assertEqual(data[:2], b"d\n")
260 self.assertEqual(f.seek(0), 0)
261 self.assertEqual(f.read(20), b"hello world\n")
262 self.assertEqual(f.read(1), b"")
263 self.assertEqual(f.readinto(bytearray(b"x")), 0)
264 self.assertEqual(f.seek(-6, 2), 6)
265 self.assertEqual(f.read(5), b"world")
266 self.assertEqual(f.read(0), b"")
267 self.assertEqual(f.readinto(bytearray()), 0)
268 self.assertEqual(f.seek(-6, 1), 5)
269 self.assertEqual(f.read(5), b" worl")
270 self.assertEqual(f.tell(), 10)
271 self.assertRaises(TypeError, f.seek, 0.0)
272 if buffered:
273 f.seek(0)
274 self.assertEqual(f.read(), b"hello world\n")
275 f.seek(6)
276 self.assertEqual(f.read(), b"world\n")
277 self.assertEqual(f.read(), b"")
279 LARGE = 2**31
281 def large_file_ops(self, f):
282 assert f.readable()
283 assert f.writable()
284 self.assertEqual(f.seek(self.LARGE), self.LARGE)
285 self.assertEqual(f.tell(), self.LARGE)
286 self.assertEqual(f.write(b"xxx"), 3)
287 self.assertEqual(f.tell(), self.LARGE + 3)
288 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
289 self.assertEqual(f.truncate(), self.LARGE + 2)
290 self.assertEqual(f.tell(), self.LARGE + 2)
291 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
292 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
293 self.assertEqual(f.tell(), self.LARGE + 1)
294 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
295 self.assertEqual(f.seek(-1, 2), self.LARGE)
296 self.assertEqual(f.read(2), b"x")
298 def test_invalid_operations(self):
299 # Try writing on a file opened in read mode and vice-versa.
300 for mode in ("w", "wb"):
301 with self.open(support.TESTFN, mode) as fp:
302 self.assertRaises(IOError, fp.read)
303 self.assertRaises(IOError, fp.readline)
304 with self.open(support.TESTFN, "rb") as fp:
305 self.assertRaises(IOError, fp.write, b"blah")
306 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
307 with self.open(support.TESTFN, "r") as fp:
308 self.assertRaises(IOError, fp.write, "blah")
309 self.assertRaises(IOError, fp.writelines, ["blah\n"])
311 def test_raw_file_io(self):
312 with self.open(support.TESTFN, "wb", buffering=0) as f:
313 self.assertEqual(f.readable(), False)
314 self.assertEqual(f.writable(), True)
315 self.assertEqual(f.seekable(), True)
316 self.write_ops(f)
317 with self.open(support.TESTFN, "rb", buffering=0) as f:
318 self.assertEqual(f.readable(), True)
319 self.assertEqual(f.writable(), False)
320 self.assertEqual(f.seekable(), True)
321 self.read_ops(f)
323 def test_buffered_file_io(self):
324 with self.open(support.TESTFN, "wb") as f:
325 self.assertEqual(f.readable(), False)
326 self.assertEqual(f.writable(), True)
327 self.assertEqual(f.seekable(), True)
328 self.write_ops(f)
329 with self.open(support.TESTFN, "rb") as f:
330 self.assertEqual(f.readable(), True)
331 self.assertEqual(f.writable(), False)
332 self.assertEqual(f.seekable(), True)
333 self.read_ops(f, True)
335 def test_readline(self):
336 with self.open(support.TESTFN, "wb") as f:
337 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
338 with self.open(support.TESTFN, "rb") as f:
339 self.assertEqual(f.readline(), b"abc\n")
340 self.assertEqual(f.readline(10), b"def\n")
341 self.assertEqual(f.readline(2), b"xy")
342 self.assertEqual(f.readline(4), b"zzy\n")
343 self.assertEqual(f.readline(), b"foo\x00bar\n")
344 self.assertEqual(f.readline(None), b"another line")
345 self.assertRaises(TypeError, f.readline, 5.3)
346 with self.open(support.TESTFN, "r") as f:
347 self.assertRaises(TypeError, f.readline, 5.3)
349 def test_raw_bytes_io(self):
350 f = self.BytesIO()
351 self.write_ops(f)
352 data = f.getvalue()
353 self.assertEqual(data, b"hello world\n")
354 f = self.BytesIO(data)
355 self.read_ops(f, True)
357 def test_large_file_ops(self):
358 # On Windows and Mac OSX this test comsumes large resources; It takes
359 # a long time to build the >2GB file and takes >2GB of disk space
360 # therefore the resource must be enabled to run this test.
361 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
362 if not support.is_resource_enabled("largefile"):
363 print("\nTesting large file ops skipped on %s." % sys.platform,
364 file=sys.stderr)
365 print("It requires %d bytes and a long time." % self.LARGE,
366 file=sys.stderr)
367 print("Use 'regrtest.py -u largefile test_io' to run it.",
368 file=sys.stderr)
369 return
370 with self.open(support.TESTFN, "w+b", 0) as f:
371 self.large_file_ops(f)
372 with self.open(support.TESTFN, "w+b") as f:
373 self.large_file_ops(f)
375 def test_with_open(self):
376 for bufsize in (0, 1, 100):
377 f = None
378 with self.open(support.TESTFN, "wb", bufsize) as f:
379 f.write(b"xxx")
380 self.assertEqual(f.closed, True)
381 f = None
382 try:
383 with self.open(support.TESTFN, "wb", bufsize) as f:
385 except ZeroDivisionError:
386 self.assertEqual(f.closed, True)
387 else:
388 self.fail("1/0 didn't raise an exception")
390 # issue 5008
391 def test_append_mode_tell(self):
392 with self.open(support.TESTFN, "wb") as f:
393 f.write(b"xxx")
394 with self.open(support.TESTFN, "ab", buffering=0) as f:
395 self.assertEqual(f.tell(), 3)
396 with self.open(support.TESTFN, "ab") as f:
397 self.assertEqual(f.tell(), 3)
398 with self.open(support.TESTFN, "a") as f:
399 self.assertTrue(f.tell() > 0)
401 def test_destructor(self):
402 record = []
403 class MyFileIO(self.FileIO):
404 def __del__(self):
405 record.append(1)
406 try:
407 f = super(MyFileIO, self).__del__
408 except AttributeError:
409 pass
410 else:
412 def close(self):
413 record.append(2)
414 super(MyFileIO, self).close()
415 def flush(self):
416 record.append(3)
417 super(MyFileIO, self).flush()
418 f = MyFileIO(support.TESTFN, "wb")
419 f.write(b"xxx")
420 del f
421 support.gc_collect()
422 self.assertEqual(record, [1, 2, 3])
423 with self.open(support.TESTFN, "rb") as f:
424 self.assertEqual(f.read(), b"xxx")
426 def _check_base_destructor(self, base):
427 record = []
428 class MyIO(base):
429 def __init__(self):
430 # This exercises the availability of attributes on object
431 # destruction.
432 # (in the C version, close() is called by the tp_dealloc
433 # function, not by __del__)
434 self.on_del = 1
435 self.on_close = 2
436 self.on_flush = 3
437 def __del__(self):
438 record.append(self.on_del)
439 try:
440 f = super(MyIO, self).__del__
441 except AttributeError:
442 pass
443 else:
445 def close(self):
446 record.append(self.on_close)
447 super(MyIO, self).close()
448 def flush(self):
449 record.append(self.on_flush)
450 super(MyIO, self).flush()
451 f = MyIO()
452 del f
453 support.gc_collect()
454 self.assertEqual(record, [1, 2, 3])
456 def test_IOBase_destructor(self):
457 self._check_base_destructor(self.IOBase)
459 def test_RawIOBase_destructor(self):
460 self._check_base_destructor(self.RawIOBase)
462 def test_BufferedIOBase_destructor(self):
463 self._check_base_destructor(self.BufferedIOBase)
465 def test_TextIOBase_destructor(self):
466 self._check_base_destructor(self.TextIOBase)
468 def test_close_flushes(self):
469 with self.open(support.TESTFN, "wb") as f:
470 f.write(b"xxx")
471 with self.open(support.TESTFN, "rb") as f:
472 self.assertEqual(f.read(), b"xxx")
474 def test_array_writes(self):
475 a = array.array(b'i', range(10))
476 n = len(a.tostring())
477 with self.open(support.TESTFN, "wb", 0) as f:
478 self.assertEqual(f.write(a), n)
479 with self.open(support.TESTFN, "wb") as f:
480 self.assertEqual(f.write(a), n)
482 def test_closefd(self):
483 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
484 closefd=False)
486 def test_read_closed(self):
487 with self.open(support.TESTFN, "w") as f:
488 f.write("egg\n")
489 with self.open(support.TESTFN, "r") as f:
490 file = self.open(f.fileno(), "r", closefd=False)
491 self.assertEqual(file.read(), "egg\n")
492 file.seek(0)
493 file.close()
494 self.assertRaises(ValueError, file.read)
496 def test_no_closefd_with_filename(self):
497 # can't use closefd in combination with a file name
498 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
500 def test_closefd_attr(self):
501 with self.open(support.TESTFN, "wb") as f:
502 f.write(b"egg\n")
503 with self.open(support.TESTFN, "r") as f:
504 self.assertEqual(f.buffer.raw.closefd, True)
505 file = self.open(f.fileno(), "r", closefd=False)
506 self.assertEqual(file.buffer.raw.closefd, False)
508 def test_garbage_collection(self):
509 # FileIO objects are collected, and collecting them flushes
510 # all data to disk.
511 f = self.FileIO(support.TESTFN, "wb")
512 f.write(b"abcxxx")
513 f.f = f
514 wr = weakref.ref(f)
515 del f
516 support.gc_collect()
517 self.assertTrue(wr() is None, wr)
518 with self.open(support.TESTFN, "rb") as f:
519 self.assertEqual(f.read(), b"abcxxx")
521 def test_unbounded_file(self):
522 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
523 zero = "/dev/zero"
524 if not os.path.exists(zero):
525 self.skipTest("{0} does not exist".format(zero))
526 if sys.maxsize > 0x7FFFFFFF:
527 self.skipTest("test can only run in a 32-bit address space")
528 if support.real_max_memuse < support._2G:
529 self.skipTest("test requires at least 2GB of memory")
530 with self.open(zero, "rb", buffering=0) as f:
531 self.assertRaises(OverflowError, f.read)
532 with self.open(zero, "rb") as f:
533 self.assertRaises(OverflowError, f.read)
534 with self.open(zero, "r") as f:
535 self.assertRaises(OverflowError, f.read)
class CIOTest(IOTest):
    # IOTest variant intended for the C implementation of io; it adds nothing
    # of its own here.
    pass
class PyIOTest(IOTest):
    # IOTest variant intended for the pure-Python implementation; the array
    # write test is replaced by a skipped copy for the reason given below.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: the concrete test class supplies ``tp`` (the buffered type under
    # test) and the mock raw classes (``MockRawIO``, ``CloseFailureIO``).

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach() has nothing left to hand back.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # A subclass overriding __del__/close/flush must see them called in
        # that order on destruction; flush() is expected only for writable
        # objects.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    # Chain to the base destructor when there is one.
                    f()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the raw stream's name once it has one.
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered reader type given by ``self.tp``; the mock raw
    # classes and ``self.open`` are supplied as attributes by the concrete
    # C/Python test classes.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again on a live object; invalid buffer
        # sizes must raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() must issue at most one raw read per call; rawio._reads
        # tracks how many were made.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_readlines(self):
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # read sizes expected to reach the raw stream.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertTrue(None is bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # Nonsensical seek()/tell() results from the raw stream must surface
        # as IOError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
class CBufferedReaderTest(BufferedReaderTest):
    # BufferedReaderTest bound to the C implementation, plus C-only checks.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        reader = self.tp(raw)
        expected = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected, reader.__init__, raw, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation the object must refuse reads.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, raw,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, reader.read)

    def test_misbehaved_io_read(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, reader.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        raw = self.FileIO(support.TESTFN, "w+b")
        reader = self.tp(raw)
        reader.f = reader   # self-reference: needs the cyclic collector
        ref = weakref.ref(reader)
        del reader
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
class PyBufferedReaderTest(BufferedReaderTest):
    # BufferedReaderTest bound to the pure-Python (_pyio) implementation.
    tp = pyio.BufferedReader
841 class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
842 write_mode = "wb"
844 def test_constructor(self):
845 rawio = self.MockRawIO()
846 bufio = self.tp(rawio)
847 bufio.__init__(rawio)
848 bufio.__init__(rawio, buffer_size=1024)
849 bufio.__init__(rawio, buffer_size=16)
850 self.assertEquals(3, bufio.write(b"abc"))
851 bufio.flush()
852 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
853 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
854 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
855 bufio.__init__(rawio)
856 self.assertEquals(3, bufio.write(b"ghi"))
857 bufio.flush()
858 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
860 def test_detach_flush(self):
861 raw = self.MockRawIO()
862 buf = self.tp(raw)
863 buf.write(b"howdy!")
864 self.assertFalse(raw._write_stack)
865 buf.detach()
866 self.assertEqual(raw._write_stack, [b"howdy!"])
868 def test_write(self):
869 # Write to the buffered IO but don't overflow the buffer.
870 writer = self.MockRawIO()
871 bufio = self.tp(writer, 8)
872 bufio.write(b"abc")
873 self.assertFalse(writer._write_stack)
875 def test_write_overflow(self):
876 writer = self.MockRawIO()
877 bufio = self.tp(writer, 8)
878 contents = b"abcdefghijklmnop"
879 for n in range(0, len(contents), 3):
880 bufio.write(contents[n:n+3])
881 flushed = b"".join(writer._write_stack)
882 # At least (total - 8) bytes were implicitly flushed, perhaps more
883 # depending on the implementation.
884 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
886 def check_writes(self, intermediate_func):
887 # Lots of writes, test the flushed output is as expected.
888 contents = bytes(range(256)) * 1000
889 n = 0
890 writer = self.MockRawIO()
891 bufio = self.tp(writer, 13)
892 # Generator of write sizes: repeat each N 15 times then proceed to N+1
893 def gen_sizes():
894 for size in count(1):
895 for i in range(15):
896 yield size
897 sizes = gen_sizes()
898 while n < len(contents):
899 size = min(next(sizes), len(contents) - n)
900 self.assertEquals(bufio.write(contents[n:n+size]), size)
901 intermediate_func(bufio)
902 n += size
903 bufio.flush()
904 self.assertEquals(contents,
905 b"".join(writer._write_stack))
907 def test_writes(self):
908 self.check_writes(lambda bufio: None)
910 def test_writes_and_flushes(self):
911 self.check_writes(lambda bufio: bufio.flush())
913 def test_writes_and_seeks(self):
914 def _seekabs(bufio):
915 pos = bufio.tell()
916 bufio.seek(pos + 1, 0)
917 bufio.seek(pos - 1, 0)
918 bufio.seek(pos, 0)
919 self.check_writes(_seekabs)
920 def _seekrel(bufio):
921 pos = bufio.seek(0, 1)
922 bufio.seek(+1, 1)
923 bufio.seek(-1, 1)
924 bufio.seek(pos, 0)
925 self.check_writes(_seekrel)
927 def test_writes_and_truncates(self):
928 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
930 def test_write_non_blocking(self):
931 raw = self.MockNonBlockWriterIO()
932 bufio = self.tp(raw, 8)
934 self.assertEquals(bufio.write(b"abcd"), 4)
935 self.assertEquals(bufio.write(b"efghi"), 5)
936 # 1 byte will be written, the rest will be buffered
937 raw.block_on(b"k")
938 self.assertEquals(bufio.write(b"jklmn"), 5)
940 # 8 bytes will be written, 8 will be buffered and the rest will be lost
941 raw.block_on(b"0")
942 try:
943 bufio.write(b"opqrwxyz0123456789")
944 except self.BlockingIOError as e:
945 written = e.characters_written
946 else:
947 self.fail("BlockingIOError should have been raised")
948 self.assertEquals(written, 16)
949 self.assertEquals(raw.pop_written(),
950 b"abcdefghijklmnopqrwxyz")
952 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
953 s = raw.pop_written()
954 # Previously buffered bytes were flushed
955 self.assertTrue(s.startswith(b"01234567A"), s)
957 def test_write_and_rewind(self):
958 raw = io.BytesIO()
959 bufio = self.tp(raw, 4)
960 self.assertEqual(bufio.write(b"abcdef"), 6)
961 self.assertEqual(bufio.tell(), 6)
962 bufio.seek(0, 0)
963 self.assertEqual(bufio.write(b"XY"), 2)
964 bufio.seek(6, 0)
965 self.assertEqual(raw.getvalue(), b"XYcdef")
966 self.assertEqual(bufio.write(b"123456"), 6)
967 bufio.flush()
968 self.assertEqual(raw.getvalue(), b"XYcdef123456")
970 def test_flush(self):
971 writer = self.MockRawIO()
972 bufio = self.tp(writer, 8)
973 bufio.write(b"abc")
974 bufio.flush()
975 self.assertEquals(b"abc", writer._write_stack[0])
977 def test_destructor(self):
978 writer = self.MockRawIO()
979 bufio = self.tp(writer, 8)
980 bufio.write(b"abc")
981 del bufio
982 support.gc_collect()
983 self.assertEquals(b"abc", writer._write_stack[0])
985 def test_truncate(self):
986 # Truncate implicitly flushes the buffer.
987 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
988 bufio = self.tp(raw, 8)
989 bufio.write(b"abcdef")
990 self.assertEqual(bufio.truncate(3), 3)
991 self.assertEqual(bufio.tell(), 3)
992 with self.open(support.TESTFN, "rb", buffering=0) as f:
993 self.assertEqual(f.read(), b"abc")
995 def test_threads(self):
996 try:
997 # Write out many bytes from many threads and test they were
998 # all flushed.
999 N = 1000
1000 contents = bytes(range(256)) * N
1001 sizes = cycle([1, 19])
1002 n = 0
1003 queue = deque()
1004 while n < len(contents):
1005 size = next(sizes)
1006 queue.append(contents[n:n+size])
1007 n += size
1008 del contents
1009 # We use a real file object because it allows us to
1010 # exercise situations where the GIL is released before
1011 # writing the buffer to the raw streams. This is in addition
1012 # to concurrency issues due to switching threads in the middle
1013 # of Python code.
1014 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1015 bufio = self.tp(raw, 8)
1016 errors = []
1017 def f():
1018 try:
1019 while True:
1020 try:
1021 s = queue.popleft()
1022 except IndexError:
1023 return
1024 bufio.write(s)
1025 except Exception as e:
1026 errors.append(e)
1027 raise
1028 threads = [threading.Thread(target=f) for x in range(20)]
1029 for t in threads:
1030 t.start()
1031 time.sleep(0.02) # yield
1032 for t in threads:
1033 t.join()
1034 self.assertFalse(errors,
1035 "the following exceptions were caught: %r" % errors)
1036 bufio.close()
1037 with self.open(support.TESTFN, "rb") as f:
1038 s = f.read()
1039 for i in range(256):
1040 self.assertEquals(s.count(bytes([i])), N)
1041 finally:
1042 support.unlink(support.TESTFN)
1044 def test_misbehaved_io(self):
1045 rawio = self.MisbehavedRawIO()
1046 bufio = self.tp(rawio, 5)
1047 self.assertRaises(IOError, bufio.seek, 0)
1048 self.assertRaises(IOError, bufio.tell)
1049 self.assertRaises(IOError, bufio.write, b"abcdef")
def test_max_buffer_size_deprecation(self):
    # Passing a third positional argument must emit exactly one
    # DeprecationWarning carrying the expected message.
    with support.check_warnings() as recorder:
        warnings.simplefilter("always", DeprecationWarning)
        self.tp(self.MockRawIO(), 8, 12)
        self.assertEqual(len(recorder.warnings), 1)
        only_warning = recorder.warnings[0]
        self.assertTrue(only_warning.category is DeprecationWarning)
        expected_text = "max_buffer_size is deprecated"
        self.assertEqual(str(only_warning.message), expected_text)
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against io.BufferedWriter and adds
    # checks specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # Re-initializing with an invalid buffer size must raise ValueError,
        # and a subsequent write() must raise ValueError as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        rawio = self.FileIO(support.TESTFN, "w+b")
        writer = self.tp(rawio)
        writer.write(b"123xxx")
        # Create a reference cycle through the writer itself.
        writer.x = writer
        wr = weakref.ref(writer)
        del writer
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
class BufferedRWPairTest(unittest.TestCase):
    # Shared tests for BufferedRWPair; the class under test is read from
    # the `tp` attribute.

    def _new_pair(self, reader=None, writer=None):
        # Build a pair, substituting a fresh MockRawIO for any missing side.
        # The reader default is created first, matching argument order.
        if reader is None:
            reader = self.MockRawIO()
        if writer is None:
            writer = self.MockRawIO()
        return self.tp(reader, writer)

    def test_constructor(self):
        pair = self._new_pair()
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self._new_pair()
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        with support.check_warnings() as recorder:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
            self.assertEqual(len(recorder.warnings), 1)
            only_warning = recorder.warnings[0]
            self.assertTrue(only_warning.category is DeprecationWarning)
            expected_text = "max_buffer_size is deprecated"
            self.assertEqual(str(only_warning.message), expected_text)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self._new_pair(reader=self.BytesIO(b"abcdef"))

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self._new_pair(reader=self.BytesIO(b"abc"))
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        def fresh_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self._new_pair(reader=self.BytesIO(b"abcdef"))

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self._new_pair(reader=self.BytesIO(b"abcdef"))

        target = bytearray(5)
        self.assertEqual(pair.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        for chunk in (b"abc", b"def"):
            pair.write(chunk)
            pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self._new_pair(reader=self.BytesIO(b"abcdef"))

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self._new_pair()
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self._new_pair()
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self._new_pair()
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self._new_pair()
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, expected pair.isatty())
        cases = (
            (False, False, False),
            (True, False, True),
            (False, True, True),
            (True, True, True),
        )
        for reader_tty, writer_tty, expected in cases:
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            if expected:
                self.assertTrue(pair.isatty())
            else:
                self.assertFalse(pair.isatty())
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for BufferedRandom. Inherits both the reader and the
    # writer test suites and adds tests that interleave reads and writes.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: interleave reads (done through `read_func`),
        # writes and flushes and check data and positions at each step.
        # `read_func` is called as read_func(bufio) or read_func(bufio, n).
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
    # Runs the BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader flavour first, then the writer flavour.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1402 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1403 # properties:
1404 # - A single output character can correspond to many bytes of input.
1405 # - The number of input bytes to complete the character can be
1406 # undetermined until the last input byte is received.
1407 # - The number of input bytes can vary depending on previous input.
1408 # - A single input byte can correspond to many characters of output.
1409 # - The number of output characters can be undetermined until the
1410 # last input byte is received.
1411 # - The number of output characters can vary depending on previous input.
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered input bytes, integer encoding I and O)."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume `input` bytes and return the text of all completed words.

        When `final` is true, any partially buffered word is completed too.
        """
        output = ''
        period = ord('.')
        # Iterating a bytearray yields integers whether `input` iterates as
        # 1-character strings or as integers, so one comparison covers both.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        'i<n>' and 'o<n>' words update I and O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Codec lookups for 'test_decoder' only succeed while this is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: return a CodecInfo for 'test_decoder'.

        Returns None (implicitly) when the codec is disabled or `name` is
        some other codec.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)

# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, `final` flag passed to decode(), expected
    # decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
1546 class TextIOWrapperTest(unittest.TestCase):
def setUp(self):
    # Start from a clean slate: no leftover test file, plus a sample with
    # mixed line endings and its "\n"-normalized text form.
    support.unlink(support.TESTFN)
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.normalized = normalized_bytes.decode("ascii")
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
def test_constructor(self):
    # Re-running __init__ on an existing wrapper must apply the new
    # encoding / line_buffering settings and reject bad newline values.
    r = self.BytesIO(b"\xc3\xa9\n\n")
    b = self.BufferedReader(r, 1000)
    t = self.TextIOWrapper(b)
    t.__init__(b, encoding="latin1", newline="\r\n")
    self.assertEqual(t.encoding, "latin1")
    self.assertEqual(t.line_buffering, False)
    t.__init__(b, encoding="utf8", line_buffering=True)
    self.assertEqual(t.encoding, "utf8")
    self.assertEqual(t.line_buffering, True)
    self.assertEqual("\xe9\n", t.readline())
    self.assertRaises(TypeError, t.__init__, b, newline=42)
    self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
def test_detach(self):
    # detach() returns the wrapped buffer, pushes pending text down to the
    # raw stream, and cannot be called a second time.
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    self.assertFalse(raw.getvalue())
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    self.assertRaises(ValueError, text.detach)
def test_repr(self):
    # repr() shows the encoding and, once the raw stream has a name,
    # the name as well (text and bytes names are rendered differently).
    source = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(source)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    source.name = "dummy"
    expected = "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    source.name = b"dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)
def test_line_buffering(self):
    # With line_buffering=True, writing text containing "\n" or "\r"
    # pushes everything written so far down to the raw stream.
    r = self.BytesIO()
    b = self.BufferedWriter(r, 1000)
    t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
    t.write("X")
    self.assertEqual(r.getvalue(), b"")  # No flush happened
    t.write("Y\nZ")
    self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
    t.write("A\rB")
    self.assertEqual(r.getvalue(), b"XY\nZA\rB")
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf8")
    self.assertEqual(explicit.encoding, "utf8")
    defaulted = self.TextIOWrapper(raw)
    self.assertTrue(defaulted.encoding is not None)
    # lookup() raises if the default encoding is not a known codec.
    codecs.lookup(defaulted.encoding)
def test_encoding_errors_reading(self):
    # Reading a non-ASCII byte through an ASCII wrapper, under each
    # `errors` policy.
    # (1) default
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii")
    self.assertRaises(UnicodeError, t.read)
    # (2) explicit strict
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, t.read)
    # (3) ignore
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
    self.assertEqual(t.read(), "abc\n\n")
    # (4) replace
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
    self.assertEqual(t.read(), "abc\n\ufffd\n")
def test_encoding_errors_writing(self):
    # Writing a non-ASCII character through an ASCII wrapper, under each
    # `errors` policy.
    # (1) default
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    self.assertRaises(UnicodeError, t.write, "\xff")
    # (2) explicit strict
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, t.write, "\xff")
    # (3) ignore
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                           newline="\n")
    t.write("abc\xffdef\n")
    t.flush()
    self.assertEqual(b.getvalue(), b"abcdef\n")
    # (4) replace
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                           newline="\n")
    t.write("abc\xffdef\n")
    t.flush()
    self.assertEqual(b.getvalue(), b"abc?def\n")
def test_newlines(self):
    # For every newline mode, encoding and small buffer size, check how
    # the same input is split into lines, both by iteration and by
    # read(2) + readline().
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # Pairs of (newline argument, expected lines).
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # XXX: str.encode() should return bytes
        data = bytes(''.join(input_lines).encode(encoding))
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
def test_newlines_input(self):
    # readlines() and read() must agree with the expected line split for
    # each newline mode.
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    for newline, expected in [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
        ]:
        buf = self.BytesIO(testdata)
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        self.assertEqual(txt.readlines(), expected)
        txt.seek(0)
        self.assertEqual(txt.read(), "".join(expected))
def test_newlines_output(self):
    # Maps the newline argument to the bytes expected on output; None is
    # checked against the entry for os.linesep.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
        }
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    for newline, expected in tests:
        buf = self.BytesIO()
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        txt.write("AAA\nB")
        txt.write("BB\nCCC\n")
        txt.write("X\rY\r\nZ")
        txt.flush()
        self.assertEqual(buf.closed, False)
        self.assertEqual(buf.getvalue(), expected)
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the underlying
    # stream, with the written text already present in it.
    l = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Capture the contents at close time.
            l.append(self.getvalue())
            base.close(self)
    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], l)
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # invoked in that order when the object is collected.
    record = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            try:
                f = super(MyTextIO, self).__del__
            except AttributeError:
                # The base class defines no __del__; nothing to chain to.
                pass
            else:
                f()
        def close(self):
            record.append(2)
            super(MyTextIO, self).close()
        def flush(self):
            record.append(3)
            super(MyTextIO, self).flush()
    b = self.BytesIO()
    t = MyTextIO(b, encoding="ascii")
    del t
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    output = captured.getvalue().strip()
    if not output:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(output.splitlines()), 1)
    self.assertTrue(output.startswith("Exception IOError: "), output)
    self.assertTrue(output.endswith(" ignored"), output)
1787 # Systematic tests of the text I/O API
def test_basic_io(self):
    # Write, read, seek and tell on a small text file for several chunk
    # sizes and encodings. Files are opened in `with` blocks so they are
    # closed even when an assertion fails.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
def multi_line_test(self, f, enc):
    # Helper: overwrite `f` with lines of assorted lengths, recording
    # tell() before each write, then read them back and check that every
    # (position, line) pair matches. `enc` is accepted but not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a `size`-char line.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
def test_telling(self):
    # tell() must report the positions recorded while writing, and must
    # raise IOError while the file is being iterated over.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
def test_seeking(self):
    # Place a multi-byte character so that it starts two bytes before the
    # default chunk size, then check read(), tell() and readline() around
    # it.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # Previously the read handle was never closed.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # Previously the read handle was never closed.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
def test_seek_and_tell(self):
    #Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1): # seek positions
            for j in [1, 5, len(decoded) - i]: # read lengths
                # `with` closes the file even when an assertion fails.
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(input)

        # Position each test case so that it crosses a chunk boundary.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
def test_encoded_writes(self):
    # Two consecutive writes must produce the same bytes as encoding the
    # concatenated text once.
    data = "1234567890"
    tests = ("utf-16",
             "utf-16-le",
             "utf-16-be",
             "utf-32",
             "utf-32-le",
             "utf-32-be")
    for encoding in tests:
        buf = self.BytesIO()
        f = self.TextIOWrapper(buf, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        f.write(data)
        f.write(data)
        f.seek(0)
        self.assertEqual(f.read(), data * 2)
        f.seek(0)
        self.assertEqual(f.read(), data * 2)
        self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
def test_unreadable(self):
    # read() on a wrapper whose underlying stream reports readable() as
    # False must raise IOError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False
    wrapper = self.TextIOWrapper(UnReadable())
    self.assertRaises(IOError, wrapper.read)
def test_read_one_by_one(self):
    # Reading one character at a time must still translate "\r\n" into a
    # single "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    reads = ""
    while True:
        c = txt.read(1)
        if not c:
            break
        reads += c
    self.assertEqual(reads, "AA\nBB")
def test_readlines(self):
    # readlines() with no argument, with None, and with a size hint.
    txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(txt.readlines(), every_line)
    txt.seek(0)
    self.assertEqual(txt.readlines(None), every_line)
    txt.seek(0)
    self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
1982 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    reads = ""
    while True:
        c = txt.read(128)
        if not c:
            break
        reads += c
    self.assertEqual(reads, "A"*127+"\nB")
def test_issue1395_1(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time
    reads = ""
    while True:
        c = txt.read(1)
        if not c:
            break
        reads += c
    self.assertEqual(reads, self.normalized)
def test_issue1395_2(self):
    # Same as test_issue1395_1, but reading 4 characters at a time with a
    # 4-byte chunk size.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    reads = ""
    while True:
        c = txt.read(4)
        if not c:
            break
        reads += c
    self.assertEqual(reads, self.normalized)
2018 def test_issue1395_3(self):
2019 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2020 txt._CHUNK_SIZE = 4
2022 reads = txt.read(4)
2023 reads += txt.read(4)
2024 reads += txt.readline()
2025 reads += txt.readline()
2026 reads += txt.readline()
2027 self.assertEquals(reads, self.normalized)
2029 def test_issue1395_4(self):
2030 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2031 txt._CHUNK_SIZE = 4
2033 reads = txt.read(4)
2034 reads += txt.read()
2035 self.assertEquals(reads, self.normalized)
2037 def test_issue1395_5(self):
2038 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2039 txt._CHUNK_SIZE = 4
2041 reads = txt.read(4)
2042 pos = txt.tell()
2043 txt.seek(0)
2044 txt.seek(pos)
2045 self.assertEquals(txt.read(4), "BBB\n")
2047 def test_issue2282(self):
2048 buffer = self.BytesIO(self.testdata)
2049 txt = self.TextIOWrapper(buffer, encoding="ascii")
2051 self.assertEqual(buffer.seekable(), txt.seekable())
@unittest.skip("Issue #6213 with incremental encoders")
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The original called tell() here and ignored the result; the
            # call is kept in case it affects the wrapper's state.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
@unittest.skip("Issue #6213 with incremental encoders")
def test_seek_bom(self):
    # Same check as for appending, but seeking manually: text written
    # after seeking back to the start and to a saved position must not
    # introduce a second BOM.
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            pos = f.tell()
        with self.open(filename, 'r+', encoding=charset) as f:
            # Overwrite from the end position first, then from the start.
            f.seek(pos)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
def test_errors_property(self):
    # The errors attribute is "strict" when no errors argument is given,
    # and otherwise echoes the argument.
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **extra_kwargs) as f:
            self.assertEqual(f.errors, expected)
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    event = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # All threads block here and are released together by
            # event.set() below.
            event.wait()
            f.write(text)
        threads = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for t in threads:
            t.start()
        time.sleep(0.02)
        event.set()
        for t in threads:
            t.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Every thread's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
class CTextIOWrapperTest(TextIOWrapperTest):
    # Adds two tests on top of the inherited TextIOWrapperTest cases.

    def test_initialization(self):
        # Re-running __init__ with an invalid newline argument raises, and
        # afterwards read() on the wrapper raises ValueError.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for exc_type, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(exc_type, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Create a reference cycle so only the garbage collector can
        # reclaim the wrapper.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Defines no tests of its own; it only runs the inherited
    # TextIOWrapperTest cases under a "Py"-prefixed class name.
    pass
2144 class IncrementalNewlineDecoderTest(unittest.TestCase):
2146 def check_newline_decoding_utf8(self, decoder):
2147 # UTF-8 specific tests for a newline decoder
2148 def _check_decode(b, s, **kwargs):
2149 # We exercise getstate() / setstate() as well as decode()
2150 state = decoder.getstate()
2151 self.assertEquals(decoder.decode(b, **kwargs), s)
2152 decoder.setstate(state)
2153 self.assertEquals(decoder.decode(b, **kwargs), s)
2155 _check_decode(b'\xe8\xa2\x88', "\u8888")
2157 _check_decode(b'\xe8', "")
2158 _check_decode(b'\xa2', "")
2159 _check_decode(b'\x88', "\u8888")
2161 _check_decode(b'\xe8', "")
2162 _check_decode(b'\xa2', "")
2163 _check_decode(b'\x88', "\u8888")
2165 _check_decode(b'\xe8', "")
2166 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2168 decoder.reset()
2169 _check_decode(b'\n', "\n")
2170 _check_decode(b'\r', "")
2171 _check_decode(b'', "\n", final=True)
2172 _check_decode(b'\r', "\n", final=True)
2174 _check_decode(b'\r', "")
2175 _check_decode(b'a', "\na")
2177 _check_decode(b'\r\r\n', "\n\n")
2178 _check_decode(b'\r', "")
2179 _check_decode(b'\r', "\n")
2180 _check_decode(b'\na', "\na")
2182 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2183 _check_decode(b'\xe8\xa2\x88', "\u8888")
2184 _check_decode(b'\n', "\n")
2185 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2186 _check_decode(b'\n', "\n")
2188 def check_newline_decoding(self, decoder, encoding):
2189 result = []
2190 if encoding is not None:
2191 encoder = codecs.getincrementalencoder(encoding)()
2192 def _decode_bytewise(s):
2193 # Decode one byte at a time
2194 for b in encoder.encode(s):
2195 result.append(decoder.decode(b))
2196 else:
2197 encoder = None
2198 def _decode_bytewise(s):
2199 # Decode one char at a time
2200 for c in s:
2201 result.append(decoder.decode(c))
2202 self.assertEquals(decoder.newlines, None)
2203 _decode_bytewise("abc\n\r")
2204 self.assertEquals(decoder.newlines, '\n')
2205 _decode_bytewise("\nabc")
2206 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2207 _decode_bytewise("abc\r")
2208 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2209 _decode_bytewise("abc")
2210 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2211 _decode_bytewise("abc\r")
2212 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2213 decoder.reset()
2214 input = "abc"
2215 if encoder is not None:
2216 encoder.reset()
2217 input = encoder.encode(input)
2218 self.assertEquals(decoder.decode(input), "abc")
2219 self.assertEquals(decoder.newlines, None)
2221 def test_newline_decoder(self):
2222 encodings = (
2223 # None meaning the IncrementalNewlineDecoder takes unicode input
2224 # rather than bytes input
2225 None, 'utf-8', 'latin-1',
2226 'utf-16', 'utf-16-le', 'utf-16-be',
2227 'utf-32', 'utf-32-le', 'utf-32-be',
2229 for enc in encodings:
2230 decoder = enc and codecs.getincrementaldecoder(enc)()
2231 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2232 self.check_newline_decoding(decoder, enc)
2233 decoder = codecs.getincrementaldecoder("utf-8")()
2234 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2235 self.check_newline_decoding_utf8(decoder)
2237 def test_newline_bytes(self):
2238 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2239 def _check(dec):
2240 self.assertEquals(dec.newlines, None)
2241 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2242 self.assertEquals(dec.newlines, None)
2243 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2244 self.assertEquals(dec.newlines, None)
2245 dec = self.IncrementalNewlineDecoder(None, translate=False)
2246 _check(dec)
2247 dec = self.IncrementalNewlineDecoder(None, translate=True)
2248 _check(dec)
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Defines no tests of its own; it only runs the inherited
    # IncrementalNewlineDecoderTest cases under a "C"-prefixed class name.
    pass
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Defines no tests of its own; it only runs the inherited
    # IncrementalNewlineDecoderTest cases under a "Py"-prefixed class name.
    pass
2257 # XXX Tests for open()
class MiscIOTest(unittest.TestCase):
    # Miscellaneous checks of the module bound to self.io.  Subclasses set
    # the `io` class attribute; the other self.* io names used below
    # (open, IOBase, BlockingIOError, ...) are not defined in this class
    # and have to be provided on it from outside.

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every name in __all__ must exist; apart from open and the SEEK_*
        # constants, each must be an exception or an IOBase subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # The files are opened in with-blocks so they are closed even when
        # an assertion fails.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object on the same descriptor; closefd=False
            # leaves closing the descriptor to f.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed,
        # for each combination of mode and buffering below.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek, read1 and readinto are not offered by every file type.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Write bytes to binary files and text to text files.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(unicode):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its message,
        # then check that the garbage collector can reclaim it.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))

    def _check_abc_inheritance(self, abcmodule):
        # Open an unbuffered binary, a buffered binary and a text file, and
        # check each against the four base classes found on `abcmodule`.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertTrue(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "wb") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "w") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertTrue(isinstance(f, abcmodule.TextIOBase))

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest cases against the module imported as `io`.
    io = io
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest cases against the module imported as `pyio`.
    io = pyio
def test_main():
    # Attach the io names and mock classes each test class needs, then run
    # all of the test classes.
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = dict((name, getattr(io, name)) for name in all_members)
    py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
    globs = globals()
    # Each mock is looked up under a "C"/"Py"-prefixed global name but
    # stored under its plain name.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        # The class-name prefix selects the namespace; classes with neither
        # prefix get nothing attached.
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()