7 from array
import array
8 from weakref
import proxy
10 from test
import test_support
11 from test
.test_support
import TESTFN
, findfile
, run_unittest
12 from UserList
import UserList
14 class AutoFileTests(unittest
.TestCase
):
15 # file tests for which a test file is automatically set up
18 self
.f
= open(TESTFN
, 'wb')
25 def testWeakRefs(self
):
26 # verify weak references
29 self
.assertEquals(self
.f
.tell(), p
.tell())
32 self
.assertRaises(ReferenceError, getattr, p
, 'tell')
34 def testAttributes(self
):
35 # verify expected attributes exist
37 softspace
= f
.softspace
38 f
.name
# merely shouldn't blow up
42 # verify softspace is writable
43 f
.softspace
= softspace
# merely shouldn't blow up
45 # verify the others aren't
46 for attr
in 'name', 'mode', 'closed':
47 self
.assertRaises((AttributeError, TypeError), setattr, f
, attr
, 'oops')
49 def testReadinto(self
):
53 a
= array('c', 'x'*10)
54 self
.f
= open(TESTFN
, 'rb')
55 n
= self
.f
.readinto(a
)
56 self
.assertEquals('12', a
.tostring()[:n
])
58 def testWritelinesUserList(self
):
59 # verify writelines with instance sequence
60 l
= UserList(['1', '2'])
63 self
.f
= open(TESTFN
, 'rb')
65 self
.assertEquals(buf
, '12')
67 def testWritelinesIntegers(self
):
68 # verify writelines with integers
69 self
.assertRaises(TypeError, self
.f
.writelines
, [1, 2, 3])
71 def testWritelinesIntegersUserList(self
):
72 # verify writelines with integers in UserList
74 self
.assertRaises(TypeError, self
.f
.writelines
, l
)
76 def testWritelinesNonString(self
):
77 # verify writelines with non-string object
81 self
.assertRaises(TypeError, self
.f
.writelines
,
82 [NonString(), NonString()])
86 self
.assertTrue(repr(self
.f
).startswith("<open file '" + TESTFN
))
90 self
.assertEquals(f
.name
, TESTFN
)
91 self
.assertTrue(not f
.isatty())
92 self
.assertTrue(not f
.closed
)
94 self
.assertRaises(TypeError, f
.readinto
, "")
96 self
.assertTrue(f
.closed
)
98 def testMethods(self
):
99 methods
= ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
100 'readline', 'readlines', 'seek', 'tell', 'truncate',
101 'write', 'xreadlines', '__iter__']
102 if sys
.platform
.startswith('atheos'):
103 methods
.remove('truncate')
105 # __exit__ should close the file
106 self
.f
.__exit
__(None, None, None)
107 self
.assertTrue(self
.f
.closed
)
109 for methodname
in methods
:
110 method
= getattr(self
.f
, methodname
)
111 # should raise on closed file
112 self
.assertRaises(ValueError, method
)
113 self
.assertRaises(ValueError, self
.f
.writelines
, [])
115 # file is closed, __exit__ shouldn't do anything
116 self
.assertEquals(self
.f
.__exit
__(None, None, None), None)
117 # it must also return None if an exception was given
121 self
.assertEquals(self
.f
.__exit
__(*sys
.exc_info()), None)
123 def testReadWhenWriting(self
):
124 self
.assertRaises(IOError, self
.f
.read
)
126 class OtherFileTests(unittest
.TestCase
):
128 def testOpenDir(self
):
129 this_dir
= os
.path
.dirname(__file__
)
130 for mode
in (None, "w"):
133 f
= open(this_dir
, mode
)
137 self
.assertEqual(e
.filename
, this_dir
)
139 self
.fail("opening a directory didn't raise an IOError")
141 def testModeStrings(self
):
142 # check invalid mode strings
143 for mode
in ("", "aU", "wU+"):
145 f
= open(TESTFN
, mode
)
150 self
.fail('%r is an invalid file mode' % mode
)
152 # Some invalid modes fail on Windows, but pass on Unix
153 # Issue3965: avoid a crash on Windows when filename is unicode
154 for name
in (TESTFN
, unicode(TESTFN
), unicode(TESTFN
+ '\t')):
157 except (IOError, ValueError):
163 # This causes the interpreter to exit on OSF1 v5.1.
164 if sys
.platform
!= 'osf1V5':
165 self
.assertRaises(IOError, sys
.stdin
.seek
, -1)
167 print >>sys
.__stdout
__, (
168 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
170 self
.assertRaises(IOError, sys
.stdin
.truncate
)
172 def testUnicodeOpen(self
):
173 # verify repr works for unicode too
174 f
= open(unicode(TESTFN
), "w")
175 self
.assertTrue(repr(f
).startswith("<open file u'" + TESTFN
))
179 def testBadModeArgument(self
):
180 # verify that we get a sensible error message for bad mode argument
183 f
= open(TESTFN
, bad_mode
)
184 except ValueError, msg
:
187 if s
.find(TESTFN
) != -1 or s
.find(bad_mode
) == -1:
188 self
.fail("bad error message for invalid mode: %s" % s
)
189 # if msg[0] == 0, we're probably on Windows where there may be
190 # no obvious way to discover why open() failed.
193 self
.fail("no error for invalid mode: %s" % bad_mode
)
195 def testSetBufferSize(self
):
196 # make sure that explicitly setting the buffer size doesn't cause
197 # misbehaviour especially with repeated close() calls
198 for s
in (-1, 0, 1, 512):
200 f
= open(TESTFN
, 'w', s
)
204 f
= open(TESTFN
, 'r', s
)
209 self
.fail('error setting buffer size %d: %s' % (s
, str(msg
)))
210 self
.assertEquals(d
, s
)
212 def testTruncateOnWindows(self
):
216 # SF bug <http://www.python.org/sf/801631>
217 # "file.truncate fault on windows"
218 f
= open(TESTFN
, 'wb')
219 f
.write('12345678901') # 11 bytes
222 f
= open(TESTFN
,'rb+')
225 self
.fail("Read on file opened for update failed %r" % data
)
227 self
.fail("File pos after read wrong %d" % f
.tell())
231 self
.fail("File pos after ftruncate wrong %d" % f
.tell())
234 size
= os
.path
.getsize(TESTFN
)
236 self
.fail("File size after ftruncate wrong %d" % size
)
243 def testIteration(self
):
244 # Test the complex interaction when mixing file-iteration and the
245 # various read* methods. Ostensibly, the mixture could just be tested
246 # to work when it should work according to the Python language,
247 # instead of fail when it should fail according to the current CPython
248 # implementation. People don't always program Python the way they
249 # should, though, and the implemenation might change in subtle ways,
250 # so we explicitly test for errors, too; the test will just have to
251 # be updated when the implementation changes.
254 assert not dataoffset
% len(filler
), \
255 "dataoffset must be multiple of len(filler)"
256 nchunks
= dataoffset
// len(filler
)
258 "spam, spam and eggs\n",
259 "eggs, spam, ham and spam\n",
260 "saussages, spam, spam and eggs\n",
261 "spam, ham, spam and eggs\n",
262 "spam, spam, spam, spam, spam, ham, spam\n",
263 "wonderful spaaaaaam.\n"
265 methods
= [("readline", ()), ("read", ()), ("readlines", ()),
266 ("readinto", (array("c", " "*100),))]
269 # Prepare the testfile
270 bag
= open(TESTFN
, "w")
271 bag
.write(filler
* nchunks
)
272 bag
.writelines(testlines
)
274 # Test for appropriate errors mixing read* and iteration
275 for methodname
, args
in methods
:
277 if f
.next() != filler
:
278 self
.fail
, "Broken testfile"
279 meth
= getattr(f
, methodname
)
285 self
.fail("%s%r after next() didn't raise ValueError" %
289 # Test to see if harmless (by accident) mixing of read* and
290 # iteration still works. This depends on the size of the internal
291 # iteration buffer (currently 8192,) but we can test it in a
292 # flexible manner. Each line in the bag o' ham is 4 bytes
293 # ("h", "a", "m", "\n"), so 4096 lines of that should get us
294 # exactly on the buffer boundary for any power-of-2 buffersize
295 # between 4 and 16384 (inclusive).
297 for i
in range(nchunks
):
299 testline
= testlines
.pop(0)
303 self
.fail("readline() after next() with supposedly empty "
304 "iteration-buffer failed anyway")
306 self
.fail("readline() after next() with empty buffer "
307 "failed. Got %r, expected %r" % (line
, testline
))
308 testline
= testlines
.pop(0)
309 buf
= array("c", "\x00" * len(testline
))
313 self
.fail("readinto() after next() with supposedly empty "
314 "iteration-buffer failed anyway")
315 line
= buf
.tostring()
317 self
.fail("readinto() after next() with empty buffer "
318 "failed. Got %r, expected %r" % (line
, testline
))
320 testline
= testlines
.pop(0)
322 line
= f
.read(len(testline
))
324 self
.fail("read() after next() with supposedly empty "
325 "iteration-buffer failed anyway")
327 self
.fail("read() after next() with empty buffer "
328 "failed. Got %r, expected %r" % (line
, testline
))
330 lines
= f
.readlines()
332 self
.fail("readlines() after next() with supposedly empty "
333 "iteration-buffer failed anyway")
334 if lines
!= testlines
:
335 self
.fail("readlines() after next() with empty buffer "
336 "failed. Got %r, expected %r" % (line
, testline
))
337 # Reading after iteration hit EOF shouldn't hurt either
348 self
.fail("read* failed after next() consumed file")
354 class FileSubclassTests(unittest
.TestCase
):
357 # test that exiting with context calls subclass' close
359 def __init__(self
, *args
):
360 self
.subclass_closed
= False
361 file.__init
__(self
, *args
)
363 self
.subclass_closed
= True
366 with
C(TESTFN
, 'w') as f
:
368 self
.assertTrue(f
.subclass_closed
)
371 class FileThreadingTests(unittest
.TestCase
):
372 # These tests check the ability to call various methods of file objects
373 # (including close()) concurrently without crashing the Python interpreter.
374 # See #815646, #595601
377 self
._threads
= test_support
.threading_setup()
379 self
.filename
= TESTFN
380 with
open(self
.filename
, "w") as f
:
381 f
.write("\n".join("0123456789"))
382 self
._count
_lock
= threading
.Lock()
384 self
.close_success_count
= 0
390 except (EnvironmentError, ValueError):
393 os
.remove(self
.filename
)
394 except EnvironmentError:
396 test_support
.threading_cleanup(*self
._threads
)
398 def _create_file(self
):
399 self
.f
= open(self
.filename
, "w+")
401 def _close_file(self
):
402 with self
._count
_lock
:
403 self
.close_count
+= 1
405 with self
._count
_lock
:
406 self
.close_success_count
+= 1
408 def _close_and_reopen_file(self
):
410 # if close raises an exception thats fine, self.f remains valid so
411 # we don't need to reopen.
414 def _run_workers(self
, func
, nb_workers
, duration
=0.2):
415 with self
._count
_lock
:
417 self
.close_success_count
= 0
418 self
.do_continue
= True
421 for i
in range(nb_workers
):
422 t
= threading
.Thread(target
=func
)
425 for _
in xrange(100):
426 time
.sleep(duration
/100)
427 with self
._count
_lock
:
428 if self
.close_count
-self
.close_success_count
> nb_workers
+1:
429 if test_support
.verbose
:
434 self
.do_continue
= False
438 def _test_close_open_io(self
, io_func
, nb_workers
=5):
441 funcs
= itertools
.cycle((
443 lambda: self
._close
_and
_reopen
_file
(),
446 if not self
.do_continue
:
450 except (IOError, ValueError):
452 self
._run
_workers
(worker
, nb_workers
)
453 if test_support
.verbose
:
454 # Useful verbose statistics when tuning this test to take
455 # less time to run but still ensuring that its still useful.
457 # the percent of close calls that raised an error
458 percent
= 100. - 100.*self
.close_success_count
/self
.close_count
459 print self
.close_count
, ('%.4f ' % percent
),
461 def test_close_open(self
):
464 self
._test
_close
_open
_io
(io_func
)
466 def test_close_open_flush(self
):
469 self
._test
_close
_open
_io
(io_func
)
471 def test_close_open_iter(self
):
474 self
._test
_close
_open
_io
(io_func
)
476 def test_close_open_isatty(self
):
479 self
._test
_close
_open
_io
(io_func
)
481 def test_close_open_print(self
):
484 self
._test
_close
_open
_io
(io_func
)
486 def test_close_open_read(self
):
489 self
._test
_close
_open
_io
(io_func
)
491 def test_close_open_readinto(self
):
493 a
= array('c', 'xxxxx')
495 self
._test
_close
_open
_io
(io_func
)
497 def test_close_open_readline(self
):
500 self
._test
_close
_open
_io
(io_func
)
502 def test_close_open_readlines(self
):
505 self
._test
_close
_open
_io
(io_func
)
507 def test_close_open_seek(self
):
510 self
._test
_close
_open
_io
(io_func
)
512 def test_close_open_tell(self
):
515 self
._test
_close
_open
_io
(io_func
)
517 def test_close_open_truncate(self
):
520 self
._test
_close
_open
_io
(io_func
)
522 def test_close_open_write(self
):
525 self
._test
_close
_open
_io
(io_func
)
527 def test_close_open_writelines(self
):
529 self
.f
.writelines('')
530 self
._test
_close
_open
_io
(io_func
)
533 class StdoutTests(unittest
.TestCase
):
535 def test_move_stdout_on_write(self
):
536 # Issue 3242: sys.stdout can be replaced (and freed) during a
537 # print statement; prevent a segfault in this case
538 save_stdout
= sys
.stdout
541 def write(self
, data
):
543 sys
.stdout
= save_stdout
549 sys
.stdout
= save_stdout
551 def test_del_stdout_before_print(self
):
552 # Issue 4597: 'print' with no argument wasn't reporting when
553 # sys.stdout was deleted.
554 save_stdout
= sys
.stdout
558 except RuntimeError as e
:
559 self
.assertEquals(str(e
), "lost sys.stdout")
561 self
.fail("Expected RuntimeError")
563 sys
.stdout
= save_stdout
567 # Historically, these tests have been sloppy about removing TESTFN.
568 # So get rid of it no matter what.
570 run_unittest(AutoFileTests
, OtherFileTests
, FileSubclassTests
,
571 FileThreadingTests
, StdoutTests
)
573 if os
.path
.exists(TESTFN
):
576 if __name__
== '__main__':