6 from array
import array
7 from weakref
import proxy
13 from test
import test_support
14 from test
.test_support
import TESTFN
, run_unittest
15 from UserList
import UserList
17 class AutoFileTests(unittest
.TestCase
):
18 # file tests for which a test file is automatically set up
21 self
.f
= open(TESTFN
, 'wb')
28 def testWeakRefs(self
):
29 # verify weak references
32 self
.assertEquals(self
.f
.tell(), p
.tell())
35 self
.assertRaises(ReferenceError, getattr, p
, 'tell')
37 def testAttributes(self
):
38 # verify expected attributes exist
40 with test_support
.check_py3k_warnings():
41 softspace
= f
.softspace
42 f
.name
# merely shouldn't blow up
46 with test_support
.check_py3k_warnings():
47 # verify softspace is writable
48 f
.softspace
= softspace
# merely shouldn't blow up
50 # verify the others aren't
51 for attr
in 'name', 'mode', 'closed':
52 self
.assertRaises((AttributeError, TypeError), setattr, f
, attr
, 'oops')
54 def testReadinto(self
):
58 a
= array('c', 'x'*10)
59 self
.f
= open(TESTFN
, 'rb')
60 n
= self
.f
.readinto(a
)
61 self
.assertEquals('12', a
.tostring()[:n
])
63 def testWritelinesUserList(self
):
64 # verify writelines with instance sequence
65 l
= UserList(['1', '2'])
68 self
.f
= open(TESTFN
, 'rb')
70 self
.assertEquals(buf
, '12')
72 def testWritelinesIntegers(self
):
73 # verify writelines with integers
74 self
.assertRaises(TypeError, self
.f
.writelines
, [1, 2, 3])
76 def testWritelinesIntegersUserList(self
):
77 # verify writelines with integers in UserList
79 self
.assertRaises(TypeError, self
.f
.writelines
, l
)
81 def testWritelinesNonString(self
):
82 # verify writelines with non-string object
86 self
.assertRaises(TypeError, self
.f
.writelines
,
87 [NonString(), NonString()])
91 self
.assertTrue(repr(self
.f
).startswith("<open file '" + TESTFN
))
95 self
.f
= open(TESTFN
, 'rb')
97 self
.assertEquals(f
.name
, TESTFN
)
98 self
.assertTrue(not f
.isatty())
99 self
.assertTrue(not f
.closed
)
101 self
.assertRaises(TypeError, f
.readinto
, "")
103 self
.assertTrue(f
.closed
)
105 def testMethods(self
):
106 methods
= ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
107 'readline', 'readlines', 'seek', 'tell', 'truncate',
109 deprecated_methods
= ['xreadlines']
110 if sys
.platform
.startswith('atheos'):
111 methods
.remove('truncate')
113 # __exit__ should close the file
114 self
.f
.__exit
__(None, None, None)
115 self
.assertTrue(self
.f
.closed
)
117 for methodname
in methods
:
118 method
= getattr(self
.f
, methodname
)
119 # should raise on closed file
120 self
.assertRaises(ValueError, method
)
121 with test_support
.check_py3k_warnings():
122 for methodname
in deprecated_methods
:
123 method
= getattr(self
.f
, methodname
)
124 self
.assertRaises(ValueError, method
)
125 self
.assertRaises(ValueError, self
.f
.writelines
, [])
127 # file is closed, __exit__ shouldn't do anything
128 self
.assertEquals(self
.f
.__exit
__(None, None, None), None)
129 # it must also return None if an exception was given
133 self
.assertEquals(self
.f
.__exit
__(*sys
.exc_info()), None)
135 def testReadWhenWriting(self
):
136 self
.assertRaises(IOError, self
.f
.read
)
138 def testIssue5677(self
):
139 # Remark: Do not perform more than one test per open file,
140 # since that does NOT catch the readline error on Windows.
142 for mode
in ['w', 'wb', 'a', 'ab']:
143 for attr
in ['read', 'readline', 'readlines']:
144 self
.f
= open(TESTFN
, mode
)
146 self
.assertRaises(IOError, getattr(self
.f
, attr
))
149 self
.f
= open(TESTFN
, mode
)
151 self
.assertRaises(IOError, lambda: [line
for line
in self
.f
])
154 self
.f
= open(TESTFN
, mode
)
156 self
.assertRaises(IOError, self
.f
.readinto
, bytearray(len(data
)))
159 for mode
in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
160 self
.f
= open(TESTFN
, mode
)
161 self
.assertRaises(IOError, self
.f
.write
, data
)
164 self
.f
= open(TESTFN
, mode
)
165 self
.assertRaises(IOError, self
.f
.writelines
, [data
, data
])
168 self
.f
= open(TESTFN
, mode
)
169 self
.assertRaises(IOError, self
.f
.truncate
)
172 class OtherFileTests(unittest
.TestCase
):
174 def testOpenDir(self
):
175 this_dir
= os
.path
.dirname(__file__
)
176 for mode
in (None, "w"):
179 f
= open(this_dir
, mode
)
183 self
.assertEqual(e
.filename
, this_dir
)
185 self
.fail("opening a directory didn't raise an IOError")
187 def testModeStrings(self
):
188 # check invalid mode strings
189 for mode
in ("", "aU", "wU+"):
191 f
= open(TESTFN
, mode
)
196 self
.fail('%r is an invalid file mode' % mode
)
198 # Some invalid modes fail on Windows, but pass on Unix
199 # Issue3965: avoid a crash on Windows when filename is unicode
200 for name
in (TESTFN
, unicode(TESTFN
), unicode(TESTFN
+ '\t')):
203 except (IOError, ValueError):
209 # This causes the interpreter to exit on OSF1 v5.1.
210 if sys
.platform
!= 'osf1V5':
211 self
.assertRaises(IOError, sys
.stdin
.seek
, -1)
213 print >>sys
.__stdout
__, (
214 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
216 self
.assertRaises(IOError, sys
.stdin
.truncate
)
218 def testUnicodeOpen(self
):
219 # verify repr works for unicode too
220 f
= open(unicode(TESTFN
), "w")
221 self
.assertTrue(repr(f
).startswith("<open file u'" + TESTFN
))
225 def testBadModeArgument(self
):
226 # verify that we get a sensible error message for bad mode argument
229 f
= open(TESTFN
, bad_mode
)
230 except ValueError, msg
:
233 if TESTFN
in s
or bad_mode
not in s
:
234 self
.fail("bad error message for invalid mode: %s" % s
)
235 # if msg.args[0] == 0, we're probably on Windows where there may
236 # be no obvious way to discover why open() failed.
239 self
.fail("no error for invalid mode: %s" % bad_mode
)
241 def testSetBufferSize(self
):
242 # make sure that explicitly setting the buffer size doesn't cause
243 # misbehaviour especially with repeated close() calls
244 for s
in (-1, 0, 1, 512):
246 f
= open(TESTFN
, 'w', s
)
250 f
= open(TESTFN
, 'r', s
)
255 self
.fail('error setting buffer size %d: %s' % (s
, str(msg
)))
256 self
.assertEquals(d
, s
)
258 def testTruncateOnWindows(self
):
262 # SF bug <http://www.python.org/sf/801631>
263 # "file.truncate fault on windows"
264 f
= open(TESTFN
, 'wb')
265 f
.write('12345678901') # 11 bytes
268 f
= open(TESTFN
,'rb+')
271 self
.fail("Read on file opened for update failed %r" % data
)
273 self
.fail("File pos after read wrong %d" % f
.tell())
277 self
.fail("File pos after ftruncate wrong %d" % f
.tell())
280 size
= os
.path
.getsize(TESTFN
)
282 self
.fail("File size after ftruncate wrong %d" % size
)
289 def testIteration(self
):
290 # Test the complex interaction when mixing file-iteration and the
291 # various read* methods. Ostensibly, the mixture could just be tested
292 # to work when it should work according to the Python language,
293 # instead of fail when it should fail according to the current CPython
294 # implementation. People don't always program Python the way they
295 # should, though, and the implementation might change in subtle ways,
296 # so we explicitly test for errors, too; the test will just have to
297 # be updated when the implementation changes.
300 assert not dataoffset
% len(filler
), \
301 "dataoffset must be multiple of len(filler)"
302 nchunks
= dataoffset
// len(filler
)
304 "spam, spam and eggs\n",
305 "eggs, spam, ham and spam\n",
306 "saussages, spam, spam and eggs\n",
307 "spam, ham, spam and eggs\n",
308 "spam, spam, spam, spam, spam, ham, spam\n",
309 "wonderful spaaaaaam.\n"
311 methods
= [("readline", ()), ("read", ()), ("readlines", ()),
312 ("readinto", (array("c", " "*100),))]
315 # Prepare the testfile
316 bag
= open(TESTFN
, "w")
317 bag
.write(filler
* nchunks
)
318 bag
.writelines(testlines
)
320 # Test for appropriate errors mixing read* and iteration
321 for methodname
, args
in methods
:
323 if f
.next() != filler
:
324 self
.fail
, "Broken testfile"
325 meth
= getattr(f
, methodname
)
331 self
.fail("%s%r after next() didn't raise ValueError" %
335 # Test to see if harmless (by accident) mixing of read* and
336 # iteration still works. This depends on the size of the internal
337 # iteration buffer (currently 8192,) but we can test it in a
338 # flexible manner. Each line in the bag o' ham is 4 bytes
339 # ("h", "a", "m", "\n"), so 4096 lines of that should get us
340 # exactly on the buffer boundary for any power-of-2 buffersize
341 # between 4 and 16384 (inclusive).
343 for i
in range(nchunks
):
345 testline
= testlines
.pop(0)
349 self
.fail("readline() after next() with supposedly empty "
350 "iteration-buffer failed anyway")
352 self
.fail("readline() after next() with empty buffer "
353 "failed. Got %r, expected %r" % (line
, testline
))
354 testline
= testlines
.pop(0)
355 buf
= array("c", "\x00" * len(testline
))
359 self
.fail("readinto() after next() with supposedly empty "
360 "iteration-buffer failed anyway")
361 line
= buf
.tostring()
363 self
.fail("readinto() after next() with empty buffer "
364 "failed. Got %r, expected %r" % (line
, testline
))
366 testline
= testlines
.pop(0)
368 line
= f
.read(len(testline
))
370 self
.fail("read() after next() with supposedly empty "
371 "iteration-buffer failed anyway")
373 self
.fail("read() after next() with empty buffer "
374 "failed. Got %r, expected %r" % (line
, testline
))
376 lines
= f
.readlines()
378 self
.fail("readlines() after next() with supposedly empty "
379 "iteration-buffer failed anyway")
380 if lines
!= testlines
:
381 self
.fail("readlines() after next() with empty buffer "
382 "failed. Got %r, expected %r" % (line
, testline
))
383 # Reading after iteration hit EOF shouldn't hurt either
394 self
.fail("read* failed after next() consumed file")
400 class FileSubclassTests(unittest
.TestCase
):
403 # test that exiting with context calls subclass' close
405 def __init__(self
, *args
):
406 self
.subclass_closed
= False
407 file.__init
__(self
, *args
)
409 self
.subclass_closed
= True
412 with
C(TESTFN
, 'w') as f
:
414 self
.assertTrue(f
.subclass_closed
)
417 @unittest.skipUnless(threading
, 'Threading required for this test.')
418 class FileThreadingTests(unittest
.TestCase
):
419 # These tests check the ability to call various methods of file objects
420 # (including close()) concurrently without crashing the Python interpreter.
421 # See #815646, #595601
424 self
._threads
= test_support
.threading_setup()
426 self
.filename
= TESTFN
427 with
open(self
.filename
, "w") as f
:
428 f
.write("\n".join("0123456789"))
429 self
._count
_lock
= threading
.Lock()
431 self
.close_success_count
= 0
432 self
.use_buffering
= False
438 except (EnvironmentError, ValueError):
441 os
.remove(self
.filename
)
442 except EnvironmentError:
444 test_support
.threading_cleanup(*self
._threads
)
446 def _create_file(self
):
447 if self
.use_buffering
:
448 self
.f
= open(self
.filename
, "w+", buffering
=1024*16)
450 self
.f
= open(self
.filename
, "w+")
452 def _close_file(self
):
453 with self
._count
_lock
:
454 self
.close_count
+= 1
456 with self
._count
_lock
:
457 self
.close_success_count
+= 1
459 def _close_and_reopen_file(self
):
461 # if close raises an exception that's fine, self.f remains valid so
462 # we don't need to reopen.
465 def _run_workers(self
, func
, nb_workers
, duration
=0.2):
466 with self
._count
_lock
:
468 self
.close_success_count
= 0
469 self
.do_continue
= True
472 for i
in range(nb_workers
):
473 t
= threading
.Thread(target
=func
)
476 for _
in xrange(100):
477 time
.sleep(duration
/100)
478 with self
._count
_lock
:
479 if self
.close_count
-self
.close_success_count
> nb_workers
+1:
480 if test_support
.verbose
:
485 self
.do_continue
= False
489 def _test_close_open_io(self
, io_func
, nb_workers
=5):
492 funcs
= itertools
.cycle((
494 lambda: self
._close
_and
_reopen
_file
(),
497 if not self
.do_continue
:
501 except (IOError, ValueError):
503 self
._run
_workers
(worker
, nb_workers
)
504 if test_support
.verbose
:
505 # Useful verbose statistics when tuning this test to take
506 # less time to run but still ensuring that it's still useful.
508 # the percent of close calls that raised an error
509 percent
= 100. - 100.*self
.close_success_count
/self
.close_count
510 print self
.close_count
, ('%.4f ' % percent
),
512 def test_close_open(self
):
515 self
._test
_close
_open
_io
(io_func
)
517 def test_close_open_flush(self
):
520 self
._test
_close
_open
_io
(io_func
)
522 def test_close_open_iter(self
):
525 self
._test
_close
_open
_io
(io_func
)
527 def test_close_open_isatty(self
):
530 self
._test
_close
_open
_io
(io_func
)
532 def test_close_open_print(self
):
535 self
._test
_close
_open
_io
(io_func
)
537 def test_close_open_print_buffered(self
):
538 self
.use_buffering
= True
541 self
._test
_close
_open
_io
(io_func
)
543 def test_close_open_read(self
):
546 self
._test
_close
_open
_io
(io_func
)
548 def test_close_open_readinto(self
):
550 a
= array('c', 'xxxxx')
552 self
._test
_close
_open
_io
(io_func
)
554 def test_close_open_readline(self
):
557 self
._test
_close
_open
_io
(io_func
)
559 def test_close_open_readlines(self
):
562 self
._test
_close
_open
_io
(io_func
)
564 def test_close_open_seek(self
):
567 self
._test
_close
_open
_io
(io_func
)
569 def test_close_open_tell(self
):
572 self
._test
_close
_open
_io
(io_func
)
574 def test_close_open_truncate(self
):
577 self
._test
_close
_open
_io
(io_func
)
579 def test_close_open_write(self
):
582 self
._test
_close
_open
_io
(io_func
)
584 def test_close_open_writelines(self
):
586 self
.f
.writelines('')
587 self
._test
_close
_open
_io
(io_func
)
590 class StdoutTests(unittest
.TestCase
):
592 def test_move_stdout_on_write(self
):
593 # Issue 3242: sys.stdout can be replaced (and freed) during a
594 # print statement; prevent a segfault in this case
595 save_stdout
= sys
.stdout
598 def write(self
, data
):
600 sys
.stdout
= save_stdout
606 sys
.stdout
= save_stdout
608 def test_del_stdout_before_print(self
):
609 # Issue 4597: 'print' with no argument wasn't reporting when
610 # sys.stdout was deleted.
611 save_stdout
= sys
.stdout
615 except RuntimeError as e
:
616 self
.assertEquals(str(e
), "lost sys.stdout")
618 self
.fail("Expected RuntimeError")
620 sys
.stdout
= save_stdout
624 # Historically, these tests have been sloppy about removing TESTFN.
625 # So get rid of it no matter what.
627 run_unittest(AutoFileTests
, OtherFileTests
, FileSubclassTests
,
628 FileThreadingTests
, StdoutTests
)
630 if os
.path
.exists(TESTFN
):
633 if __name__
== '__main__':