7 from array
import array
8 from weakref
import proxy
10 from test
import test_support
11 from test
.test_support
import TESTFN
, findfile
, run_unittest
12 from UserList
import UserList
14 class AutoFileTests(unittest
.TestCase
):
15 # file tests for which a test file is automatically set up
18 self
.f
= open(TESTFN
, 'wb')
25 def testWeakRefs(self
):
26 # verify weak references
29 self
.assertEquals(self
.f
.tell(), p
.tell())
32 self
.assertRaises(ReferenceError, getattr, p
, 'tell')
34 def testAttributes(self
):
35 # verify expected attributes exist
37 softspace
= f
.softspace
38 f
.name
# merely shouldn't blow up
42 # verify softspace is writable
43 f
.softspace
= softspace
# merely shouldn't blow up
45 # verify the others aren't
46 for attr
in 'name', 'mode', 'closed':
47 self
.assertRaises((AttributeError, TypeError), setattr, f
, attr
, 'oops')
49 def testReadinto(self
):
53 a
= array('c', 'x'*10)
54 self
.f
= open(TESTFN
, 'rb')
55 n
= self
.f
.readinto(a
)
56 self
.assertEquals('12', a
.tostring()[:n
])
58 def testWritelinesUserList(self
):
59 # verify writelines with instance sequence
60 l
= UserList(['1', '2'])
63 self
.f
= open(TESTFN
, 'rb')
65 self
.assertEquals(buf
, '12')
67 def testWritelinesIntegers(self
):
68 # verify writelines with integers
69 self
.assertRaises(TypeError, self
.f
.writelines
, [1, 2, 3])
71 def testWritelinesIntegersUserList(self
):
72 # verify writelines with integers in UserList
74 self
.assertRaises(TypeError, self
.f
.writelines
, l
)
76 def testWritelinesNonString(self
):
77 # verify writelines with non-string object
81 self
.assertRaises(TypeError, self
.f
.writelines
,
82 [NonString(), NonString()])
86 self
.assert_(repr(self
.f
).startswith("<open file '" + TESTFN
))
90 self
.assertEquals(f
.name
, TESTFN
)
91 self
.assert_(not f
.isatty())
92 self
.assert_(not f
.closed
)
94 self
.assertRaises(TypeError, f
.readinto
, "")
96 self
.assert_(f
.closed
)
98 def testMethods(self
):
99 methods
= ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
100 'readline', 'readlines', 'seek', 'tell', 'truncate',
101 'write', 'xreadlines', '__iter__']
102 if sys
.platform
.startswith('atheos'):
103 methods
.remove('truncate')
105 # __exit__ should close the file
106 self
.f
.__exit
__(None, None, None)
107 self
.assert_(self
.f
.closed
)
109 for methodname
in methods
:
110 method
= getattr(self
.f
, methodname
)
111 # should raise on closed file
112 self
.assertRaises(ValueError, method
)
113 self
.assertRaises(ValueError, self
.f
.writelines
, [])
115 # file is closed, __exit__ shouldn't do anything
116 self
.assertEquals(self
.f
.__exit
__(None, None, None), None)
117 # it must also return None if an exception was given
121 self
.assertEquals(self
.f
.__exit
__(*sys
.exc_info()), None)
124 class OtherFileTests(unittest
.TestCase
):
126 def testModeStrings(self
):
127 # check invalid mode strings
128 for mode
in ("", "aU", "wU+"):
130 f
= open(TESTFN
, mode
)
135 self
.fail('%r is an invalid file mode' % mode
)
138 # This causes the interpreter to exit on OSF1 v5.1.
139 if sys
.platform
!= 'osf1V5':
140 self
.assertRaises(IOError, sys
.stdin
.seek
, -1)
142 print >>sys
.__stdout
__, (
143 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
145 self
.assertRaises(IOError, sys
.stdin
.truncate
)
147 def testUnicodeOpen(self
):
148 # verify repr works for unicode too
149 f
= open(unicode(TESTFN
), "w")
150 self
.assert_(repr(f
).startswith("<open file u'" + TESTFN
))
154 def testBadModeArgument(self
):
155 # verify that we get a sensible error message for bad mode argument
158 f
= open(TESTFN
, bad_mode
)
159 except ValueError, msg
:
162 if s
.find(TESTFN
) != -1 or s
.find(bad_mode
) == -1:
163 self
.fail("bad error message for invalid mode: %s" % s
)
164 # if msg[0] == 0, we're probably on Windows where there may be
165 # no obvious way to discover why open() failed.
168 self
.fail("no error for invalid mode: %s" % bad_mode
)
170 def testSetBufferSize(self
):
171 # make sure that explicitly setting the buffer size doesn't cause
172 # misbehaviour especially with repeated close() calls
173 for s
in (-1, 0, 1, 512):
175 f
= open(TESTFN
, 'w', s
)
179 f
= open(TESTFN
, 'r', s
)
184 self
.fail('error setting buffer size %d: %s' % (s
, str(msg
)))
185 self
.assertEquals(d
, s
)
187 def testTruncateOnWindows(self
):
191 # SF bug <http://www.python.org/sf/801631>
192 # "file.truncate fault on windows"
193 f
= open(TESTFN
, 'wb')
194 f
.write('12345678901') # 11 bytes
197 f
= open(TESTFN
,'rb+')
200 self
.fail("Read on file opened for update failed %r" % data
)
202 self
.fail("File pos after read wrong %d" % f
.tell())
206 self
.fail("File pos after ftruncate wrong %d" % f
.tell())
209 size
= os
.path
.getsize(TESTFN
)
211 self
.fail("File size after ftruncate wrong %d" % size
)
218 def testIteration(self
):
219 # Test the complex interaction when mixing file-iteration and the
220 # various read* methods. Ostensibly, the mixture could just be tested
221 # to work when it should work according to the Python language,
222 # instead of fail when it should fail according to the current CPython
223 # implementation. People don't always program Python the way they
224 # should, though, and the implemenation might change in subtle ways,
225 # so we explicitly test for errors, too; the test will just have to
226 # be updated when the implementation changes.
229 assert not dataoffset
% len(filler
), \
230 "dataoffset must be multiple of len(filler)"
231 nchunks
= dataoffset
// len(filler
)
233 "spam, spam and eggs\n",
234 "eggs, spam, ham and spam\n",
235 "saussages, spam, spam and eggs\n",
236 "spam, ham, spam and eggs\n",
237 "spam, spam, spam, spam, spam, ham, spam\n",
238 "wonderful spaaaaaam.\n"
240 methods
= [("readline", ()), ("read", ()), ("readlines", ()),
241 ("readinto", (array("c", " "*100),))]
244 # Prepare the testfile
245 bag
= open(TESTFN
, "w")
246 bag
.write(filler
* nchunks
)
247 bag
.writelines(testlines
)
249 # Test for appropriate errors mixing read* and iteration
250 for methodname
, args
in methods
:
252 if f
.next() != filler
:
253 self
.fail
, "Broken testfile"
254 meth
= getattr(f
, methodname
)
260 self
.fail("%s%r after next() didn't raise ValueError" %
264 # Test to see if harmless (by accident) mixing of read* and
265 # iteration still works. This depends on the size of the internal
266 # iteration buffer (currently 8192,) but we can test it in a
267 # flexible manner. Each line in the bag o' ham is 4 bytes
268 # ("h", "a", "m", "\n"), so 4096 lines of that should get us
269 # exactly on the buffer boundary for any power-of-2 buffersize
270 # between 4 and 16384 (inclusive).
272 for i
in range(nchunks
):
274 testline
= testlines
.pop(0)
278 self
.fail("readline() after next() with supposedly empty "
279 "iteration-buffer failed anyway")
281 self
.fail("readline() after next() with empty buffer "
282 "failed. Got %r, expected %r" % (line
, testline
))
283 testline
= testlines
.pop(0)
284 buf
= array("c", "\x00" * len(testline
))
288 self
.fail("readinto() after next() with supposedly empty "
289 "iteration-buffer failed anyway")
290 line
= buf
.tostring()
292 self
.fail("readinto() after next() with empty buffer "
293 "failed. Got %r, expected %r" % (line
, testline
))
295 testline
= testlines
.pop(0)
297 line
= f
.read(len(testline
))
299 self
.fail("read() after next() with supposedly empty "
300 "iteration-buffer failed anyway")
302 self
.fail("read() after next() with empty buffer "
303 "failed. Got %r, expected %r" % (line
, testline
))
305 lines
= f
.readlines()
307 self
.fail("readlines() after next() with supposedly empty "
308 "iteration-buffer failed anyway")
309 if lines
!= testlines
:
310 self
.fail("readlines() after next() with empty buffer "
311 "failed. Got %r, expected %r" % (line
, testline
))
312 # Reading after iteration hit EOF shouldn't hurt either
323 self
.fail("read* failed after next() consumed file")
329 class FileSubclassTests(unittest
.TestCase
):
332 # test that exiting with context calls subclass' close
334 def __init__(self
, *args
):
335 self
.subclass_closed
= False
336 file.__init
__(self
, *args
)
338 self
.subclass_closed
= True
341 with
C(TESTFN
, 'w') as f
:
343 self
.failUnless(f
.subclass_closed
)
346 class FileThreadingTests(unittest
.TestCase
):
347 # These tests check the ability to call various methods of file objects
348 # (including close()) concurrently without crashing the Python interpreter.
349 # See #815646, #595601
353 self
.filename
= TESTFN
354 with
open(self
.filename
, "w") as f
:
355 f
.write("\n".join("0123456789"))
356 self
._count
_lock
= threading
.Lock()
358 self
.close_success_count
= 0
364 except (EnvironmentError, ValueError):
367 os
.remove(self
.filename
)
368 except EnvironmentError:
371 def _create_file(self
):
372 self
.f
= open(self
.filename
, "w+")
374 def _close_file(self
):
375 with self
._count
_lock
:
376 self
.close_count
+= 1
378 with self
._count
_lock
:
379 self
.close_success_count
+= 1
381 def _close_and_reopen_file(self
):
383 # if close raises an exception thats fine, self.f remains valid so
384 # we don't need to reopen.
387 def _run_workers(self
, func
, nb_workers
, duration
=0.2):
388 with self
._count
_lock
:
390 self
.close_success_count
= 0
391 self
.do_continue
= True
394 for i
in range(nb_workers
):
395 t
= threading
.Thread(target
=func
)
398 for _
in xrange(100):
399 time
.sleep(duration
/100)
400 with self
._count
_lock
:
401 if self
.close_count
-self
.close_success_count
> nb_workers
+1:
402 if test_support
.verbose
:
407 self
.do_continue
= False
411 def _test_close_open_io(self
, io_func
, nb_workers
=5):
414 funcs
= itertools
.cycle((
416 lambda: self
._close
_and
_reopen
_file
(),
419 if not self
.do_continue
:
423 except (IOError, ValueError):
425 self
._run
_workers
(worker
, nb_workers
)
426 if test_support
.verbose
:
427 # Useful verbose statistics when tuning this test to take
428 # less time to run but still ensuring that its still useful.
430 # the percent of close calls that raised an error
431 percent
= 100. - 100.*self
.close_success_count
/self
.close_count
432 print self
.close_count
, ('%.4f ' % percent
),
434 def test_close_open(self
):
437 self
._test
_close
_open
_io
(io_func
)
439 def test_close_open_flush(self
):
442 self
._test
_close
_open
_io
(io_func
)
444 def test_close_open_iter(self
):
447 self
._test
_close
_open
_io
(io_func
)
449 def test_close_open_isatty(self
):
452 self
._test
_close
_open
_io
(io_func
)
454 def test_close_open_print(self
):
457 self
._test
_close
_open
_io
(io_func
)
459 def test_close_open_read(self
):
462 self
._test
_close
_open
_io
(io_func
)
464 def test_close_open_readinto(self
):
466 a
= array('c', 'xxxxx')
468 self
._test
_close
_open
_io
(io_func
)
470 def test_close_open_readline(self
):
473 self
._test
_close
_open
_io
(io_func
)
475 def test_close_open_readlines(self
):
478 self
._test
_close
_open
_io
(io_func
)
480 def test_close_open_seek(self
):
483 self
._test
_close
_open
_io
(io_func
)
485 def test_close_open_tell(self
):
488 self
._test
_close
_open
_io
(io_func
)
490 def test_close_open_truncate(self
):
493 self
._test
_close
_open
_io
(io_func
)
495 def test_close_open_write(self
):
498 self
._test
_close
_open
_io
(io_func
)
500 def test_close_open_writelines(self
):
502 self
.f
.writelines('')
503 self
._test
_close
_open
_io
(io_func
)
506 class StdoutTests(unittest
.TestCase
):
508 def test_move_stdout_on_write(self
):
509 # Issue 3242: sys.stdout can be replaced (and freed) during a
510 # print statement; prevent a segfault in this case
511 save_stdout
= sys
.stdout
514 def write(self
, data
):
516 sys
.stdout
= save_stdout
522 sys
.stdout
= save_stdout
526 # Historically, these tests have been sloppy about removing TESTFN.
527 # So get rid of it no matter what.
529 run_unittest(AutoFileTests
, OtherFileTests
, FileSubclassTests
,
530 FileThreadingTests
, StdoutTests
)
532 if os
.path
.exists(TESTFN
):
535 if __name__
== '__main__':