Issue #7295: Do not use a hardcoded file name in test_tarfile.
[python.git] / Lib / test / test_file2k.py
blobbf47c6fa4592426d6b31e13d6f86c83e10ead924
1 import sys
2 import os
3 import unittest
4 import itertools
5 import time
6 import threading
7 from array import array
8 from weakref import proxy
10 from test import test_support
11 from test.test_support import TESTFN, findfile, run_unittest
12 from UserList import UserList
14 class AutoFileTests(unittest.TestCase):
15 # file tests for which a test file is automatically set up
17 def setUp(self):
18 self.f = open(TESTFN, 'wb')
20 def tearDown(self):
21 if self.f:
22 self.f.close()
23 os.remove(TESTFN)
25 def testWeakRefs(self):
26 # verify weak references
27 p = proxy(self.f)
28 p.write('teststring')
29 self.assertEquals(self.f.tell(), p.tell())
30 self.f.close()
31 self.f = None
32 self.assertRaises(ReferenceError, getattr, p, 'tell')
34 def testAttributes(self):
35 # verify expected attributes exist
36 f = self.f
37 softspace = f.softspace
38 f.name # merely shouldn't blow up
39 f.mode # ditto
40 f.closed # ditto
42 # verify softspace is writable
43 f.softspace = softspace # merely shouldn't blow up
45 # verify the others aren't
46 for attr in 'name', 'mode', 'closed':
47 self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
49 def testReadinto(self):
50 # verify readinto
51 self.f.write('12')
52 self.f.close()
53 a = array('c', 'x'*10)
54 self.f = open(TESTFN, 'rb')
55 n = self.f.readinto(a)
56 self.assertEquals('12', a.tostring()[:n])
58 def testWritelinesUserList(self):
59 # verify writelines with instance sequence
60 l = UserList(['1', '2'])
61 self.f.writelines(l)
62 self.f.close()
63 self.f = open(TESTFN, 'rb')
64 buf = self.f.read()
65 self.assertEquals(buf, '12')
67 def testWritelinesIntegers(self):
68 # verify writelines with integers
69 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
71 def testWritelinesIntegersUserList(self):
72 # verify writelines with integers in UserList
73 l = UserList([1,2,3])
74 self.assertRaises(TypeError, self.f.writelines, l)
76 def testWritelinesNonString(self):
77 # verify writelines with non-string object
78 class NonString:
79 pass
81 self.assertRaises(TypeError, self.f.writelines,
82 [NonString(), NonString()])
84 def testRepr(self):
85 # verify repr works
86 self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
88 def testErrors(self):
89 f = self.f
90 self.assertEquals(f.name, TESTFN)
91 self.assertTrue(not f.isatty())
92 self.assertTrue(not f.closed)
94 self.assertRaises(TypeError, f.readinto, "")
95 f.close()
96 self.assertTrue(f.closed)
98 def testMethods(self):
99 methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
100 'readline', 'readlines', 'seek', 'tell', 'truncate',
101 'write', 'xreadlines', '__iter__']
102 if sys.platform.startswith('atheos'):
103 methods.remove('truncate')
105 # __exit__ should close the file
106 self.f.__exit__(None, None, None)
107 self.assertTrue(self.f.closed)
109 for methodname in methods:
110 method = getattr(self.f, methodname)
111 # should raise on closed file
112 self.assertRaises(ValueError, method)
113 self.assertRaises(ValueError, self.f.writelines, [])
115 # file is closed, __exit__ shouldn't do anything
116 self.assertEquals(self.f.__exit__(None, None, None), None)
117 # it must also return None if an exception was given
118 try:
120 except:
121 self.assertEquals(self.f.__exit__(*sys.exc_info()), None)
123 def testReadWhenWriting(self):
124 self.assertRaises(IOError, self.f.read)
126 class OtherFileTests(unittest.TestCase):
128 def testOpenDir(self):
129 this_dir = os.path.dirname(__file__)
130 for mode in (None, "w"):
131 try:
132 if mode:
133 f = open(this_dir, mode)
134 else:
135 f = open(this_dir)
136 except IOError as e:
137 self.assertEqual(e.filename, this_dir)
138 else:
139 self.fail("opening a directory didn't raise an IOError")
141 def testModeStrings(self):
142 # check invalid mode strings
143 for mode in ("", "aU", "wU+"):
144 try:
145 f = open(TESTFN, mode)
146 except ValueError:
147 pass
148 else:
149 f.close()
150 self.fail('%r is an invalid file mode' % mode)
152 # Some invalid modes fail on Windows, but pass on Unix
153 # Issue3965: avoid a crash on Windows when filename is unicode
154 for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
155 try:
156 f = open(name, "rr")
157 except (IOError, ValueError):
158 pass
159 else:
160 f.close()
162 def testStdin(self):
163 # This causes the interpreter to exit on OSF1 v5.1.
164 if sys.platform != 'osf1V5':
165 self.assertRaises(IOError, sys.stdin.seek, -1)
166 else:
167 print >>sys.__stdout__, (
168 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
169 ' Test manually.')
170 self.assertRaises(IOError, sys.stdin.truncate)
172 def testUnicodeOpen(self):
173 # verify repr works for unicode too
174 f = open(unicode(TESTFN), "w")
175 self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
176 f.close()
177 os.unlink(TESTFN)
179 def testBadModeArgument(self):
180 # verify that we get a sensible error message for bad mode argument
181 bad_mode = "qwerty"
182 try:
183 f = open(TESTFN, bad_mode)
184 except ValueError, msg:
185 if msg[0] != 0:
186 s = str(msg)
187 if s.find(TESTFN) != -1 or s.find(bad_mode) == -1:
188 self.fail("bad error message for invalid mode: %s" % s)
189 # if msg[0] == 0, we're probably on Windows where there may be
190 # no obvious way to discover why open() failed.
191 else:
192 f.close()
193 self.fail("no error for invalid mode: %s" % bad_mode)
195 def testSetBufferSize(self):
196 # make sure that explicitly setting the buffer size doesn't cause
197 # misbehaviour especially with repeated close() calls
198 for s in (-1, 0, 1, 512):
199 try:
200 f = open(TESTFN, 'w', s)
201 f.write(str(s))
202 f.close()
203 f.close()
204 f = open(TESTFN, 'r', s)
205 d = int(f.read())
206 f.close()
207 f.close()
208 except IOError, msg:
209 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
210 self.assertEquals(d, s)
212 def testTruncateOnWindows(self):
213 os.unlink(TESTFN)
215 def bug801631():
216 # SF bug <http://www.python.org/sf/801631>
217 # "file.truncate fault on windows"
218 f = open(TESTFN, 'wb')
219 f.write('12345678901') # 11 bytes
220 f.close()
222 f = open(TESTFN,'rb+')
223 data = f.read(5)
224 if data != '12345':
225 self.fail("Read on file opened for update failed %r" % data)
226 if f.tell() != 5:
227 self.fail("File pos after read wrong %d" % f.tell())
229 f.truncate()
230 if f.tell() != 5:
231 self.fail("File pos after ftruncate wrong %d" % f.tell())
233 f.close()
234 size = os.path.getsize(TESTFN)
235 if size != 5:
236 self.fail("File size after ftruncate wrong %d" % size)
238 try:
239 bug801631()
240 finally:
241 os.unlink(TESTFN)
243 def testIteration(self):
244 # Test the complex interaction when mixing file-iteration and the
245 # various read* methods. Ostensibly, the mixture could just be tested
246 # to work when it should work according to the Python language,
247 # instead of fail when it should fail according to the current CPython
248 # implementation. People don't always program Python the way they
249 # should, though, and the implemenation might change in subtle ways,
250 # so we explicitly test for errors, too; the test will just have to
251 # be updated when the implementation changes.
252 dataoffset = 16384
253 filler = "ham\n"
254 assert not dataoffset % len(filler), \
255 "dataoffset must be multiple of len(filler)"
256 nchunks = dataoffset // len(filler)
257 testlines = [
258 "spam, spam and eggs\n",
259 "eggs, spam, ham and spam\n",
260 "saussages, spam, spam and eggs\n",
261 "spam, ham, spam and eggs\n",
262 "spam, spam, spam, spam, spam, ham, spam\n",
263 "wonderful spaaaaaam.\n"
265 methods = [("readline", ()), ("read", ()), ("readlines", ()),
266 ("readinto", (array("c", " "*100),))]
268 try:
269 # Prepare the testfile
270 bag = open(TESTFN, "w")
271 bag.write(filler * nchunks)
272 bag.writelines(testlines)
273 bag.close()
274 # Test for appropriate errors mixing read* and iteration
275 for methodname, args in methods:
276 f = open(TESTFN)
277 if f.next() != filler:
278 self.fail, "Broken testfile"
279 meth = getattr(f, methodname)
280 try:
281 meth(*args)
282 except ValueError:
283 pass
284 else:
285 self.fail("%s%r after next() didn't raise ValueError" %
286 (methodname, args))
287 f.close()
289 # Test to see if harmless (by accident) mixing of read* and
290 # iteration still works. This depends on the size of the internal
291 # iteration buffer (currently 8192,) but we can test it in a
292 # flexible manner. Each line in the bag o' ham is 4 bytes
293 # ("h", "a", "m", "\n"), so 4096 lines of that should get us
294 # exactly on the buffer boundary for any power-of-2 buffersize
295 # between 4 and 16384 (inclusive).
296 f = open(TESTFN)
297 for i in range(nchunks):
298 f.next()
299 testline = testlines.pop(0)
300 try:
301 line = f.readline()
302 except ValueError:
303 self.fail("readline() after next() with supposedly empty "
304 "iteration-buffer failed anyway")
305 if line != testline:
306 self.fail("readline() after next() with empty buffer "
307 "failed. Got %r, expected %r" % (line, testline))
308 testline = testlines.pop(0)
309 buf = array("c", "\x00" * len(testline))
310 try:
311 f.readinto(buf)
312 except ValueError:
313 self.fail("readinto() after next() with supposedly empty "
314 "iteration-buffer failed anyway")
315 line = buf.tostring()
316 if line != testline:
317 self.fail("readinto() after next() with empty buffer "
318 "failed. Got %r, expected %r" % (line, testline))
320 testline = testlines.pop(0)
321 try:
322 line = f.read(len(testline))
323 except ValueError:
324 self.fail("read() after next() with supposedly empty "
325 "iteration-buffer failed anyway")
326 if line != testline:
327 self.fail("read() after next() with empty buffer "
328 "failed. Got %r, expected %r" % (line, testline))
329 try:
330 lines = f.readlines()
331 except ValueError:
332 self.fail("readlines() after next() with supposedly empty "
333 "iteration-buffer failed anyway")
334 if lines != testlines:
335 self.fail("readlines() after next() with empty buffer "
336 "failed. Got %r, expected %r" % (line, testline))
337 # Reading after iteration hit EOF shouldn't hurt either
338 f = open(TESTFN)
339 try:
340 for line in f:
341 pass
342 try:
343 f.readline()
344 f.readinto(buf)
345 f.read()
346 f.readlines()
347 except ValueError:
348 self.fail("read* failed after next() consumed file")
349 finally:
350 f.close()
351 finally:
352 os.unlink(TESTFN)
354 class FileSubclassTests(unittest.TestCase):
356 def testExit(self):
357 # test that exiting with context calls subclass' close
358 class C(file):
359 def __init__(self, *args):
360 self.subclass_closed = False
361 file.__init__(self, *args)
362 def close(self):
363 self.subclass_closed = True
364 file.close(self)
366 with C(TESTFN, 'w') as f:
367 pass
368 self.assertTrue(f.subclass_closed)
371 class FileThreadingTests(unittest.TestCase):
372 # These tests check the ability to call various methods of file objects
373 # (including close()) concurrently without crashing the Python interpreter.
374 # See #815646, #595601
376 def setUp(self):
377 self._threads = test_support.threading_setup()
378 self.f = None
379 self.filename = TESTFN
380 with open(self.filename, "w") as f:
381 f.write("\n".join("0123456789"))
382 self._count_lock = threading.Lock()
383 self.close_count = 0
384 self.close_success_count = 0
386 def tearDown(self):
387 if self.f:
388 try:
389 self.f.close()
390 except (EnvironmentError, ValueError):
391 pass
392 try:
393 os.remove(self.filename)
394 except EnvironmentError:
395 pass
396 test_support.threading_cleanup(*self._threads)
398 def _create_file(self):
399 self.f = open(self.filename, "w+")
401 def _close_file(self):
402 with self._count_lock:
403 self.close_count += 1
404 self.f.close()
405 with self._count_lock:
406 self.close_success_count += 1
408 def _close_and_reopen_file(self):
409 self._close_file()
410 # if close raises an exception thats fine, self.f remains valid so
411 # we don't need to reopen.
412 self._create_file()
414 def _run_workers(self, func, nb_workers, duration=0.2):
415 with self._count_lock:
416 self.close_count = 0
417 self.close_success_count = 0
418 self.do_continue = True
419 threads = []
420 try:
421 for i in range(nb_workers):
422 t = threading.Thread(target=func)
423 t.start()
424 threads.append(t)
425 for _ in xrange(100):
426 time.sleep(duration/100)
427 with self._count_lock:
428 if self.close_count-self.close_success_count > nb_workers+1:
429 if test_support.verbose:
430 print 'Q',
431 break
432 time.sleep(duration)
433 finally:
434 self.do_continue = False
435 for t in threads:
436 t.join()
438 def _test_close_open_io(self, io_func, nb_workers=5):
439 def worker():
440 self._create_file()
441 funcs = itertools.cycle((
442 lambda: io_func(),
443 lambda: self._close_and_reopen_file(),
445 for f in funcs:
446 if not self.do_continue:
447 break
448 try:
450 except (IOError, ValueError):
451 pass
452 self._run_workers(worker, nb_workers)
453 if test_support.verbose:
454 # Useful verbose statistics when tuning this test to take
455 # less time to run but still ensuring that its still useful.
457 # the percent of close calls that raised an error
458 percent = 100. - 100.*self.close_success_count/self.close_count
459 print self.close_count, ('%.4f ' % percent),
461 def test_close_open(self):
462 def io_func():
463 pass
464 self._test_close_open_io(io_func)
466 def test_close_open_flush(self):
467 def io_func():
468 self.f.flush()
469 self._test_close_open_io(io_func)
471 def test_close_open_iter(self):
472 def io_func():
473 list(iter(self.f))
474 self._test_close_open_io(io_func)
476 def test_close_open_isatty(self):
477 def io_func():
478 self.f.isatty()
479 self._test_close_open_io(io_func)
481 def test_close_open_print(self):
482 def io_func():
483 print >> self.f, ''
484 self._test_close_open_io(io_func)
486 def test_close_open_read(self):
487 def io_func():
488 self.f.read(0)
489 self._test_close_open_io(io_func)
491 def test_close_open_readinto(self):
492 def io_func():
493 a = array('c', 'xxxxx')
494 self.f.readinto(a)
495 self._test_close_open_io(io_func)
497 def test_close_open_readline(self):
498 def io_func():
499 self.f.readline()
500 self._test_close_open_io(io_func)
502 def test_close_open_readlines(self):
503 def io_func():
504 self.f.readlines()
505 self._test_close_open_io(io_func)
507 def test_close_open_seek(self):
508 def io_func():
509 self.f.seek(0, 0)
510 self._test_close_open_io(io_func)
512 def test_close_open_tell(self):
513 def io_func():
514 self.f.tell()
515 self._test_close_open_io(io_func)
517 def test_close_open_truncate(self):
518 def io_func():
519 self.f.truncate()
520 self._test_close_open_io(io_func)
522 def test_close_open_write(self):
523 def io_func():
524 self.f.write('')
525 self._test_close_open_io(io_func)
527 def test_close_open_writelines(self):
528 def io_func():
529 self.f.writelines('')
530 self._test_close_open_io(io_func)
533 class StdoutTests(unittest.TestCase):
535 def test_move_stdout_on_write(self):
536 # Issue 3242: sys.stdout can be replaced (and freed) during a
537 # print statement; prevent a segfault in this case
538 save_stdout = sys.stdout
540 class File:
541 def write(self, data):
542 if '\n' in data:
543 sys.stdout = save_stdout
545 try:
546 sys.stdout = File()
547 print "some text"
548 finally:
549 sys.stdout = save_stdout
551 def test_del_stdout_before_print(self):
552 # Issue 4597: 'print' with no argument wasn't reporting when
553 # sys.stdout was deleted.
554 save_stdout = sys.stdout
555 del sys.stdout
556 try:
557 print
558 except RuntimeError as e:
559 self.assertEquals(str(e), "lost sys.stdout")
560 else:
561 self.fail("Expected RuntimeError")
562 finally:
563 sys.stdout = save_stdout
566 def test_main():
567 # Historically, these tests have been sloppy about removing TESTFN.
568 # So get rid of it no matter what.
569 try:
570 run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
571 FileThreadingTests, StdoutTests)
572 finally:
573 if os.path.exists(TESTFN):
574 os.unlink(TESTFN)
576 if __name__ == '__main__':
577 test_main()