move sections
[python/dscho.git] / Lib / test / test_mmap.py
blob14dd27801585c1a70bc7906d84a9b5c29a530b79
1 from test.test_support import TESTFN, run_unittest, import_module
2 import unittest
3 import os, re, itertools
5 mmap = import_module('mmap')
7 PAGESIZE = mmap.PAGESIZE
9 class MmapTests(unittest.TestCase):
11 def setUp(self):
12 if os.path.exists(TESTFN):
13 os.unlink(TESTFN)
15 def tearDown(self):
16 try:
17 os.unlink(TESTFN)
18 except OSError:
19 pass
21 def test_basic(self):
22 # Test mmap module on Unix systems and Windows
24 # Create a file to be mmap'ed.
25 f = open(TESTFN, 'w+')
26 try:
27 # Write 2 pages worth of data to the file
28 f.write('\0'* PAGESIZE)
29 f.write('foo')
30 f.write('\0'* (PAGESIZE-3) )
31 f.flush()
32 m = mmap.mmap(f.fileno(), 2 * PAGESIZE)
33 f.close()
35 # Simple sanity checks
37 tp = str(type(m)) # SF bug 128713: segfaulted on Linux
38 self.assertEqual(m.find('foo'), PAGESIZE)
40 self.assertEqual(len(m), 2*PAGESIZE)
42 self.assertEqual(m[0], '\0')
43 self.assertEqual(m[0:3], '\0\0\0')
45 # Shouldn't crash on boundary (Issue #5292)
46 self.assertRaises(IndexError, m.__getitem__, len(m))
47 self.assertRaises(IndexError, m.__setitem__, len(m), '\0')
49 # Modify the file's content
50 m[0] = '3'
51 m[PAGESIZE +3: PAGESIZE +3+3] = 'bar'
53 # Check that the modification worked
54 self.assertEqual(m[0], '3')
55 self.assertEqual(m[0:3], '3\0\0')
56 self.assertEqual(m[PAGESIZE-1 : PAGESIZE + 7], '\0foobar\0')
58 m.flush()
60 # Test doing a regular expression match in an mmap'ed file
61 match = re.search('[A-Za-z]+', m)
62 if match is None:
63 self.fail('regex match on mmap failed!')
64 else:
65 start, end = match.span(0)
66 length = end - start
68 self.assertEqual(start, PAGESIZE)
69 self.assertEqual(end, PAGESIZE + 6)
71 # test seeking around (try to overflow the seek implementation)
72 m.seek(0,0)
73 self.assertEqual(m.tell(), 0)
74 m.seek(42,1)
75 self.assertEqual(m.tell(), 42)
76 m.seek(0,2)
77 self.assertEqual(m.tell(), len(m))
79 # Try to seek to negative position...
80 self.assertRaises(ValueError, m.seek, -1)
82 # Try to seek beyond end of mmap...
83 self.assertRaises(ValueError, m.seek, 1, 2)
85 # Try to seek to negative position...
86 self.assertRaises(ValueError, m.seek, -len(m)-1, 2)
88 # Try resizing map
89 try:
90 m.resize(512)
91 except SystemError:
92 # resize() not supported
93 # No messages are printed, since the output of this test suite
94 # would then be different across platforms.
95 pass
96 else:
97 # resize() is supported
98 self.assertEqual(len(m), 512)
99 # Check that we can no longer seek beyond the new size.
100 self.assertRaises(ValueError, m.seek, 513, 0)
102 # Check that the underlying file is truncated too
103 # (bug #728515)
104 f = open(TESTFN)
105 f.seek(0, 2)
106 self.assertEqual(f.tell(), 512)
107 f.close()
108 self.assertEqual(m.size(), 512)
110 m.close()
112 finally:
113 try:
114 f.close()
115 except OSError:
116 pass
118 def test_access_parameter(self):
119 # Test for "access" keyword parameter
120 mapsize = 10
121 open(TESTFN, "wb").write("a"*mapsize)
122 f = open(TESTFN, "rb")
123 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ)
124 self.assertEqual(m[:], 'a'*mapsize, "Readonly memory map data incorrect.")
126 # Ensuring that readonly mmap can't be slice assigned
127 try:
128 m[:] = 'b'*mapsize
129 except TypeError:
130 pass
131 else:
132 self.fail("Able to write to readonly memory map")
134 # Ensuring that readonly mmap can't be item assigned
135 try:
136 m[0] = 'b'
137 except TypeError:
138 pass
139 else:
140 self.fail("Able to write to readonly memory map")
142 # Ensuring that readonly mmap can't be write() to
143 try:
144 m.seek(0,0)
145 m.write('abc')
146 except TypeError:
147 pass
148 else:
149 self.fail("Able to write to readonly memory map")
151 # Ensuring that readonly mmap can't be write_byte() to
152 try:
153 m.seek(0,0)
154 m.write_byte('d')
155 except TypeError:
156 pass
157 else:
158 self.fail("Able to write to readonly memory map")
160 # Ensuring that readonly mmap can't be resized
161 try:
162 m.resize(2*mapsize)
163 except SystemError: # resize is not universally supported
164 pass
165 except TypeError:
166 pass
167 else:
168 self.fail("Able to resize readonly memory map")
169 f.close()
170 del m, f
171 self.assertEqual(open(TESTFN, "rb").read(), 'a'*mapsize,
172 "Readonly memory map data file was modified")
174 # Opening mmap with size too big
175 import sys
176 f = open(TESTFN, "r+b")
177 try:
178 m = mmap.mmap(f.fileno(), mapsize+1)
179 except ValueError:
180 # we do not expect a ValueError on Windows
181 # CAUTION: This also changes the size of the file on disk, and
182 # later tests assume that the length hasn't changed. We need to
183 # repair that.
184 if sys.platform.startswith('win'):
185 self.fail("Opening mmap with size+1 should work on Windows.")
186 else:
187 # we expect a ValueError on Unix, but not on Windows
188 if not sys.platform.startswith('win'):
189 self.fail("Opening mmap with size+1 should raise ValueError.")
190 m.close()
191 f.close()
192 if sys.platform.startswith('win'):
193 # Repair damage from the resizing test.
194 f = open(TESTFN, 'r+b')
195 f.truncate(mapsize)
196 f.close()
198 # Opening mmap with access=ACCESS_WRITE
199 f = open(TESTFN, "r+b")
200 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE)
201 # Modifying write-through memory map
202 m[:] = 'c'*mapsize
203 self.assertEqual(m[:], 'c'*mapsize,
204 "Write-through memory map memory not updated properly.")
205 m.flush()
206 m.close()
207 f.close()
208 f = open(TESTFN, 'rb')
209 stuff = f.read()
210 f.close()
211 self.assertEqual(stuff, 'c'*mapsize,
212 "Write-through memory map data file not updated properly.")
214 # Opening mmap with access=ACCESS_COPY
215 f = open(TESTFN, "r+b")
216 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY)
217 # Modifying copy-on-write memory map
218 m[:] = 'd'*mapsize
219 self.assertEqual(m[:], 'd' * mapsize,
220 "Copy-on-write memory map data not written correctly.")
221 m.flush()
222 self.assertEqual(open(TESTFN, "rb").read(), 'c'*mapsize,
223 "Copy-on-write test data file should not be modified.")
224 # Ensuring copy-on-write maps cannot be resized
225 self.assertRaises(TypeError, m.resize, 2*mapsize)
226 f.close()
227 del m, f
229 # Ensuring invalid access parameter raises exception
230 f = open(TESTFN, "r+b")
231 self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4)
232 f.close()
234 if os.name == "posix":
235 # Try incompatible flags, prot and access parameters.
236 f = open(TESTFN, "r+b")
237 self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize,
238 flags=mmap.MAP_PRIVATE,
239 prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE)
240 f.close()
242 def test_bad_file_desc(self):
243 # Try opening a bad file descriptor...
244 self.assertRaises(mmap.error, mmap.mmap, -2, 4096)
246 def test_tougher_find(self):
247 # Do a tougher .find() test. SF bug 515943 pointed out that, in 2.2,
248 # searching for data with embedded \0 bytes didn't work.
249 f = open(TESTFN, 'w+')
251 data = 'aabaac\x00deef\x00\x00aa\x00'
252 n = len(data)
253 f.write(data)
254 f.flush()
255 m = mmap.mmap(f.fileno(), n)
256 f.close()
258 for start in range(n+1):
259 for finish in range(start, n+1):
260 slice = data[start : finish]
261 self.assertEqual(m.find(slice), data.find(slice))
262 self.assertEqual(m.find(slice + 'x'), -1)
263 m.close()
265 def test_find_end(self):
266 # test the new 'end' parameter works as expected
267 f = open(TESTFN, 'w+')
268 data = 'one two ones'
269 n = len(data)
270 f.write(data)
271 f.flush()
272 m = mmap.mmap(f.fileno(), n)
273 f.close()
275 self.assertEqual(m.find('one'), 0)
276 self.assertEqual(m.find('ones'), 8)
277 self.assertEqual(m.find('one', 0, -1), 0)
278 self.assertEqual(m.find('one', 1), 8)
279 self.assertEqual(m.find('one', 1, -1), 8)
280 self.assertEqual(m.find('one', 1, -2), -1)
283 def test_rfind(self):
284 # test the new 'end' parameter works as expected
285 f = open(TESTFN, 'w+')
286 data = 'one two ones'
287 n = len(data)
288 f.write(data)
289 f.flush()
290 m = mmap.mmap(f.fileno(), n)
291 f.close()
293 self.assertEqual(m.rfind('one'), 8)
294 self.assertEqual(m.rfind('one '), 0)
295 self.assertEqual(m.rfind('one', 0, -1), 8)
296 self.assertEqual(m.rfind('one', 0, -2), 0)
297 self.assertEqual(m.rfind('one', 1, -1), 8)
298 self.assertEqual(m.rfind('one', 1, -2), -1)
301 def test_double_close(self):
302 # make sure a double close doesn't crash on Solaris (Bug# 665913)
303 f = open(TESTFN, 'w+')
305 f.write(2**16 * 'a') # Arbitrary character
306 f.close()
308 f = open(TESTFN)
309 mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ)
310 mf.close()
311 mf.close()
312 f.close()
314 def test_entire_file(self):
315 # test mapping of entire file by passing 0 for map length
316 if hasattr(os, "stat"):
317 f = open(TESTFN, "w+")
319 f.write(2**16 * 'm') # Arbitrary character
320 f.close()
322 f = open(TESTFN, "rb+")
323 mf = mmap.mmap(f.fileno(), 0)
324 self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
325 self.assertEqual(mf.read(2**16), 2**16 * "m")
326 mf.close()
327 f.close()
329 def test_move(self):
330 # make move works everywhere (64-bit format problem earlier)
331 f = open(TESTFN, 'w+')
333 f.write("ABCDEabcde") # Arbitrary character
334 f.flush()
336 mf = mmap.mmap(f.fileno(), 10)
337 mf.move(5, 0, 5)
338 self.assertEqual(mf[:], "ABCDEABCDE", "Map move should have duplicated front 5")
339 mf.close()
340 f.close()
342 # more excessive test
343 data = "0123456789"
344 for dest in range(len(data)):
345 for src in range(len(data)):
346 for count in range(len(data) - max(dest, src)):
347 expected = data[:dest] + data[src:src+count] + data[dest+count:]
348 m = mmap.mmap(-1, len(data))
349 m[:] = data
350 m.move(dest, src, count)
351 self.assertEqual(m[:], expected)
352 m.close()
354 # segfault test (Issue 5387)
355 m = mmap.mmap(-1, 100)
356 offsets = [-100, -1, 0, 1, 100]
357 for source, dest, size in itertools.product(offsets, offsets, offsets):
358 try:
359 m.move(source, dest, size)
360 except ValueError:
361 pass
363 offsets = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1),
364 (-1, 0, 0), (0, -1, 0), (0, 0, -1)]
365 for source, dest, size in offsets:
366 self.assertRaises(ValueError, m.move, source, dest, size)
368 m.close()
370 m = mmap.mmap(-1, 1) # single byte
371 self.assertRaises(ValueError, m.move, 0, 0, 2)
372 self.assertRaises(ValueError, m.move, 1, 0, 1)
373 self.assertRaises(ValueError, m.move, 0, 1, 1)
374 m.move(0, 0, 1)
375 m.move(0, 0, 0)
378 def test_anonymous(self):
379 # anonymous mmap.mmap(-1, PAGE)
380 m = mmap.mmap(-1, PAGESIZE)
381 for x in xrange(PAGESIZE):
382 self.assertEqual(m[x], '\0', "anonymously mmap'ed contents should be zero")
384 for x in xrange(PAGESIZE):
385 m[x] = ch = chr(x & 255)
386 self.assertEqual(m[x], ch)
388 def test_extended_getslice(self):
389 # Test extended slicing by comparing with list slicing.
390 s = "".join(chr(c) for c in reversed(range(256)))
391 m = mmap.mmap(-1, len(s))
392 m[:] = s
393 self.assertEqual(m[:], s)
394 indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
395 for start in indices:
396 for stop in indices:
397 # Skip step 0 (invalid)
398 for step in indices[1:]:
399 self.assertEqual(m[start:stop:step],
400 s[start:stop:step])
402 def test_extended_set_del_slice(self):
403 # Test extended slicing by comparing with list slicing.
404 s = "".join(chr(c) for c in reversed(range(256)))
405 m = mmap.mmap(-1, len(s))
406 indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
407 for start in indices:
408 for stop in indices:
409 # Skip invalid step 0
410 for step in indices[1:]:
411 m[:] = s
412 self.assertEqual(m[:], s)
413 L = list(s)
414 # Make sure we have a slice of exactly the right length,
415 # but with different data.
416 data = L[start:stop:step]
417 data = "".join(reversed(data))
418 L[start:stop:step] = data
419 m[start:stop:step] = data
420 self.assertEquals(m[:], "".join(L))
422 def make_mmap_file (self, f, halfsize):
423 # Write 2 pages worth of data to the file
424 f.write ('\0' * halfsize)
425 f.write ('foo')
426 f.write ('\0' * (halfsize - 3))
427 f.flush ()
428 return mmap.mmap (f.fileno(), 0)
430 def test_offset (self):
431 f = open (TESTFN, 'w+b')
433 try: # unlink TESTFN no matter what
434 halfsize = mmap.ALLOCATIONGRANULARITY
435 m = self.make_mmap_file (f, halfsize)
436 m.close ()
437 f.close ()
439 mapsize = halfsize * 2
440 # Try invalid offset
441 f = open(TESTFN, "r+b")
442 for offset in [-2, -1, None]:
443 try:
444 m = mmap.mmap(f.fileno(), mapsize, offset=offset)
445 self.assertEqual(0, 1)
446 except (ValueError, TypeError, OverflowError):
447 pass
448 else:
449 self.assertEqual(0, 0)
450 f.close()
452 # Try valid offset, hopefully 8192 works on all OSes
453 f = open(TESTFN, "r+b")
454 m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize)
455 self.assertEqual(m[0:3], 'foo')
456 f.close()
458 # Try resizing map
459 try:
460 m.resize(512)
461 except SystemError:
462 pass
463 else:
464 # resize() is supported
465 self.assertEqual(len(m), 512)
466 # Check that we can no longer seek beyond the new size.
467 self.assertRaises(ValueError, m.seek, 513, 0)
468 # Check that the content is not changed
469 self.assertEqual(m[0:3], 'foo')
471 # Check that the underlying file is truncated too
472 f = open(TESTFN)
473 f.seek(0, 2)
474 self.assertEqual(f.tell(), halfsize + 512)
475 f.close()
476 self.assertEqual(m.size(), halfsize + 512)
478 m.close()
480 finally:
481 f.close()
482 try:
483 os.unlink(TESTFN)
484 except OSError:
485 pass
487 def test_subclass(self):
488 class anon_mmap(mmap.mmap):
489 def __new__(klass, *args, **kwargs):
490 return mmap.mmap.__new__(klass, -1, *args, **kwargs)
491 anon_mmap(PAGESIZE)
493 def test_prot_readonly(self):
494 if not hasattr(mmap, 'PROT_READ'):
495 return
496 mapsize = 10
497 open(TESTFN, "wb").write("a"*mapsize)
498 f = open(TESTFN, "rb")
499 m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ)
500 self.assertRaises(TypeError, m.write, "foo")
501 f.close()
503 def test_error(self):
504 self.assertTrue(issubclass(mmap.error, EnvironmentError))
505 self.assertIn("mmap.error", str(mmap.error))
507 def test_io_methods(self):
508 data = "0123456789"
509 open(TESTFN, "wb").write("x"*len(data))
510 f = open(TESTFN, "r+b")
511 m = mmap.mmap(f.fileno(), len(data))
512 f.close()
513 # Test write_byte()
514 for i in xrange(len(data)):
515 self.assertEquals(m.tell(), i)
516 m.write_byte(data[i])
517 self.assertEquals(m.tell(), i+1)
518 self.assertRaises(ValueError, m.write_byte, "x")
519 self.assertEquals(m[:], data)
520 # Test read_byte()
521 m.seek(0)
522 for i in xrange(len(data)):
523 self.assertEquals(m.tell(), i)
524 self.assertEquals(m.read_byte(), data[i])
525 self.assertEquals(m.tell(), i+1)
526 self.assertRaises(ValueError, m.read_byte)
527 # Test read()
528 m.seek(3)
529 self.assertEquals(m.read(3), "345")
530 self.assertEquals(m.tell(), 6)
531 # Test write()
532 m.seek(3)
533 m.write("bar")
534 self.assertEquals(m.tell(), 6)
535 self.assertEquals(m[:], "012bar6789")
536 m.seek(8)
537 self.assertRaises(ValueError, m.write, "bar")
539 if os.name == 'nt':
540 def test_tagname(self):
541 data1 = "0123456789"
542 data2 = "abcdefghij"
543 assert len(data1) == len(data2)
545 # Test same tag
546 m1 = mmap.mmap(-1, len(data1), tagname="foo")
547 m1[:] = data1
548 m2 = mmap.mmap(-1, len(data2), tagname="foo")
549 m2[:] = data2
550 self.assertEquals(m1[:], data2)
551 self.assertEquals(m2[:], data2)
552 m2.close()
553 m1.close()
555 # Test differnt tag
556 m1 = mmap.mmap(-1, len(data1), tagname="foo")
557 m1[:] = data1
558 m2 = mmap.mmap(-1, len(data2), tagname="boo")
559 m2[:] = data2
560 self.assertEquals(m1[:], data1)
561 self.assertEquals(m2[:], data2)
562 m2.close()
563 m1.close()
565 def test_crasher_on_windows(self):
566 # Should not crash (Issue 1733986)
567 m = mmap.mmap(-1, 1000, tagname="foo")
568 try:
569 mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size
570 except:
571 pass
572 m.close()
574 # Should not crash (Issue 5385)
575 open(TESTFN, "wb").write("x"*10)
576 f = open(TESTFN, "r+b")
577 m = mmap.mmap(f.fileno(), 0)
578 f.close()
579 try:
580 m.resize(0) # will raise WindowsError
581 except:
582 pass
583 try:
584 m[:]
585 except:
586 pass
587 m.close()
590 def test_main():
591 run_unittest(MmapTests)
593 if __name__ == '__main__':
594 test_main()