# Source: python/dscho.git, Lib/test/test_file2k.py
# (blob da2ab6745099d7a2965464905153ea0c6a7f5926)
import sys
import os
import unittest
import itertools
import time
from array import array
from weakref import proxy
try:
    import threading
except ImportError:
    threading = None

from test import test_support
from test.test_support import TESTFN, run_unittest
from UserList import UserList
15 from UserList import UserList
class AutoFileTests(unittest.TestCase):
    # file tests for which a test file is automatically set up

    def setUp(self):
        # every test starts with TESTFN open for binary writing
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # tests that dispose of the file themselves set self.f to None
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # drop the only strong reference so the proxy goes dead
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        with test_support.check_py3k_warnings():
            softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        with test_support.check_py3k_warnings():
            # verify softspace is writable
            f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        # only the first n items of the array were filled from the file
        self.assertEqual('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testRepr(self):
        # verify repr works
        self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))

    def testErrors(self):
        self.f.close()
        self.f = open(TESTFN, 'rb')
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', '__iter__']
        deprecated_methods = ['xreadlines']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        with test_support.check_py3k_warnings():
            for methodname in deprecated_methods:
                method = getattr(self.f, methodname)
                self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertIsNone(self.f.__exit__(None, None, None))
        # it must also return None if an exception was given
        try:
            1 // 0
        except ZeroDivisionError:
            # only the deliberately provoked error is wanted here; anything
            # else should propagate instead of being swallowed
            self.assertIsNone(self.f.__exit__(*sys.exc_info()))

    def testReadWhenWriting(self):
        # the file from setUp is write-only, so reading must fail
        self.assertRaises(IOError, self.f.read)

    def testIssue5677(self):
        # Remark: Do not perform more than one test per open file,
        # since that does NOT catch the readline error on Windows.
        data = 'xxx'
        # Close the handle from setUp explicitly instead of leaving it to
        # be reclaimed when self.f is rebound below.
        self.f.close()
        for mode in ['w', 'wb', 'a', 'ab']:
            for attr in ['read', 'readline', 'readlines']:
                self.f = open(TESTFN, mode)
                self.f.write(data)
                self.assertRaises(IOError, getattr(self.f, attr))
                self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, lambda: [line for line in self.f])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
            self.f.close()

        for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.write, data)
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.writelines, [data, data])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.truncate)
            self.f.close()
class OtherFileTests(unittest.TestCase):
    # file tests that create (and are responsible for) their own files

    def testOpenDir(self):
        # opening a directory must raise IOError carrying the directory name
        this_dir = os.path.dirname(__file__)
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                f.close()
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except (IOError, ValueError):
                pass
            else:
                f.close()

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                '  Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # close and remove the file even when the assertion fails
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may
            # be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # Start from a clean slate.  A bare os.unlink() here would make the
        # test depend on an earlier test having left TESTFN behind.
        test_support.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            f.write('12345678901')   # 11 bytes
            f.close()

            f = open(TESTFN,'rb+')
            data = f.read(5)
            if data != '12345':
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            bag = open(TESTFN, "w")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = open(TESTFN)
                if f.next() != filler:
                    # must be a real call; a bare "self.fail, msg" tuple
                    # would silently do nothing
                    self.fail("Broken testfile")
                meth = getattr(f, methodname)
                try:
                    meth(*args)
                except ValueError:
                    pass
                else:
                    self.fail("%s%r after next() didn't raise ValueError" %
                              (methodname, args))
                f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = open(TESTFN)
            for i in range(nchunks):
                f.next()
            testline = testlines.pop(0)
            try:
                line = f.readline()
            except ValueError:
                self.fail("readline() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("readline() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            testline = testlines.pop(0)
            buf = array("c", "\x00" * len(testline))
            try:
                f.readinto(buf)
            except ValueError:
                self.fail("readinto() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            line = buf.tostring()
            if line != testline:
                self.fail("readinto() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))

            testline = testlines.pop(0)
            try:
                line = f.read(len(testline))
            except ValueError:
                self.fail("read() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("read() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            try:
                lines = f.readlines()
            except ValueError:
                self.fail("readlines() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if lines != testlines:
                # report the values that were actually compared
                self.fail("readlines() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (lines, testlines))
            f.close()
            # Reading after iteration hit EOF shouldn't hurt either
            f = open(TESTFN)
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)
class FileSubclassTests(unittest.TestCase):

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                # set the flag first so it exists even if file.__init__ fails
                self.subclass_closed = False
                file.__init__(self, *args)
            def close(self):
                self.subclass_closed = True
                file.close(self)

        # opening with 'w' creates TESTFN; don't leave it behind
        self.addCleanup(test_support.unlink, TESTFN)
        with C(TESTFN, 'w') as f:
            pass
        self.assertTrue(f.subclass_closed)
417 @unittest.skipUnless(threading, 'Threading required for this test.')
418 class FileThreadingTests(unittest.TestCase):
419 # These tests check the ability to call various methods of file objects
420 # (including close()) concurrently without crashing the Python interpreter.
421 # See #815646, #595601
423 def setUp(self):
424 self._threads = test_support.threading_setup()
425 self.f = None
426 self.filename = TESTFN
427 with open(self.filename, "w") as f:
428 f.write("\n".join("0123456789"))
429 self._count_lock = threading.Lock()
430 self.close_count = 0
431 self.close_success_count = 0
432 self.use_buffering = False
434 def tearDown(self):
435 if self.f:
436 try:
437 self.f.close()
438 except (EnvironmentError, ValueError):
439 pass
440 try:
441 os.remove(self.filename)
442 except EnvironmentError:
443 pass
444 test_support.threading_cleanup(*self._threads)
446 def _create_file(self):
447 if self.use_buffering:
448 self.f = open(self.filename, "w+", buffering=1024*16)
449 else:
450 self.f = open(self.filename, "w+")
452 def _close_file(self):
453 with self._count_lock:
454 self.close_count += 1
455 self.f.close()
456 with self._count_lock:
457 self.close_success_count += 1
459 def _close_and_reopen_file(self):
460 self._close_file()
461 # if close raises an exception thats fine, self.f remains valid so
462 # we don't need to reopen.
463 self._create_file()
465 def _run_workers(self, func, nb_workers, duration=0.2):
466 with self._count_lock:
467 self.close_count = 0
468 self.close_success_count = 0
469 self.do_continue = True
470 threads = []
471 try:
472 for i in range(nb_workers):
473 t = threading.Thread(target=func)
474 t.start()
475 threads.append(t)
476 for _ in xrange(100):
477 time.sleep(duration/100)
478 with self._count_lock:
479 if self.close_count-self.close_success_count > nb_workers+1:
480 if test_support.verbose:
481 print 'Q',
482 break
483 time.sleep(duration)
484 finally:
485 self.do_continue = False
486 for t in threads:
487 t.join()
489 def _test_close_open_io(self, io_func, nb_workers=5):
490 def worker():
491 self._create_file()
492 funcs = itertools.cycle((
493 lambda: io_func(),
494 lambda: self._close_and_reopen_file(),
496 for f in funcs:
497 if not self.do_continue:
498 break
499 try:
501 except (IOError, ValueError):
502 pass
503 self._run_workers(worker, nb_workers)
504 if test_support.verbose:
505 # Useful verbose statistics when tuning this test to take
506 # less time to run but still ensuring that its still useful.
508 # the percent of close calls that raised an error
509 percent = 100. - 100.*self.close_success_count/self.close_count
510 print self.close_count, ('%.4f ' % percent),
512 def test_close_open(self):
513 def io_func():
514 pass
515 self._test_close_open_io(io_func)
517 def test_close_open_flush(self):
518 def io_func():
519 self.f.flush()
520 self._test_close_open_io(io_func)
522 def test_close_open_iter(self):
523 def io_func():
524 list(iter(self.f))
525 self._test_close_open_io(io_func)
527 def test_close_open_isatty(self):
528 def io_func():
529 self.f.isatty()
530 self._test_close_open_io(io_func)
532 def test_close_open_print(self):
533 def io_func():
534 print >> self.f, ''
535 self._test_close_open_io(io_func)
537 def test_close_open_print_buffered(self):
538 self.use_buffering = True
539 def io_func():
540 print >> self.f, ''
541 self._test_close_open_io(io_func)
543 def test_close_open_read(self):
544 def io_func():
545 self.f.read(0)
546 self._test_close_open_io(io_func)
548 def test_close_open_readinto(self):
549 def io_func():
550 a = array('c', 'xxxxx')
551 self.f.readinto(a)
552 self._test_close_open_io(io_func)
554 def test_close_open_readline(self):
555 def io_func():
556 self.f.readline()
557 self._test_close_open_io(io_func)
559 def test_close_open_readlines(self):
560 def io_func():
561 self.f.readlines()
562 self._test_close_open_io(io_func)
564 def test_close_open_seek(self):
565 def io_func():
566 self.f.seek(0, 0)
567 self._test_close_open_io(io_func)
569 def test_close_open_tell(self):
570 def io_func():
571 self.f.tell()
572 self._test_close_open_io(io_func)
574 def test_close_open_truncate(self):
575 def io_func():
576 self.f.truncate()
577 self._test_close_open_io(io_func)
579 def test_close_open_write(self):
580 def io_func():
581 self.f.write('')
582 self._test_close_open_io(io_func)
584 def test_close_open_writelines(self):
585 def io_func():
586 self.f.writelines('')
587 self._test_close_open_io(io_func)
590 class StdoutTests(unittest.TestCase):
592 def test_move_stdout_on_write(self):
593 # Issue 3242: sys.stdout can be replaced (and freed) during a
594 # print statement; prevent a segfault in this case
595 save_stdout = sys.stdout
597 class File:
598 def write(self, data):
599 if '\n' in data:
600 sys.stdout = save_stdout
602 try:
603 sys.stdout = File()
604 print "some text"
605 finally:
606 sys.stdout = save_stdout
608 def test_del_stdout_before_print(self):
609 # Issue 4597: 'print' with no argument wasn't reporting when
610 # sys.stdout was deleted.
611 save_stdout = sys.stdout
612 del sys.stdout
613 try:
614 print
615 except RuntimeError as e:
616 self.assertEquals(str(e), "lost sys.stdout")
617 else:
618 self.fail("Expected RuntimeError")
619 finally:
620 sys.stdout = save_stdout
def test_main():
    # These tests have a history of leaving TESTFN behind, so remove it
    # after the run whether or not the run succeeded.
    test_classes = (
        AutoFileTests,
        OtherFileTests,
        FileSubclassTests,
        FileThreadingTests,
        StdoutTests,
    )
    try:
        run_unittest(*test_classes)
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)


if __name__ == '__main__':
    test_main()