1 #!/usr/bin/env python
4 # Unit tests for the multiprocessing package
7 import unittest
8 import Queue
9 import time
10 import sys
11 import os
12 import gc
13 import signal
14 import array
15 import socket
16 import random
17 import logging
18 from test import test_support
19 from StringIO import StringIO
20 _multiprocessing = test_support.import_module('_multiprocessing')
21 # import threading after _multiprocessing to raise a more relevant error
22 # message: "No module named _multiprocessing". _multiprocessing is not compiled
23 # without thread support.
24 import threading
26 # Work around broken sem_open implementations
27 test_support.import_module('multiprocessing.synchronize')
29 import multiprocessing.dummy
30 import multiprocessing.connection
31 import multiprocessing.managers
32 import multiprocessing.heap
33 import multiprocessing.pool
35 from multiprocessing import util
41 latin = str
44 # Constants
47 LOG_LEVEL = util.SUBWARNING
48 #LOG_LEVEL = logging.DEBUG
50 DELTA = 0.1
51 CHECK_TIMINGS = False # setting this True makes the tests take a lot longer
52 # and can sometimes cause some non-serious
53 # failures because some calls block a bit
54 # longer than expected
55 if CHECK_TIMINGS:
56 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
57 else:
58 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
60 HAVE_GETVALUE = not getattr(_multiprocessing,
61 'HAVE_BROKEN_SEM_GETVALUE', False)
63 WIN32 = (sys.platform == "win32")
66 # Some tests require ctypes
69 try:
70 from ctypes import Structure, c_int, c_double
71 except ImportError:
72 Structure = object
73 c_int = c_double = None
75 try:
76 from ctypes import Value
77 except ImportError:
78 Value = None
80 try:
81 from ctypes import copy as ctypes_copy
82 except ImportError:
83 ctypes_copy = None
86 # Creates a wrapper for a function which records the time it takes to finish
89 class TimingWrapper(object):
91 def __init__(self, func):
92 self.func = func
93 self.elapsed = None
95 def __call__(self, *args, **kwds):
96 t = time.time()
97 try:
98 return self.func(*args, **kwds)
99 finally:
100 self.elapsed = time.time() - t
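# (Editorial) Illustrative usage sketch, not part of the original test suite:
#
#     wait = TimingWrapper(time.sleep)   # wrap any blocking callable
#     wait(0.25)                         # forwards the call, returns its result
#     wait.elapsed                       # roughly 0.25 seconds
#
# The timing assertions below use this to check both a call's return value and
# how long the call blocked.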
103 # Base class for test cases
106 class BaseTestCase(object):
108 ALLOWED_TYPES = ('processes', 'manager', 'threads')
110 def assertTimingAlmostEqual(self, a, b):
111 if CHECK_TIMINGS:
112 self.assertAlmostEqual(a, b, 1)
114 def assertReturnsIfImplemented(self, value, func, *args):
115 try:
116 res = func(*args)
117 except NotImplementedError:
118 pass
119 else:
120 return self.assertEqual(value, res)
123 # Return the value of a semaphore
126 def get_value(self):
127 try:
128 return self.get_value()
129 except AttributeError:
130 try:
131 return self._Semaphore__value
132 except AttributeError:
133 try:
134 return self._value
135 except AttributeError:
136 raise NotImplementedError
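# (Editorial note) get_value() probes the counter attribute of whichever
# semaphore type is under test: get_value() on multiprocessing semaphores,
# the private _Semaphore__value on older threading.Semaphore objects, or
# _value on newer ones, and raises NotImplementedError otherwise so that
# assertReturnsIfImplemented() can silently skip the check.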
139 # Testcases
142 class _TestProcess(BaseTestCase):
144 ALLOWED_TYPES = ('processes', 'threads')
146 def test_current(self):
147 if self.TYPE == 'threads':
148 return
150 current = self.current_process()
151 authkey = current.authkey
153 self.assertTrue(current.is_alive())
154 self.assertTrue(not current.daemon)
155 self.assertIsInstance(authkey, bytes)
156 self.assertTrue(len(authkey) > 0)
157 self.assertEqual(current.ident, os.getpid())
158 self.assertEqual(current.exitcode, None)
160 def _test(self, q, *args, **kwds):
161 current = self.current_process()
162 q.put(args)
163 q.put(kwds)
164 q.put(current.name)
165 if self.TYPE != 'threads':
166 q.put(bytes(current.authkey))
167 q.put(current.pid)
169 def test_process(self):
170 q = self.Queue(1)
171 e = self.Event()
172 args = (q, 1, 2)
173 kwargs = {'hello':23, 'bye':2.54}
174 name = 'SomeProcess'
175 p = self.Process(
176 target=self._test, args=args, kwargs=kwargs, name=name
177 )
178 p.daemon = True
179 current = self.current_process()
181 if self.TYPE != 'threads':
182 self.assertEquals(p.authkey, current.authkey)
183 self.assertEquals(p.is_alive(), False)
184 self.assertEquals(p.daemon, True)
185 self.assertNotIn(p, self.active_children())
186 self.assertTrue(type(self.active_children()) is list)
187 self.assertEqual(p.exitcode, None)
189 p.start()
191 self.assertEquals(p.exitcode, None)
192 self.assertEquals(p.is_alive(), True)
193 self.assertIn(p, self.active_children())
195 self.assertEquals(q.get(), args[1:])
196 self.assertEquals(q.get(), kwargs)
197 self.assertEquals(q.get(), p.name)
198 if self.TYPE != 'threads':
199 self.assertEquals(q.get(), current.authkey)
200 self.assertEquals(q.get(), p.pid)
202 p.join()
204 self.assertEquals(p.exitcode, 0)
205 self.assertEquals(p.is_alive(), False)
206 self.assertNotIn(p, self.active_children())
208 def _test_terminate(self):
209 time.sleep(1000)
211 def test_terminate(self):
212 if self.TYPE == 'threads':
213 return
215 p = self.Process(target=self._test_terminate)
216 p.daemon = True
217 p.start()
219 self.assertEqual(p.is_alive(), True)
220 self.assertIn(p, self.active_children())
221 self.assertEqual(p.exitcode, None)
223 p.terminate()
225 join = TimingWrapper(p.join)
226 self.assertEqual(join(), None)
227 self.assertTimingAlmostEqual(join.elapsed, 0.0)
229 self.assertEqual(p.is_alive(), False)
230 self.assertNotIn(p, self.active_children())
232 p.join()
234 # XXX sometimes get p.exitcode == 0 on Windows ...
235 #self.assertEqual(p.exitcode, -signal.SIGTERM)
237 def test_cpu_count(self):
238 try:
239 cpus = multiprocessing.cpu_count()
240 except NotImplementedError:
241 cpus = 1
242 self.assertTrue(type(cpus) is int)
243 self.assertTrue(cpus >= 1)
245 def test_active_children(self):
246 self.assertEqual(type(self.active_children()), list)
248 p = self.Process(target=time.sleep, args=(DELTA,))
249 self.assertNotIn(p, self.active_children())
251 p.start()
252 self.assertIn(p, self.active_children())
254 p.join()
255 self.assertNotIn(p, self.active_children())
257 def _test_recursion(self, wconn, id):
258 from multiprocessing import forking
259 wconn.send(id)
260 if len(id) < 2:
261 for i in range(2):
262 p = self.Process(
263 target=self._test_recursion, args=(wconn, id+[i])
264 )
265 p.start()
266 p.join()
268 def test_recursion(self):
269 rconn, wconn = self.Pipe(duplex=False)
270 self._test_recursion(wconn, [])
272 time.sleep(DELTA)
273 result = []
274 while rconn.poll():
275 result.append(rconn.recv())
277 expected = [
278 [],
279 [0],
280 [0, 0],
281 [0, 1],
282 [1],
283 [1, 0],
284 [1, 1]
285 ]
286 self.assertEqual(result, expected)
292 class _UpperCaser(multiprocessing.Process):
294 def __init__(self):
295 multiprocessing.Process.__init__(self)
296 self.child_conn, self.parent_conn = multiprocessing.Pipe()
298 def run(self):
299 self.parent_conn.close()
300 for s in iter(self.child_conn.recv, None):
301 self.child_conn.send(s.upper())
302 self.child_conn.close()
304 def submit(self, s):
305 assert type(s) is str
306 self.parent_conn.send(s)
307 return self.parent_conn.recv()
309 def stop(self):
310 self.parent_conn.send(None)
311 self.parent_conn.close()
312 self.child_conn.close()
314 class _TestSubclassingProcess(BaseTestCase):
316 ALLOWED_TYPES = ('processes',)
318 def test_subclassing(self):
319 uppercaser = _UpperCaser()
320 uppercaser.start()
321 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
322 self.assertEqual(uppercaser.submit('world'), 'WORLD')
323 uppercaser.stop()
324 uppercaser.join()
330 def queue_empty(q):
331 if hasattr(q, 'empty'):
332 return q.empty()
333 else:
334 return q.qsize() == 0
336 def queue_full(q, maxsize):
337 if hasattr(q, 'full'):
338 return q.full()
339 else:
340 return q.qsize() == maxsize
343 class _TestQueue(BaseTestCase):
346 def _test_put(self, queue, child_can_start, parent_can_continue):
347 child_can_start.wait()
348 for i in range(6):
349 queue.get()
350 parent_can_continue.set()
352 def test_put(self):
353 MAXSIZE = 6
354 queue = self.Queue(maxsize=MAXSIZE)
355 child_can_start = self.Event()
356 parent_can_continue = self.Event()
358 proc = self.Process(
359 target=self._test_put,
360 args=(queue, child_can_start, parent_can_continue)
361 )
362 proc.daemon = True
363 proc.start()
365 self.assertEqual(queue_empty(queue), True)
366 self.assertEqual(queue_full(queue, MAXSIZE), False)
368 queue.put(1)
369 queue.put(2, True)
370 queue.put(3, True, None)
371 queue.put(4, False)
372 queue.put(5, False, None)
373 queue.put_nowait(6)
375 # the values may be in the buffer but not yet in the pipe, so sleep a bit
376 time.sleep(DELTA)
378 self.assertEqual(queue_empty(queue), False)
379 self.assertEqual(queue_full(queue, MAXSIZE), True)
381 put = TimingWrapper(queue.put)
382 put_nowait = TimingWrapper(queue.put_nowait)
384 self.assertRaises(Queue.Full, put, 7, False)
385 self.assertTimingAlmostEqual(put.elapsed, 0)
387 self.assertRaises(Queue.Full, put, 7, False, None)
388 self.assertTimingAlmostEqual(put.elapsed, 0)
390 self.assertRaises(Queue.Full, put_nowait, 7)
391 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
393 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
394 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
396 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
397 self.assertTimingAlmostEqual(put.elapsed, 0)
399 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
400 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
402 child_can_start.set()
403 parent_can_continue.wait()
405 self.assertEqual(queue_empty(queue), True)
406 self.assertEqual(queue_full(queue, MAXSIZE), False)
408 proc.join()
410 def _test_get(self, queue, child_can_start, parent_can_continue):
411 child_can_start.wait()
412 #queue.put(1)
413 queue.put(2)
414 queue.put(3)
415 queue.put(4)
416 queue.put(5)
417 parent_can_continue.set()
419 def test_get(self):
420 queue = self.Queue()
421 child_can_start = self.Event()
422 parent_can_continue = self.Event()
424 proc = self.Process(
425 target=self._test_get,
426 args=(queue, child_can_start, parent_can_continue)
427 )
428 proc.daemon = True
429 proc.start()
431 self.assertEqual(queue_empty(queue), True)
433 child_can_start.set()
434 parent_can_continue.wait()
436 time.sleep(DELTA)
437 self.assertEqual(queue_empty(queue), False)
439 # Hangs unexpectedly, remove for now
440 #self.assertEqual(queue.get(), 1)
441 self.assertEqual(queue.get(True, None), 2)
442 self.assertEqual(queue.get(True), 3)
443 self.assertEqual(queue.get(timeout=1), 4)
444 self.assertEqual(queue.get_nowait(), 5)
446 self.assertEqual(queue_empty(queue), True)
448 get = TimingWrapper(queue.get)
449 get_nowait = TimingWrapper(queue.get_nowait)
451 self.assertRaises(Queue.Empty, get, False)
452 self.assertTimingAlmostEqual(get.elapsed, 0)
454 self.assertRaises(Queue.Empty, get, False, None)
455 self.assertTimingAlmostEqual(get.elapsed, 0)
457 self.assertRaises(Queue.Empty, get_nowait)
458 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
460 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
461 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
463 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
464 self.assertTimingAlmostEqual(get.elapsed, 0)
466 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
467 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
469 proc.join()
471 def _test_fork(self, queue):
472 for i in range(10, 20):
473 queue.put(i)
474 # note that at this point the items may only be buffered, so the
475 # process cannot shutdown until the feeder thread has finished
476 # pushing items onto the pipe.
478 def test_fork(self):
479 # Old versions of Queue would fail to create a new feeder
480 # thread for a forked process if the original process had its
481 # own feeder thread. This test checks that this no longer
482 # happens.
484 queue = self.Queue()
486 # put items on queue so that main process starts a feeder thread
487 for i in range(10):
488 queue.put(i)
490 # wait to make sure thread starts before we fork a new process
491 time.sleep(DELTA)
493 # fork process
494 p = self.Process(target=self._test_fork, args=(queue,))
495 p.start()
497 # check that all expected items are in the queue
498 for i in range(20):
499 self.assertEqual(queue.get(), i)
500 self.assertRaises(Queue.Empty, queue.get, False)
502 p.join()
504 def test_qsize(self):
505 q = self.Queue()
506 try:
507 self.assertEqual(q.qsize(), 0)
508 except NotImplementedError:
509 return
510 q.put(1)
511 self.assertEqual(q.qsize(), 1)
512 q.put(5)
513 self.assertEqual(q.qsize(), 2)
514 q.get()
515 self.assertEqual(q.qsize(), 1)
516 q.get()
517 self.assertEqual(q.qsize(), 0)
519 def _test_task_done(self, q):
520 for obj in iter(q.get, None):
521 time.sleep(DELTA)
522 q.task_done()
524 def test_task_done(self):
525 queue = self.JoinableQueue()
527 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
528 self.skipTest("requires 'queue.task_done()' method")
530 workers = [self.Process(target=self._test_task_done, args=(queue,))
531 for i in xrange(4)]
533 for p in workers:
534 p.start()
536 for i in xrange(10):
537 queue.put(i)
539 queue.join()
541 for p in workers:
542 queue.put(None)
544 for p in workers:
545 p.join()
551 class _TestLock(BaseTestCase):
553 def test_lock(self):
554 lock = self.Lock()
555 self.assertEqual(lock.acquire(), True)
556 self.assertEqual(lock.acquire(False), False)
557 self.assertEqual(lock.release(), None)
558 self.assertRaises((ValueError, threading.ThreadError), lock.release)
560 def test_rlock(self):
561 lock = self.RLock()
562 self.assertEqual(lock.acquire(), True)
563 self.assertEqual(lock.acquire(), True)
564 self.assertEqual(lock.acquire(), True)
565 self.assertEqual(lock.release(), None)
566 self.assertEqual(lock.release(), None)
567 self.assertEqual(lock.release(), None)
568 self.assertRaises((AssertionError, RuntimeError), lock.release)
570 def test_lock_context(self):
571 with self.Lock():
572 pass
575 class _TestSemaphore(BaseTestCase):
577 def _test_semaphore(self, sem):
578 self.assertReturnsIfImplemented(2, get_value, sem)
579 self.assertEqual(sem.acquire(), True)
580 self.assertReturnsIfImplemented(1, get_value, sem)
581 self.assertEqual(sem.acquire(), True)
582 self.assertReturnsIfImplemented(0, get_value, sem)
583 self.assertEqual(sem.acquire(False), False)
584 self.assertReturnsIfImplemented(0, get_value, sem)
585 self.assertEqual(sem.release(), None)
586 self.assertReturnsIfImplemented(1, get_value, sem)
587 self.assertEqual(sem.release(), None)
588 self.assertReturnsIfImplemented(2, get_value, sem)
590 def test_semaphore(self):
591 sem = self.Semaphore(2)
592 self._test_semaphore(sem)
593 self.assertEqual(sem.release(), None)
594 self.assertReturnsIfImplemented(3, get_value, sem)
595 self.assertEqual(sem.release(), None)
596 self.assertReturnsIfImplemented(4, get_value, sem)
598 def test_bounded_semaphore(self):
599 sem = self.BoundedSemaphore(2)
600 self._test_semaphore(sem)
601 # Currently fails on OS/X
602 #if HAVE_GETVALUE:
603 # self.assertRaises(ValueError, sem.release)
604 # self.assertReturnsIfImplemented(2, get_value, sem)
606 def test_timeout(self):
607 if self.TYPE != 'processes':
608 return
610 sem = self.Semaphore(0)
611 acquire = TimingWrapper(sem.acquire)
613 self.assertEqual(acquire(False), False)
614 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
616 self.assertEqual(acquire(False, None), False)
617 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
619 self.assertEqual(acquire(False, TIMEOUT1), False)
620 self.assertTimingAlmostEqual(acquire.elapsed, 0)
622 self.assertEqual(acquire(True, TIMEOUT2), False)
623 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
625 self.assertEqual(acquire(timeout=TIMEOUT3), False)
626 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
629 class _TestCondition(BaseTestCase):
631 def f(self, cond, sleeping, woken, timeout=None):
632 cond.acquire()
633 sleeping.release()
634 cond.wait(timeout)
635 woken.release()
636 cond.release()
638 def check_invariant(self, cond):
639 # this is only supposed to succeed when there are no sleepers
640 if self.TYPE == 'processes':
641 try:
642 sleepers = (cond._sleeping_count.get_value() -
643 cond._woken_count.get_value())
644 self.assertEqual(sleepers, 0)
645 self.assertEqual(cond._wait_semaphore.get_value(), 0)
646 except NotImplementedError:
647 pass
649 def test_notify(self):
650 cond = self.Condition()
651 sleeping = self.Semaphore(0)
652 woken = self.Semaphore(0)
654 p = self.Process(target=self.f, args=(cond, sleeping, woken))
655 p.daemon = True
656 p.start()
658 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
659 p.daemon = True
660 p.start()
662 # wait for both children to start sleeping
663 sleeping.acquire()
664 sleeping.acquire()
666 # check no process/thread has woken up
667 time.sleep(DELTA)
668 self.assertReturnsIfImplemented(0, get_value, woken)
670 # wake up one process/thread
671 cond.acquire()
672 cond.notify()
673 cond.release()
675 # check one process/thread has woken up
676 time.sleep(DELTA)
677 self.assertReturnsIfImplemented(1, get_value, woken)
679 # wake up another
680 cond.acquire()
681 cond.notify()
682 cond.release()
684 # check other has woken up
685 time.sleep(DELTA)
686 self.assertReturnsIfImplemented(2, get_value, woken)
688 # check state is not mucked up
689 self.check_invariant(cond)
690 p.join()
692 def test_notify_all(self):
693 cond = self.Condition()
694 sleeping = self.Semaphore(0)
695 woken = self.Semaphore(0)
697 # start some threads/processes which will timeout
698 for i in range(3):
699 p = self.Process(target=self.f,
700 args=(cond, sleeping, woken, TIMEOUT1))
701 p.daemon = True
702 p.start()
704 t = threading.Thread(target=self.f,
705 args=(cond, sleeping, woken, TIMEOUT1))
706 t.daemon = True
707 t.start()
709 # wait for them all to sleep
710 for i in xrange(6):
711 sleeping.acquire()
713 # check they have all timed out
714 for i in xrange(6):
715 woken.acquire()
716 self.assertReturnsIfImplemented(0, get_value, woken)
718 # check state is not mucked up
719 self.check_invariant(cond)
721 # start some more threads/processes
722 for i in range(3):
723 p = self.Process(target=self.f, args=(cond, sleeping, woken))
724 p.daemon = True
725 p.start()
727 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
728 t.daemon = True
729 t.start()
731 # wait for them to all sleep
732 for i in xrange(6):
733 sleeping.acquire()
735 # check no process/thread has woken up
736 time.sleep(DELTA)
737 self.assertReturnsIfImplemented(0, get_value, woken)
739 # wake them all up
740 cond.acquire()
741 cond.notify_all()
742 cond.release()
744 # check they have all woken
745 time.sleep(DELTA)
746 self.assertReturnsIfImplemented(6, get_value, woken)
748 # check state is not mucked up
749 self.check_invariant(cond)
751 def test_timeout(self):
752 cond = self.Condition()
753 wait = TimingWrapper(cond.wait)
754 cond.acquire()
755 res = wait(TIMEOUT1)
756 cond.release()
757 self.assertEqual(res, None)
758 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
761 class _TestEvent(BaseTestCase):
763 def _test_event(self, event):
764 time.sleep(TIMEOUT2)
765 event.set()
767 def test_event(self):
768 event = self.Event()
769 wait = TimingWrapper(event.wait)
771 # Removed temporarily, due to API shear; this does not
772 # work with threading._Event objects. is_set == isSet
773 self.assertEqual(event.is_set(), False)
775 # Removed, threading.Event.wait() will return the value of the __flag
776 # instead of None. API shear with the semaphore-backed mp.Event
777 self.assertEqual(wait(0.0), False)
778 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
779 self.assertEqual(wait(TIMEOUT1), False)
780 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
782 event.set()
784 # See note above on the API differences
785 self.assertEqual(event.is_set(), True)
786 self.assertEqual(wait(), True)
787 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
788 self.assertEqual(wait(TIMEOUT1), True)
789 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
790 # self.assertEqual(event.is_set(), True)
792 event.clear()
794 #self.assertEqual(event.is_set(), False)
796 self.Process(target=self._test_event, args=(event,)).start()
797 self.assertEqual(wait(), True)
803 class _TestValue(BaseTestCase):
805 ALLOWED_TYPES = ('processes',)
807 codes_values = [
808 ('i', 4343, 24234),
809 ('d', 3.625, -4.25),
810 ('h', -232, 234),
811 ('c', latin('x'), latin('y'))
812 ]
814 def _test(self, values):
815 for sv, cv in zip(values, self.codes_values):
816 sv.value = cv[2]
819 @unittest.skipIf(c_int is None, "requires _ctypes")
820 def test_value(self, raw=False):
821 if raw:
822 values = [self.RawValue(code, value)
823 for code, value, _ in self.codes_values]
824 else:
825 values = [self.Value(code, value)
826 for code, value, _ in self.codes_values]
828 for sv, cv in zip(values, self.codes_values):
829 self.assertEqual(sv.value, cv[1])
831 proc = self.Process(target=self._test, args=(values,))
832 proc.start()
833 proc.join()
835 for sv, cv in zip(values, self.codes_values):
836 self.assertEqual(sv.value, cv[2])
838 @unittest.skipIf(c_int is None, "requires _ctypes")
839 def test_rawvalue(self):
840 self.test_value(raw=True)
842 @unittest.skipIf(c_int is None, "requires _ctypes")
843 def test_getobj_getlock(self):
844 val1 = self.Value('i', 5)
845 lock1 = val1.get_lock()
846 obj1 = val1.get_obj()
848 val2 = self.Value('i', 5, lock=None)
849 lock2 = val2.get_lock()
850 obj2 = val2.get_obj()
852 lock = self.Lock()
853 val3 = self.Value('i', 5, lock=lock)
854 lock3 = val3.get_lock()
855 obj3 = val3.get_obj()
856 self.assertEqual(lock, lock3)
858 arr4 = self.Value('i', 5, lock=False)
859 self.assertFalse(hasattr(arr4, 'get_lock'))
860 self.assertFalse(hasattr(arr4, 'get_obj'))
862 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
864 arr5 = self.RawValue('i', 5)
865 self.assertFalse(hasattr(arr5, 'get_lock'))
866 self.assertFalse(hasattr(arr5, 'get_obj'))
869 class _TestArray(BaseTestCase):
871 ALLOWED_TYPES = ('processes',)
873 def f(self, seq):
874 for i in range(1, len(seq)):
875 seq[i] += seq[i-1]
877 @unittest.skipIf(c_int is None, "requires _ctypes")
878 def test_array(self, raw=False):
879 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
880 if raw:
881 arr = self.RawArray('i', seq)
882 else:
883 arr = self.Array('i', seq)
885 self.assertEqual(len(arr), len(seq))
886 self.assertEqual(arr[3], seq[3])
887 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
889 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
891 self.assertEqual(list(arr[:]), seq)
893 self.f(seq)
895 p = self.Process(target=self.f, args=(arr,))
896 p.start()
897 p.join()
899 self.assertEqual(list(arr[:]), seq)
901 @unittest.skipIf(c_int is None, "requires _ctypes")
902 def test_rawarray(self):
903 self.test_array(raw=True)
905 @unittest.skipIf(c_int is None, "requires _ctypes")
906 def test_getobj_getlock_obj(self):
907 arr1 = self.Array('i', range(10))
908 lock1 = arr1.get_lock()
909 obj1 = arr1.get_obj()
911 arr2 = self.Array('i', range(10), lock=None)
912 lock2 = arr2.get_lock()
913 obj2 = arr2.get_obj()
915 lock = self.Lock()
916 arr3 = self.Array('i', range(10), lock=lock)
917 lock3 = arr3.get_lock()
918 obj3 = arr3.get_obj()
919 self.assertEqual(lock, lock3)
921 arr4 = self.Array('i', range(10), lock=False)
922 self.assertFalse(hasattr(arr4, 'get_lock'))
923 self.assertFalse(hasattr(arr4, 'get_obj'))
924 self.assertRaises(AttributeError,
925 self.Array, 'i', range(10), lock='notalock')
927 arr5 = self.RawArray('i', range(10))
928 self.assertFalse(hasattr(arr5, 'get_lock'))
929 self.assertFalse(hasattr(arr5, 'get_obj'))
935 class _TestContainers(BaseTestCase):
937 ALLOWED_TYPES = ('manager',)
939 def test_list(self):
940 a = self.list(range(10))
941 self.assertEqual(a[:], range(10))
943 b = self.list()
944 self.assertEqual(b[:], [])
946 b.extend(range(5))
947 self.assertEqual(b[:], range(5))
949 self.assertEqual(b[2], 2)
950 self.assertEqual(b[2:10], [2,3,4])
952 b *= 2
953 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
955 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
957 self.assertEqual(a[:], range(10))
959 d = [a, b]
960 e = self.list(d)
961 self.assertEqual(
962 e[:],
963 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
964 )
966 f = self.list([a])
967 a.append('hello')
968 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
970 def test_dict(self):
971 d = self.dict()
972 indices = range(65, 70)
973 for i in indices:
974 d[i] = chr(i)
975 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
976 self.assertEqual(sorted(d.keys()), indices)
977 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
978 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
980 def test_namespace(self):
981 n = self.Namespace()
982 n.name = 'Bob'
983 n.job = 'Builder'
984 n._hidden = 'hidden'
985 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
986 del n.job
987 self.assertEqual(str(n), "Namespace(name='Bob')")
988 self.assertTrue(hasattr(n, 'name'))
989 self.assertTrue(not hasattr(n, 'job'))
995 def sqr(x, wait=0.0):
996 time.sleep(wait)
997 return x*x
998 class _TestPool(BaseTestCase):
1000 def test_apply(self):
1001 papply = self.pool.apply
1002 self.assertEqual(papply(sqr, (5,)), sqr(5))
1003 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1005 def test_map(self):
1006 pmap = self.pool.map
1007 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1008 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1009 map(sqr, range(100)))
1011 def test_map_chunksize(self):
1012 try:
1013 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1014 except multiprocessing.TimeoutError:
1015 self.fail("pool.map_async with chunksize stalled on null list")
1017 def test_async(self):
1018 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1019 get = TimingWrapper(res.get)
1020 self.assertEqual(get(), 49)
1021 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1023 def test_async_timeout(self):
1024 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1025 get = TimingWrapper(res.get)
1026 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1027 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1029 def test_imap(self):
1030 it = self.pool.imap(sqr, range(10))
1031 self.assertEqual(list(it), map(sqr, range(10)))
1033 it = self.pool.imap(sqr, range(10))
1034 for i in range(10):
1035 self.assertEqual(it.next(), i*i)
1036 self.assertRaises(StopIteration, it.next)
1038 it = self.pool.imap(sqr, range(1000), chunksize=100)
1039 for i in range(1000):
1040 self.assertEqual(it.next(), i*i)
1041 self.assertRaises(StopIteration, it.next)
1043 def test_imap_unordered(self):
1044 it = self.pool.imap_unordered(sqr, range(1000))
1045 self.assertEqual(sorted(it), map(sqr, range(1000)))
1047 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1048 self.assertEqual(sorted(it), map(sqr, range(1000)))
1050 def test_make_pool(self):
1051 p = multiprocessing.Pool(3)
1052 self.assertEqual(3, len(p._pool))
1053 p.close()
1054 p.join()
1056 def test_terminate(self):
1057 if self.TYPE == 'manager':
1058 # On Unix a forked process increfs each shared object to
1059 # which its parent process held a reference. If the
1060 # forked process gets terminated then there is likely to
1061 # be a reference leak. So to prevent
1062 # _TestZZZNumberOfObjects from failing we skip this test
1063 # when using a manager.
1064 return
1066 result = self.pool.map_async(
1067 time.sleep, [0.1 for i in range(10000)], chunksize=1
1068 )
1069 self.pool.terminate()
1070 join = TimingWrapper(self.pool.join)
1071 join()
1072 self.assertTrue(join.elapsed < 0.2)
1074 class _TestPoolWorkerLifetime(BaseTestCase):
1076 ALLOWED_TYPES = ('processes', )
1077 def test_pool_worker_lifetime(self):
1078 p = multiprocessing.Pool(3, maxtasksperchild=10)
1079 self.assertEqual(3, len(p._pool))
1080 origworkerpids = [w.pid for w in p._pool]
1081 # Run many tasks so each worker gets replaced (hopefully)
1082 results = []
1083 for i in range(100):
1084 results.append(p.apply_async(sqr, (i, )))
1085 # Fetch the results and verify we got the right answers,
1086 # also ensuring all the tasks have completed.
1087 for (j, res) in enumerate(results):
1088 self.assertEqual(res.get(), sqr(j))
1089 # Refill the pool
1090 p._repopulate_pool()
1091 # Wait until all workers are alive
1092 countdown = 5
1093 while countdown and not all(w.is_alive() for w in p._pool):
1094 countdown -= 1
1095 time.sleep(DELTA)
1096 finalworkerpids = [w.pid for w in p._pool]
1097 # All pids should be assigned. See issue #7805.
1098 self.assertNotIn(None, origworkerpids)
1099 self.assertNotIn(None, finalworkerpids)
1100 # Finally, check that the worker pids have changed
1101 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1102 p.close()
1103 p.join()
1106 # Test that manager has expected number of shared objects left
1109 class _TestZZZNumberOfObjects(BaseTestCase):
1110 # Because test cases are sorted alphabetically, this one will get
1111 # run after all the other tests for the manager. It tests that
1112 # there have been no "reference leaks" for the manager's shared
1113 # objects. Note the comment in _TestPool.test_terminate().
1114 ALLOWED_TYPES = ('manager',)
1116 def test_number_of_objects(self):
1117 EXPECTED_NUMBER = 1 # the pool object is still alive
1118 multiprocessing.active_children() # discard dead process objs
1119 gc.collect() # do garbage collection
1120 refs = self.manager._number_of_objects()
1121 debug_info = self.manager._debug_info()
1122 if refs != EXPECTED_NUMBER:
1123 print self.manager._debug_info()
1124 print debug_info
1126 self.assertEqual(refs, EXPECTED_NUMBER)
1129 # Test of creating a customized manager class
1132 from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1134 class FooBar(object):
1135 def f(self):
1136 return 'f()'
1137 def g(self):
1138 raise ValueError
1139 def _h(self):
1140 return '_h()'
1142 def baz():
1143 for i in xrange(10):
1144 yield i*i
1146 class IteratorProxy(BaseProxy):
1147 _exposed_ = ('next', '__next__')
1148 def __iter__(self):
1149 return self
1150 def next(self):
1151 return self._callmethod('next')
1152 def __next__(self):
1153 return self._callmethod('__next__')
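# (Editorial note) IteratorProxy exposes both next() and __next__() so the
# proxied generator returned by baz() can be iterated with either the Python 2
# or the Python 3 iterator protocol; only these two methods are forwarded.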
1155 class MyManager(BaseManager):
1156 pass
1158 MyManager.register('Foo', callable=FooBar)
1159 MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1160 MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1163 class _TestMyManager(BaseTestCase):
1165 ALLOWED_TYPES = ('manager',)
1167 def test_mymanager(self):
1168 manager = MyManager()
1169 manager.start()
1171 foo = manager.Foo()
1172 bar = manager.Bar()
1173 baz = manager.baz()
1175 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1176 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1178 self.assertEqual(foo_methods, ['f', 'g'])
1179 self.assertEqual(bar_methods, ['f', '_h'])
1181 self.assertEqual(foo.f(), 'f()')
1182 self.assertRaises(ValueError, foo.g)
1183 self.assertEqual(foo._callmethod('f'), 'f()')
1184 self.assertRaises(RemoteError, foo._callmethod, '_h')
1186 self.assertEqual(bar.f(), 'f()')
1187 self.assertEqual(bar._h(), '_h()')
1188 self.assertEqual(bar._callmethod('f'), 'f()')
1189 self.assertEqual(bar._callmethod('_h'), '_h()')
1191 self.assertEqual(list(baz), [i*i for i in range(10)])
1193 manager.shutdown()
1196 # Test of connecting to a remote server and using xmlrpclib for serialization
1199 _queue = Queue.Queue()
1200 def get_queue():
1201 return _queue
1203 class QueueManager(BaseManager):
1204 '''manager class used by server process'''
1205 QueueManager.register('get_queue', callable=get_queue)
1207 class QueueManager2(BaseManager):
1208 '''manager class which specifies the same interface as QueueManager'''
1209 QueueManager2.register('get_queue')
1212 SERIALIZER = 'xmlrpclib'
1214 class _TestRemoteManager(BaseTestCase):
1216 ALLOWED_TYPES = ('manager',)
1218 def _putter(self, address, authkey):
1219 manager = QueueManager2(
1220 address=address, authkey=authkey, serializer=SERIALIZER
1221 )
1222 manager.connect()
1223 queue = manager.get_queue()
1224 queue.put(('hello world', None, True, 2.25))
1226 def test_remote(self):
1227 authkey = os.urandom(32)
1229 manager = QueueManager(
1230 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1231 )
1232 manager.start()
1234 p = self.Process(target=self._putter, args=(manager.address, authkey))
1235 p.start()
1237 manager2 = QueueManager2(
1238 address=manager.address, authkey=authkey, serializer=SERIALIZER
1239 )
1240 manager2.connect()
1241 queue = manager2.get_queue()
1243 # Note that xmlrpclib will deserialize the object as a list, not a tuple
1244 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1246 # Because we are using xmlrpclib for serialization instead of
1247 # pickle this will cause a serialization error.
1248 self.assertRaises(Exception, queue.put, time.sleep)
1250 # Make queue finalizer run before the server is stopped
1251 del queue
1252 manager.shutdown()
1254 class _TestManagerRestart(BaseTestCase):
1256 def _putter(self, address, authkey):
1257 manager = QueueManager(
1258 address=address, authkey=authkey, serializer=SERIALIZER)
1259 manager.connect()
1260 queue = manager.get_queue()
1261 queue.put('hello world')
1263 def test_rapid_restart(self):
1264 authkey = os.urandom(32)
1265 manager = QueueManager(
1266 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
1267 addr = manager.get_server().address
1268 manager.start()
1270 p = self.Process(target=self._putter, args=(manager.address, authkey))
1271 p.start()
1272 queue = manager.get_queue()
1273 self.assertEqual(queue.get(), 'hello world')
1274 del queue
1275 manager.shutdown()
1276 manager = QueueManager(
1277 address=addr, authkey=authkey, serializer=SERIALIZER)
1278 manager.start()
1279 manager.shutdown()
1285 SENTINEL = latin('')
1287 class _TestConnection(BaseTestCase):
1289 ALLOWED_TYPES = ('processes', 'threads')
1291 def _echo(self, conn):
1292 for msg in iter(conn.recv_bytes, SENTINEL):
1293 conn.send_bytes(msg)
1294 conn.close()
1296 def test_connection(self):
1297 conn, child_conn = self.Pipe()
1299 p = self.Process(target=self._echo, args=(child_conn,))
1300 p.daemon = True
1301 p.start()
1303 seq = [1, 2.25, None]
1304 msg = latin('hello world')
1305 longmsg = msg * 10
1306 arr = array.array('i', range(4))
1308 if self.TYPE == 'processes':
1309 self.assertEqual(type(conn.fileno()), int)
1311 self.assertEqual(conn.send(seq), None)
1312 self.assertEqual(conn.recv(), seq)
1314 self.assertEqual(conn.send_bytes(msg), None)
1315 self.assertEqual(conn.recv_bytes(), msg)
1317 if self.TYPE == 'processes':
1318 buffer = array.array('i', [0]*10)
1319 expected = list(arr) + [0] * (10 - len(arr))
1320 self.assertEqual(conn.send_bytes(arr), None)
1321 self.assertEqual(conn.recv_bytes_into(buffer),
1322 len(arr) * buffer.itemsize)
1323 self.assertEqual(list(buffer), expected)
1325 buffer = array.array('i', [0]*10)
1326 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1327 self.assertEqual(conn.send_bytes(arr), None)
1328 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1329 len(arr) * buffer.itemsize)
1330 self.assertEqual(list(buffer), expected)
1332 buffer = bytearray(latin(' ' * 40))
1333 self.assertEqual(conn.send_bytes(longmsg), None)
1334 try:
1335 res = conn.recv_bytes_into(buffer)
1336 except multiprocessing.BufferTooShort, e:
1337 self.assertEqual(e.args, (longmsg,))
1338 else:
1339 self.fail('expected BufferTooShort, got %s' % res)
1341 poll = TimingWrapper(conn.poll)
1343 self.assertEqual(poll(), False)
1344 self.assertTimingAlmostEqual(poll.elapsed, 0)
1346 self.assertEqual(poll(TIMEOUT1), False)
1347 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1349 conn.send(None)
1351 self.assertEqual(poll(TIMEOUT1), True)
1352 self.assertTimingAlmostEqual(poll.elapsed, 0)
1354 self.assertEqual(conn.recv(), None)
1356 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1357 conn.send_bytes(really_big_msg)
1358 self.assertEqual(conn.recv_bytes(), really_big_msg)
1360 conn.send_bytes(SENTINEL) # tell child to quit
1361 child_conn.close()
1363 if self.TYPE == 'processes':
1364 self.assertEqual(conn.readable, True)
1365 self.assertEqual(conn.writable, True)
1366 self.assertRaises(EOFError, conn.recv)
1367 self.assertRaises(EOFError, conn.recv_bytes)
1369 p.join()
1371 def test_duplex_false(self):
1372 reader, writer = self.Pipe(duplex=False)
1373 self.assertEqual(writer.send(1), None)
1374 self.assertEqual(reader.recv(), 1)
1375 if self.TYPE == 'processes':
1376 self.assertEqual(reader.readable, True)
1377 self.assertEqual(reader.writable, False)
1378 self.assertEqual(writer.readable, False)
1379 self.assertEqual(writer.writable, True)
1380 self.assertRaises(IOError, reader.send, 2)
1381 self.assertRaises(IOError, writer.recv)
1382 self.assertRaises(IOError, writer.poll)
1384 def test_spawn_close(self):
1385 # We test that a pipe connection can be closed by parent
1386 # process immediately after child is spawned. On Windows this
1387 # would have sometimes failed on old versions because
1388 # child_conn would be closed before the child got a chance to
1389 # duplicate it.
1390 conn, child_conn = self.Pipe()
1392 p = self.Process(target=self._echo, args=(child_conn,))
1393 p.start()
1394 child_conn.close() # this might complete before child initializes
1396 msg = latin('hello')
1397 conn.send_bytes(msg)
1398 self.assertEqual(conn.recv_bytes(), msg)
1400 conn.send_bytes(SENTINEL)
1401 conn.close()
1402 p.join()
1404 def test_sendbytes(self):
1405 if self.TYPE != 'processes':
1406 return
1408 msg = latin('abcdefghijklmnopqrstuvwxyz')
1409 a, b = self.Pipe()
1411 a.send_bytes(msg)
1412 self.assertEqual(b.recv_bytes(), msg)
1414 a.send_bytes(msg, 5)
1415 self.assertEqual(b.recv_bytes(), msg[5:])
1417 a.send_bytes(msg, 7, 8)
1418 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1420 a.send_bytes(msg, 26)
1421 self.assertEqual(b.recv_bytes(), latin(''))
1423 a.send_bytes(msg, 26, 0)
1424 self.assertEqual(b.recv_bytes(), latin(''))
1426 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1428 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1430 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1432 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1434 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1436 class _TestListenerClient(BaseTestCase):
1438 ALLOWED_TYPES = ('processes', 'threads')
1440 def _test(self, address):
1441 conn = self.connection.Client(address)
1442 conn.send('hello')
1443 conn.close()
1445 def test_listener_client(self):
1446 for family in self.connection.families:
1447 l = self.connection.Listener(family=family)
1448 p = self.Process(target=self._test, args=(l.address,))
1449 p.daemon = True
1450 p.start()
1451 conn = l.accept()
1452 self.assertEqual(conn.recv(), 'hello')
1453 p.join()
1454 l.close()
1456 # Test of sending connection and socket objects between processes
1459 class _TestPicklingConnections(BaseTestCase):
1461 ALLOWED_TYPES = ('processes',)
1463 def _listener(self, conn, families):
1464 for fam in families:
1465 l = self.connection.Listener(family=fam)
1466 conn.send(l.address)
1467 new_conn = l.accept()
1468 conn.send(new_conn)
1470 if self.TYPE == 'processes':
1471 l = socket.socket()
1472 l.bind(('localhost', 0))
1473 conn.send(l.getsockname())
1474 l.listen(1)
1475 new_conn, addr = l.accept()
1476 conn.send(new_conn)
1478 conn.recv()
1480 def _remote(self, conn):
1481 for (address, msg) in iter(conn.recv, None):
1482 client = self.connection.Client(address)
1483 client.send(msg.upper())
1484 client.close()
1486 if self.TYPE == 'processes':
1487 address, msg = conn.recv()
1488 client = socket.socket()
1489 client.connect(address)
1490 client.sendall(msg.upper())
1491 client.close()
1493 conn.close()
1495 def test_pickling(self):
1496 try:
1497 multiprocessing.allow_connection_pickling()
1498 except ImportError:
1499 return
1501 families = self.connection.families
1503 lconn, lconn0 = self.Pipe()
1504 lp = self.Process(target=self._listener, args=(lconn0, families))
1505 lp.start()
1506 lconn0.close()
1508 rconn, rconn0 = self.Pipe()
1509 rp = self.Process(target=self._remote, args=(rconn0,))
1510 rp.start()
1511 rconn0.close()
1513 for fam in families:
1514 msg = ('This connection uses family %s' % fam).encode('ascii')
1515 address = lconn.recv()
1516 rconn.send((address, msg))
1517 new_conn = lconn.recv()
1518 self.assertEqual(new_conn.recv(), msg.upper())
1520 rconn.send(None)
1522 if self.TYPE == 'processes':
1523 msg = latin('This connection uses a normal socket')
1524 address = lconn.recv()
1525 rconn.send((address, msg))
1526 if hasattr(socket, 'fromfd'):
1527 new_conn = lconn.recv()
1528 self.assertEqual(new_conn.recv(100), msg.upper())
1529 else:
1530 # XXX On Windows with Py2.6 need to backport fromfd()
1531 discard = lconn.recv_bytes()
1533 lconn.send(None)
1535 rconn.close()
1536 lconn.close()
1538 lp.join()
1539 rp.join()
1545 class _TestHeap(BaseTestCase):
1547 ALLOWED_TYPES = ('processes',)
1549 def test_heap(self):
1550 iterations = 5000
1551 maxblocks = 50
1552 blocks = []
1554 # create and destroy lots of blocks of different sizes
1555 for i in xrange(iterations):
1556 size = int(random.lognormvariate(0, 1) * 1000)
1557 b = multiprocessing.heap.BufferWrapper(size)
1558 blocks.append(b)
1559 if len(blocks) > maxblocks:
1560 i = random.randrange(maxblocks)
1561 del blocks[i]
1563 # get the heap object
1564 heap = multiprocessing.heap.BufferWrapper._heap
1566 # verify the state of the heap
1567 all = []
1568 occupied = 0
1569 for L in heap._len_to_seq.values():
1570 for arena, start, stop in L:
1571 all.append((heap._arenas.index(arena), start, stop,
1572 stop-start, 'free'))
1573 for arena, start, stop in heap._allocated_blocks:
1574 all.append((heap._arenas.index(arena), start, stop,
1575 stop-start, 'occupied'))
1576 occupied += (stop-start)
1578 all.sort()
1580 for i in range(len(all)-1):
1581 (arena, start, stop) = all[i][:3]
1582 (narena, nstart, nstop) = all[i+1][:3]
1583 self.assertTrue((arena != narena and nstart == 0) or
1584 (stop == nstart))
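# (Editorial note) The invariant checked above: after sorting the free and
# occupied blocks by (arena, start), consecutive blocks must either be
# adjacent (stop == nstart) or the next block must open a new arena at
# offset 0, i.e. the heap accounts for every byte of every arena it manages.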
1590 class _Foo(Structure):
1591 _fields_ = [
1592 ('x', c_int),
1593 ('y', c_double)
1594 ]
1596 class _TestSharedCTypes(BaseTestCase):
1598 ALLOWED_TYPES = ('processes',)
1600 def _double(self, x, y, foo, arr, string):
1601 x.value *= 2
1602 y.value *= 2
1603 foo.x *= 2
1604 foo.y *= 2
1605 string.value *= 2
1606 for i in range(len(arr)):
1607 arr[i] *= 2
1609 @unittest.skipIf(Value is None, "requires ctypes.Value")
1610 def test_sharedctypes(self, lock=False):
1611 x = Value('i', 7, lock=lock)
1612 y = Value(c_double, 1.0/3.0, lock=lock)
1613 foo = Value(_Foo, 3, 2, lock=lock)
1614 arr = self.Array('d', range(10), lock=lock)
1615 string = self.Array('c', 20, lock=lock)
1616 string.value = 'hello'
1618 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1619 p.start()
1620 p.join()
1622 self.assertEqual(x.value, 14)
1623 self.assertAlmostEqual(y.value, 2.0/3.0)
1624 self.assertEqual(foo.x, 6)
1625 self.assertAlmostEqual(foo.y, 4.0)
1626 for i in range(10):
1627 self.assertAlmostEqual(arr[i], i*2)
1628 self.assertEqual(string.value, latin('hellohello'))
1630 @unittest.skipIf(Value is None, "requires ctypes.Value")
1631 def test_synchronize(self):
1632 self.test_sharedctypes(lock=True)
1634 @unittest.skipIf(ctypes_copy is None, "requires ctypes.copy")
1635 def test_copy(self):
1636 foo = _Foo(2, 5.0)
1637 bar = ctypes_copy(foo)
1638 foo.x = 0
1639 foo.y = 0
1640 self.assertEqual(bar.x, 2)
1641 self.assertAlmostEqual(bar.y, 5.0)
1647 class _TestFinalize(BaseTestCase):
1649 ALLOWED_TYPES = ('processes',)
1651 def _test_finalize(self, conn):
1652 class Foo(object):
1653 pass
1655 a = Foo()
1656 util.Finalize(a, conn.send, args=('a',))
1657 del a # triggers callback for a
1659 b = Foo()
1660 close_b = util.Finalize(b, conn.send, args=('b',))
1661 close_b() # triggers callback for b
1662 close_b() # does nothing because callback has already been called
1663 del b # does nothing because callback has already been called
1665 c = Foo()
1666 util.Finalize(c, conn.send, args=('c',))
1668 d10 = Foo()
1669 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1671 d01 = Foo()
1672 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1673 d02 = Foo()
1674 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1675 d03 = Foo()
1676 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1678 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1680 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1682 # call multiprocessing's cleanup function, then exit the process without
1683 # garbage collecting locals
1684 util._exit_function()
1685 conn.close()
1686 os._exit(0)
1688 def test_finalize(self):
1689 conn, child_conn = self.Pipe()
1691 p = self.Process(target=self._test_finalize, args=(child_conn,))
1692 p.start()
1693 p.join()
1695 result = [obj for obj in iter(conn.recv, 'STOP')]
1696 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1699 # Test that from ... import * works for each module
1702 class _TestImportStar(BaseTestCase):
1704 ALLOWED_TYPES = ('processes',)
1706 def test_import(self):
1707 modules = [
1708 'multiprocessing', 'multiprocessing.connection',
1709 'multiprocessing.heap', 'multiprocessing.managers',
1710 'multiprocessing.pool', 'multiprocessing.process',
1711 'multiprocessing.reduction',
1712 'multiprocessing.synchronize', 'multiprocessing.util'
1713 ]
1715 if c_int is not None:
1716 # This module requires _ctypes
1717 modules.append('multiprocessing.sharedctypes')
1719 for name in modules:
1720 __import__(name)
1721 mod = sys.modules[name]
1723 for attr in getattr(mod, '__all__', ()):
1724 self.assertTrue(
1725 hasattr(mod, attr),
1726 '%r does not have attribute %r' % (mod, attr)
1727 )
1730 # Quick test that logging works -- does not test logging output
1733 class _TestLogging(BaseTestCase):
1735 ALLOWED_TYPES = ('processes',)
1737 def test_enable_logging(self):
1738 logger = multiprocessing.get_logger()
1739 logger.setLevel(util.SUBWARNING)
1740 self.assertTrue(logger is not None)
1741 logger.debug('this will not be printed')
1742 logger.info('nor will this')
1743 logger.setLevel(LOG_LEVEL)
1745 def _test_level(self, conn):
1746 logger = multiprocessing.get_logger()
1747 conn.send(logger.getEffectiveLevel())
1749 def test_level(self):
1750 LEVEL1 = 32
1751 LEVEL2 = 37
1753 logger = multiprocessing.get_logger()
1754 root_logger = logging.getLogger()
1755 root_level = root_logger.level
1757 reader, writer = multiprocessing.Pipe(duplex=False)
1759 logger.setLevel(LEVEL1)
1760 self.Process(target=self._test_level, args=(writer,)).start()
1761 self.assertEqual(LEVEL1, reader.recv())
1763 logger.setLevel(logging.NOTSET)
1764 root_logger.setLevel(LEVEL2)
1765 self.Process(target=self._test_level, args=(writer,)).start()
1766 self.assertEqual(LEVEL2, reader.recv())
1768 root_logger.setLevel(root_level)
1769 logger.setLevel(level=LOG_LEVEL)
1772 # class _TestLoggingProcessName(BaseTestCase):
1774 # def handle(self, record):
1775 # assert record.processName == multiprocessing.current_process().name
1776 # self.__handled = True
1778 # def test_logging(self):
1779 # handler = logging.Handler()
1780 # handler.handle = self.handle
1781 # self.__handled = False
1782 # # Bypass getLogger() and side-effects
1783 # logger = logging.getLoggerClass()(
1784 # 'multiprocessing.test.TestLoggingProcessName')
1785 # logger.addHandler(handler)
1786 # logger.propagate = False
1788 # logger.warn('foo')
1789 # assert self.__handled
1792 # Test to verify handle verification, see issue 3321
1795 class TestInvalidHandle(unittest.TestCase):
1797 @unittest.skipIf(WIN32, "skipped on Windows")
1798 def test_invalid_handles(self):
1799 conn = _multiprocessing.Connection(44977608)
1800 self.assertRaises(IOError, conn.poll)
1801 self.assertRaises(IOError, _multiprocessing.Connection, -1)
1804 # Functions used to create test cases from the base ones in this module
1807 def get_attributes(Source, names):
1808 d = {}
1809 for name in names:
1810 obj = getattr(Source, name)
1811 if type(obj) == type(get_attributes):
1812 obj = staticmethod(obj)
1813 d[name] = obj
1814 return d
1816 def create_test_cases(Mixin, type):
1817 result = {}
1818 glob = globals()
1819 Type = type.capitalize()
1821 for name in glob.keys():
1822 if name.startswith('_Test'):
1823 base = glob[name]
1824 if type in base.ALLOWED_TYPES:
1825 newname = 'With' + Type + name[1:]
1826 class Temp(base, unittest.TestCase, Mixin):
1827 pass
1828 result[newname] = Temp
1829 Temp.__name__ = newname
1830 Temp.__module__ = Mixin.__module__
1831 return result
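# (Editorial) Illustrative sketch of how the factory above is used, assuming
# the mixins defined below:
#
#     cases = create_test_cases(ProcessesMixin, type='processes')
#     # e.g. {'WithProcessesTestQueue': <class WithProcessesTestQueue>, ...}
#
# get_attributes() copies plain functions as staticmethods so that attributes
# such as Pipe behave like the module-level callables they wrap rather than
# becoming bound methods of the mixin.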
1834 # Create test cases
1837 class ProcessesMixin(object):
1838 TYPE = 'processes'
1839 Process = multiprocessing.Process
1840 locals().update(get_attributes(multiprocessing, (
1841 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1842 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1843 'RawArray', 'current_process', 'active_children', 'Pipe',
1844 'connection', 'JoinableQueue'
1845 )))
1847 testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1848 globals().update(testcases_processes)
1851 class ManagerMixin(object):
1852 TYPE = 'manager'
1853 Process = multiprocessing.Process
1854 manager = object.__new__(multiprocessing.managers.SyncManager)
1855 locals().update(get_attributes(manager, (
1856 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1857 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1858 'Namespace', 'JoinableQueue'
1859 )))
1861 testcases_manager = create_test_cases(ManagerMixin, type='manager')
1862 globals().update(testcases_manager)
1865 class ThreadsMixin(object):
1866 TYPE = 'threads'
1867 Process = multiprocessing.dummy.Process
1868 locals().update(get_attributes(multiprocessing.dummy, (
1869 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1870 'Condition', 'Event', 'Value', 'Array', 'current_process',
1871 'active_children', 'Pipe', 'connection', 'dict', 'list',
1872 'Namespace', 'JoinableQueue'
1873 )))
1875 testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1876 globals().update(testcases_threads)
1878 class OtherTest(unittest.TestCase):
1879 # TODO: add more tests for deliver/answer challenge.
1880 def test_deliver_challenge_auth_failure(self):
1881 class _FakeConnection(object):
1882 def recv_bytes(self, size):
1883 return b'something bogus'
1884 def send_bytes(self, data):
1885 pass
1886 self.assertRaises(multiprocessing.AuthenticationError,
1887 multiprocessing.connection.deliver_challenge,
1888 _FakeConnection(), b'abc')
1890 def test_answer_challenge_auth_failure(self):
1891 class _FakeConnection(object):
1892 def __init__(self):
1893 self.count = 0
1894 def recv_bytes(self, size):
1895 self.count += 1
1896 if self.count == 1:
1897 return multiprocessing.connection.CHALLENGE
1898 elif self.count == 2:
1899 return b'something bogus'
1900 return b''
1901 def send_bytes(self, data):
1902 pass
1903 self.assertRaises(multiprocessing.AuthenticationError,
1904 multiprocessing.connection.answer_challenge,
1905 _FakeConnection(), b'abc')
1908 # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1911 def initializer(ns):
1912 ns.test += 1
1914 class TestInitializers(unittest.TestCase):
1915 def setUp(self):
1916 self.mgr = multiprocessing.Manager()
1917 self.ns = self.mgr.Namespace()
1918 self.ns.test = 0
1920 def tearDown(self):
1921 self.mgr.shutdown()
1923 def test_manager_initializer(self):
1924 m = multiprocessing.managers.SyncManager()
1925 self.assertRaises(TypeError, m.start, 1)
1926 m.start(initializer, (self.ns,))
1927 self.assertEqual(self.ns.test, 1)
1928 m.shutdown()
1930 def test_pool_initializer(self):
1931 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1932 p = multiprocessing.Pool(1, initializer, (self.ns,))
1933 p.close()
1934 p.join()
1935 self.assertEqual(self.ns.test, 1)
1938 # Issue 5155, 5313, 5331: Test process in processes
1939 # Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1942 def _ThisSubProcess(q):
1943 try:
1944 item = q.get(block=False)
1945 except Queue.Empty:
1946 pass
1948 def _TestProcess(q):
1949 queue = multiprocessing.Queue()
1950 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1951 subProc.start()
1952 subProc.join()
1954 def _afunc(x):
1955 return x*x
1957 def pool_in_process():
1958 pool = multiprocessing.Pool(processes=4)
1959 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1961 class _file_like(object):
1962 def __init__(self, delegate):
1963 self._delegate = delegate
1964 self._pid = None
1966 @property
1967 def cache(self):
1968 pid = os.getpid()
1969 # There are no race conditions since fork keeps only the running thread
1970 if pid != self._pid:
1971 self._pid = pid
1972 self._cache = []
1973 return self._cache
1975 def write(self, data):
1976 self.cache.append(data)
1978 def flush(self):
1979 self._delegate.write(''.join(self.cache))
1980 self._cache = []
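# (Editorial note) _file_like keys its write cache on os.getpid(): a forked
# child sees a different pid, so it starts with an empty cache rather than
# re-flushing data buffered in the parent; flush() writes the joined cache to
# the delegate and clears it.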
1982 class TestStdinBadfiledescriptor(unittest.TestCase):
1984 def test_queue_in_process(self):
1985 queue = multiprocessing.Queue()
1986 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
1987 proc.start()
1988 proc.join()
1990 def test_pool_in_process(self):
1991 p = multiprocessing.Process(target=pool_in_process)
1992 p.start()
1993 p.join()
1995 def test_flushing(self):
1996 sio = StringIO()
1997 flike = _file_like(sio)
1998 flike.write('foo')
1999 proc = multiprocessing.Process(target=lambda: flike.flush())
2000 flike.flush()
2001 assert sio.getvalue() == 'foo'
2003 testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2004 TestStdinBadfiledescriptor]
2010 def test_main(run=None):
2011 if sys.platform.startswith("linux"):
2012 try:
2013 lock = multiprocessing.RLock()
2014 except OSError:
2015 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
2017 if run is None:
2018 from test.test_support import run_unittest as run
2020 util.get_temp_dir() # creates temp directory for use by all processes
2022 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2024 ProcessesMixin.pool = multiprocessing.Pool(4)
2025 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2026 ManagerMixin.manager.__init__()
2027 ManagerMixin.manager.start()
2028 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
2030 testcases = (
2031 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2032 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
2033 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2034 testcases_other
2035 )
2037 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2038 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2039 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2040 # module during these tests is at least platform dependent and possibly
2041 # non-deterministic on any given platform. So we don't mind if the listed
2042 # warnings aren't actually raised.
2043 with test_support.check_py3k_warnings(
2044 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2045 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2046 quiet=True):
2047 run(suite)
2049 ThreadsMixin.pool.terminate()
2050 ProcessesMixin.pool.terminate()
2051 ManagerMixin.pool.terminate()
2052 ManagerMixin.manager.shutdown()
2054 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
2056 def main():
2057 test_main(unittest.TextTestRunner(verbosity=2).run)
2059 if __name__ == '__main__':
2060 main()