Issue #3389: Allow resolving dotted names for handlers in logging configuration files...
[python.git] / Lib / test / test_file.py
blob7cfaef76fedfaca3e231ea44e493b6814d220676
1 import sys
2 import os
3 import unittest
4 import itertools
5 import time
6 import threading
7 from array import array
8 from weakref import proxy
10 from test import test_support
11 from test.test_support import TESTFN, findfile, run_unittest
12 from UserList import UserList
class AutoFileTests(unittest.TestCase):
    # File tests for which a test file is automatically set up: setUp opens
    # TESTFN for binary writing as self.f, tearDown closes and removes it.

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # testWeakRefs sets self.f to None after closing it, so only close
        # when there is still a file object to close.
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEquals(self.f.tell(), p.tell())
        self.f.close()
        # Drop the only strong reference so that the proxy goes dead.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        # verify softspace is writable
        f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        self.assertEquals('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEquals(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testRepr(self):
        # verify repr works
        self.assert_(repr(self.f).startswith("<open file '" + TESTFN))

    def testErrors(self):
        f = self.f
        self.assertEquals(f.name, TESTFN)
        self.assert_(not f.isatty())
        self.assert_(not f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assert_(f.closed)

    def testMethods(self):
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', 'xreadlines', '__iter__']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assert_(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertEquals(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            # Raise on purpose so that sys.exc_info() below describes a
            # live exception to hand to __exit__.
            1/0
        except ZeroDivisionError:
            self.assertEquals(self.f.__exit__(*sys.exc_info()), None)
124 class OtherFileTests(unittest.TestCase):
126 def testModeStrings(self):
127 # check invalid mode strings
128 for mode in ("", "aU", "wU+"):
129 try:
130 f = open(TESTFN, mode)
131 except ValueError:
132 pass
133 else:
134 f.close()
135 self.fail('%r is an invalid file mode' % mode)
137 def testStdin(self):
138 # This causes the interpreter to exit on OSF1 v5.1.
139 if sys.platform != 'osf1V5':
140 self.assertRaises(IOError, sys.stdin.seek, -1)
141 else:
142 print >>sys.__stdout__, (
143 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
144 ' Test manually.')
145 self.assertRaises(IOError, sys.stdin.truncate)
147 def testUnicodeOpen(self):
148 # verify repr works for unicode too
149 f = open(unicode(TESTFN), "w")
150 self.assert_(repr(f).startswith("<open file u'" + TESTFN))
151 f.close()
152 os.unlink(TESTFN)
154 def testBadModeArgument(self):
155 # verify that we get a sensible error message for bad mode argument
156 bad_mode = "qwerty"
157 try:
158 f = open(TESTFN, bad_mode)
159 except ValueError, msg:
160 if msg[0] != 0:
161 s = str(msg)
162 if s.find(TESTFN) != -1 or s.find(bad_mode) == -1:
163 self.fail("bad error message for invalid mode: %s" % s)
164 # if msg[0] == 0, we're probably on Windows where there may be
165 # no obvious way to discover why open() failed.
166 else:
167 f.close()
168 self.fail("no error for invalid mode: %s" % bad_mode)
170 def testSetBufferSize(self):
171 # make sure that explicitly setting the buffer size doesn't cause
172 # misbehaviour especially with repeated close() calls
173 for s in (-1, 0, 1, 512):
174 try:
175 f = open(TESTFN, 'w', s)
176 f.write(str(s))
177 f.close()
178 f.close()
179 f = open(TESTFN, 'r', s)
180 d = int(f.read())
181 f.close()
182 f.close()
183 except IOError, msg:
184 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
185 self.assertEquals(d, s)
187 def testTruncateOnWindows(self):
188 os.unlink(TESTFN)
190 def bug801631():
191 # SF bug <http://www.python.org/sf/801631>
192 # "file.truncate fault on windows"
193 f = open(TESTFN, 'wb')
194 f.write('12345678901') # 11 bytes
195 f.close()
197 f = open(TESTFN,'rb+')
198 data = f.read(5)
199 if data != '12345':
200 self.fail("Read on file opened for update failed %r" % data)
201 if f.tell() != 5:
202 self.fail("File pos after read wrong %d" % f.tell())
204 f.truncate()
205 if f.tell() != 5:
206 self.fail("File pos after ftruncate wrong %d" % f.tell())
208 f.close()
209 size = os.path.getsize(TESTFN)
210 if size != 5:
211 self.fail("File size after ftruncate wrong %d" % size)
213 try:
214 bug801631()
215 finally:
216 os.unlink(TESTFN)
218 def testIteration(self):
219 # Test the complex interaction when mixing file-iteration and the
220 # various read* methods. Ostensibly, the mixture could just be tested
221 # to work when it should work according to the Python language,
222 # instead of fail when it should fail according to the current CPython
223 # implementation. People don't always program Python the way they
224 # should, though, and the implemenation might change in subtle ways,
225 # so we explicitly test for errors, too; the test will just have to
226 # be updated when the implementation changes.
227 dataoffset = 16384
228 filler = "ham\n"
229 assert not dataoffset % len(filler), \
230 "dataoffset must be multiple of len(filler)"
231 nchunks = dataoffset // len(filler)
232 testlines = [
233 "spam, spam and eggs\n",
234 "eggs, spam, ham and spam\n",
235 "saussages, spam, spam and eggs\n",
236 "spam, ham, spam and eggs\n",
237 "spam, spam, spam, spam, spam, ham, spam\n",
238 "wonderful spaaaaaam.\n"
240 methods = [("readline", ()), ("read", ()), ("readlines", ()),
241 ("readinto", (array("c", " "*100),))]
243 try:
244 # Prepare the testfile
245 bag = open(TESTFN, "w")
246 bag.write(filler * nchunks)
247 bag.writelines(testlines)
248 bag.close()
249 # Test for appropriate errors mixing read* and iteration
250 for methodname, args in methods:
251 f = open(TESTFN)
252 if f.next() != filler:
253 self.fail, "Broken testfile"
254 meth = getattr(f, methodname)
255 try:
256 meth(*args)
257 except ValueError:
258 pass
259 else:
260 self.fail("%s%r after next() didn't raise ValueError" %
261 (methodname, args))
262 f.close()
264 # Test to see if harmless (by accident) mixing of read* and
265 # iteration still works. This depends on the size of the internal
266 # iteration buffer (currently 8192,) but we can test it in a
267 # flexible manner. Each line in the bag o' ham is 4 bytes
268 # ("h", "a", "m", "\n"), so 4096 lines of that should get us
269 # exactly on the buffer boundary for any power-of-2 buffersize
270 # between 4 and 16384 (inclusive).
271 f = open(TESTFN)
272 for i in range(nchunks):
273 f.next()
274 testline = testlines.pop(0)
275 try:
276 line = f.readline()
277 except ValueError:
278 self.fail("readline() after next() with supposedly empty "
279 "iteration-buffer failed anyway")
280 if line != testline:
281 self.fail("readline() after next() with empty buffer "
282 "failed. Got %r, expected %r" % (line, testline))
283 testline = testlines.pop(0)
284 buf = array("c", "\x00" * len(testline))
285 try:
286 f.readinto(buf)
287 except ValueError:
288 self.fail("readinto() after next() with supposedly empty "
289 "iteration-buffer failed anyway")
290 line = buf.tostring()
291 if line != testline:
292 self.fail("readinto() after next() with empty buffer "
293 "failed. Got %r, expected %r" % (line, testline))
295 testline = testlines.pop(0)
296 try:
297 line = f.read(len(testline))
298 except ValueError:
299 self.fail("read() after next() with supposedly empty "
300 "iteration-buffer failed anyway")
301 if line != testline:
302 self.fail("read() after next() with empty buffer "
303 "failed. Got %r, expected %r" % (line, testline))
304 try:
305 lines = f.readlines()
306 except ValueError:
307 self.fail("readlines() after next() with supposedly empty "
308 "iteration-buffer failed anyway")
309 if lines != testlines:
310 self.fail("readlines() after next() with empty buffer "
311 "failed. Got %r, expected %r" % (line, testline))
312 # Reading after iteration hit EOF shouldn't hurt either
313 f = open(TESTFN)
314 try:
315 for line in f:
316 pass
317 try:
318 f.readline()
319 f.readinto(buf)
320 f.read()
321 f.readlines()
322 except ValueError:
323 self.fail("read* failed after next() consumed file")
324 finally:
325 f.close()
326 finally:
327 os.unlink(TESTFN)
class FileSubclassTests(unittest.TestCase):
    # Tests for the behaviour of subclasses of the built-in file type.

    def tearDown(self):
        # testExit creates TESTFN; remove it so that it does not leak into
        # whatever test runs next.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                # Set the flag before file.__init__ so that it exists even
                # if opening fails.
                self.subclass_closed = False
                file.__init__(self, *args)
            def close(self):
                self.subclass_closed = True
                file.close(self)

        with C(TESTFN, 'w') as f:
            pass
        self.failUnless(f.subclass_closed)
346 class FileThreadingTests(unittest.TestCase):
347 # These tests check the ability to call various methods of file objects
348 # (including close()) concurrently without crashing the Python interpreter.
349 # See #815646, #595601
351 def setUp(self):
352 self.f = None
353 self.filename = TESTFN
354 with open(self.filename, "w") as f:
355 f.write("\n".join("0123456789"))
356 self._count_lock = threading.Lock()
357 self.close_count = 0
358 self.close_success_count = 0
360 def tearDown(self):
361 if self.f:
362 try:
363 self.f.close()
364 except (EnvironmentError, ValueError):
365 pass
366 try:
367 os.remove(self.filename)
368 except EnvironmentError:
369 pass
371 def _create_file(self):
372 self.f = open(self.filename, "w+")
374 def _close_file(self):
375 with self._count_lock:
376 self.close_count += 1
377 self.f.close()
378 with self._count_lock:
379 self.close_success_count += 1
381 def _close_and_reopen_file(self):
382 self._close_file()
383 # if close raises an exception thats fine, self.f remains valid so
384 # we don't need to reopen.
385 self._create_file()
387 def _run_workers(self, func, nb_workers, duration=0.2):
388 with self._count_lock:
389 self.close_count = 0
390 self.close_success_count = 0
391 self.do_continue = True
392 threads = []
393 try:
394 for i in range(nb_workers):
395 t = threading.Thread(target=func)
396 t.start()
397 threads.append(t)
398 for _ in xrange(100):
399 time.sleep(duration/100)
400 with self._count_lock:
401 if self.close_count-self.close_success_count > nb_workers+1:
402 if test_support.verbose:
403 print 'Q',
404 break
405 time.sleep(duration)
406 finally:
407 self.do_continue = False
408 for t in threads:
409 t.join()
411 def _test_close_open_io(self, io_func, nb_workers=5):
412 def worker():
413 self._create_file()
414 funcs = itertools.cycle((
415 lambda: io_func(),
416 lambda: self._close_and_reopen_file(),
418 for f in funcs:
419 if not self.do_continue:
420 break
421 try:
423 except (IOError, ValueError):
424 pass
425 self._run_workers(worker, nb_workers)
426 if test_support.verbose:
427 # Useful verbose statistics when tuning this test to take
428 # less time to run but still ensuring that its still useful.
430 # the percent of close calls that raised an error
431 percent = 100. - 100.*self.close_success_count/self.close_count
432 print self.close_count, ('%.4f ' % percent),
434 def test_close_open(self):
435 def io_func():
436 pass
437 self._test_close_open_io(io_func)
439 def test_close_open_flush(self):
440 def io_func():
441 self.f.flush()
442 self._test_close_open_io(io_func)
444 def test_close_open_iter(self):
445 def io_func():
446 list(iter(self.f))
447 self._test_close_open_io(io_func)
449 def test_close_open_isatty(self):
450 def io_func():
451 self.f.isatty()
452 self._test_close_open_io(io_func)
454 def test_close_open_print(self):
455 def io_func():
456 print >> self.f, ''
457 self._test_close_open_io(io_func)
459 def test_close_open_read(self):
460 def io_func():
461 self.f.read(0)
462 self._test_close_open_io(io_func)
464 def test_close_open_readinto(self):
465 def io_func():
466 a = array('c', 'xxxxx')
467 self.f.readinto(a)
468 self._test_close_open_io(io_func)
470 def test_close_open_readline(self):
471 def io_func():
472 self.f.readline()
473 self._test_close_open_io(io_func)
475 def test_close_open_readlines(self):
476 def io_func():
477 self.f.readlines()
478 self._test_close_open_io(io_func)
480 def test_close_open_seek(self):
481 def io_func():
482 self.f.seek(0, 0)
483 self._test_close_open_io(io_func)
485 def test_close_open_tell(self):
486 def io_func():
487 self.f.tell()
488 self._test_close_open_io(io_func)
490 def test_close_open_truncate(self):
491 def io_func():
492 self.f.truncate()
493 self._test_close_open_io(io_func)
495 def test_close_open_write(self):
496 def io_func():
497 self.f.write('')
498 self._test_close_open_io(io_func)
500 def test_close_open_writelines(self):
501 def io_func():
502 self.f.writelines('')
503 self._test_close_open_io(io_func)
506 class StdoutTests(unittest.TestCase):
508 def test_move_stdout_on_write(self):
509 # Issue 3242: sys.stdout can be replaced (and freed) during a
510 # print statement; prevent a segfault in this case
511 save_stdout = sys.stdout
513 class File:
514 def write(self, data):
515 if '\n' in data:
516 sys.stdout = save_stdout
518 try:
519 sys.stdout = File()
520 print "some text"
521 finally:
522 sys.stdout = save_stdout
def test_main():
    # These tests have a history of leaving TESTFN behind, so remove it
    # once the run is over, whether the run passed or not.
    test_classes = (
        AutoFileTests,
        OtherFileTests,
        FileSubclassTests,
        FileThreadingTests,
        StdoutTests,
    )
    try:
        run_unittest(*test_classes)
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)


# Run the whole suite when this module is executed as a script.
if __name__ == '__main__':
    test_main()