4 # Unit tests for the multiprocessing package
18 from test
import test_support
19 from StringIO
import StringIO
20 _multiprocessing
= test_support
.import_module('_multiprocessing')
21 # import threading after _multiprocessing to raise a more relevant error
22 # message: "No module named _multiprocessing". _multiprocessing is not compiled
23 # without thread support.
26 # Work around broken sem_open implementations
27 test_support
.import_module('multiprocessing.synchronize')
29 import multiprocessing
.dummy
30 import multiprocessing
.connection
31 import multiprocessing
.managers
32 import multiprocessing
.heap
33 import multiprocessing
.pool
35 from multiprocessing
import util
47 LOG_LEVEL
= util
.SUBWARNING
48 #LOG_LEVEL = logging.DEBUG
51 CHECK_TIMINGS
= False # making true makes tests take a lot longer
52 # and can sometimes cause some non-serious
53 # failures because some calls block a bit
54 # longer than expected
56 TIMEOUT1
, TIMEOUT2
, TIMEOUT3
= 0.82, 0.35, 1.4
58 TIMEOUT1
, TIMEOUT2
, TIMEOUT3
= 0.1, 0.1, 0.1
60 HAVE_GETVALUE
= not getattr(_multiprocessing
,
61 'HAVE_BROKEN_SEM_GETVALUE', False)
63 WIN32
= (sys
.platform
== "win32")
66 # Some tests require ctypes
70 from ctypes
import Structure
, c_int
, c_double
73 c_int
= c_double
= None
76 from ctypes
import Value
81 from ctypes
import copy
as ctypes_copy
86 # Creates a wrapper for a function which records the time it takes to finish
89 class TimingWrapper(object):
91 def __init__(self
, func
):
95 def __call__(self
, *args
, **kwds
):
98 return self
.func(*args
, **kwds
)
100 self
.elapsed
= time
.time() - t
103 # Base class for test cases
106 class BaseTestCase(object):
108 ALLOWED_TYPES
= ('processes', 'manager', 'threads')
110 def assertTimingAlmostEqual(self
, a
, b
):
112 self
.assertAlmostEqual(a
, b
, 1)
114 def assertReturnsIfImplemented(self
, value
, func
, *args
):
117 except NotImplementedError:
120 return self
.assertEqual(value
, res
)
123 # Return the value of a semaphore
128 return self
.get_value()
129 except AttributeError:
131 return self
._Semaphore
__value
132 except AttributeError:
135 except AttributeError:
136 raise NotImplementedError
142 class _TestProcess(BaseTestCase
):
144 ALLOWED_TYPES
= ('processes', 'threads')
146 def test_current(self
):
147 if self
.TYPE
== 'threads':
150 current
= self
.current_process()
151 authkey
= current
.authkey
153 self
.assertTrue(current
.is_alive())
154 self
.assertTrue(not current
.daemon
)
155 self
.assertIsInstance(authkey
, bytes
)
156 self
.assertTrue(len(authkey
) > 0)
157 self
.assertEqual(current
.ident
, os
.getpid())
158 self
.assertEqual(current
.exitcode
, None)
160 def _test(self
, q
, *args
, **kwds
):
161 current
= self
.current_process()
165 if self
.TYPE
!= 'threads':
166 q
.put(bytes(current
.authkey
))
169 def test_process(self
):
173 kwargs
= {'hello':23, 'bye':2.54}
176 target
=self
._test
, args
=args
, kwargs
=kwargs
, name
=name
179 current
= self
.current_process()
181 if self
.TYPE
!= 'threads':
182 self
.assertEquals(p
.authkey
, current
.authkey
)
183 self
.assertEquals(p
.is_alive(), False)
184 self
.assertEquals(p
.daemon
, True)
185 self
.assertNotIn(p
, self
.active_children())
186 self
.assertTrue(type(self
.active_children()) is list)
187 self
.assertEqual(p
.exitcode
, None)
191 self
.assertEquals(p
.exitcode
, None)
192 self
.assertEquals(p
.is_alive(), True)
193 self
.assertIn(p
, self
.active_children())
195 self
.assertEquals(q
.get(), args
[1:])
196 self
.assertEquals(q
.get(), kwargs
)
197 self
.assertEquals(q
.get(), p
.name
)
198 if self
.TYPE
!= 'threads':
199 self
.assertEquals(q
.get(), current
.authkey
)
200 self
.assertEquals(q
.get(), p
.pid
)
204 self
.assertEquals(p
.exitcode
, 0)
205 self
.assertEquals(p
.is_alive(), False)
206 self
.assertNotIn(p
, self
.active_children())
208 def _test_terminate(self
):
211 def test_terminate(self
):
212 if self
.TYPE
== 'threads':
215 p
= self
.Process(target
=self
._test
_terminate
)
219 self
.assertEqual(p
.is_alive(), True)
220 self
.assertIn(p
, self
.active_children())
221 self
.assertEqual(p
.exitcode
, None)
225 join
= TimingWrapper(p
.join
)
226 self
.assertEqual(join(), None)
227 self
.assertTimingAlmostEqual(join
.elapsed
, 0.0)
229 self
.assertEqual(p
.is_alive(), False)
230 self
.assertNotIn(p
, self
.active_children())
234 # XXX sometimes get p.exitcode == 0 on Windows ...
235 #self.assertEqual(p.exitcode, -signal.SIGTERM)
237 def test_cpu_count(self
):
239 cpus
= multiprocessing
.cpu_count()
240 except NotImplementedError:
242 self
.assertTrue(type(cpus
) is int)
243 self
.assertTrue(cpus
>= 1)
245 def test_active_children(self
):
246 self
.assertEqual(type(self
.active_children()), list)
248 p
= self
.Process(target
=time
.sleep
, args
=(DELTA
,))
249 self
.assertNotIn(p
, self
.active_children())
252 self
.assertIn(p
, self
.active_children())
255 self
.assertNotIn(p
, self
.active_children())
257 def _test_recursion(self
, wconn
, id):
258 from multiprocessing
import forking
263 target
=self
._test
_recursion
, args
=(wconn
, id+[i
])
268 def test_recursion(self
):
269 rconn
, wconn
= self
.Pipe(duplex
=False)
270 self
._test
_recursion
(wconn
, [])
275 result
.append(rconn
.recv())
286 self
.assertEqual(result
, expected
)
292 class _UpperCaser(multiprocessing
.Process
):
295 multiprocessing
.Process
.__init
__(self
)
296 self
.child_conn
, self
.parent_conn
= multiprocessing
.Pipe()
299 self
.parent_conn
.close()
300 for s
in iter(self
.child_conn
.recv
, None):
301 self
.child_conn
.send(s
.upper())
302 self
.child_conn
.close()
305 assert type(s
) is str
306 self
.parent_conn
.send(s
)
307 return self
.parent_conn
.recv()
310 self
.parent_conn
.send(None)
311 self
.parent_conn
.close()
312 self
.child_conn
.close()
314 class _TestSubclassingProcess(BaseTestCase
):
316 ALLOWED_TYPES
= ('processes',)
318 def test_subclassing(self
):
319 uppercaser
= _UpperCaser()
321 self
.assertEqual(uppercaser
.submit('hello'), 'HELLO')
322 self
.assertEqual(uppercaser
.submit('world'), 'WORLD')
331 if hasattr(q
, 'empty'):
334 return q
.qsize() == 0
336 def queue_full(q
, maxsize
):
337 if hasattr(q
, 'full'):
340 return q
.qsize() == maxsize
343 class _TestQueue(BaseTestCase
):
346 def _test_put(self
, queue
, child_can_start
, parent_can_continue
):
347 child_can_start
.wait()
350 parent_can_continue
.set()
354 queue
= self
.Queue(maxsize
=MAXSIZE
)
355 child_can_start
= self
.Event()
356 parent_can_continue
= self
.Event()
359 target
=self
._test
_put
,
360 args
=(queue
, child_can_start
, parent_can_continue
)
365 self
.assertEqual(queue_empty(queue
), True)
366 self
.assertEqual(queue_full(queue
, MAXSIZE
), False)
370 queue
.put(3, True, None)
372 queue
.put(5, False, None)
375 # the values may be in buffer but not yet in pipe so sleep a bit
378 self
.assertEqual(queue_empty(queue
), False)
379 self
.assertEqual(queue_full(queue
, MAXSIZE
), True)
381 put
= TimingWrapper(queue
.put
)
382 put_nowait
= TimingWrapper(queue
.put_nowait
)
384 self
.assertRaises(Queue
.Full
, put
, 7, False)
385 self
.assertTimingAlmostEqual(put
.elapsed
, 0)
387 self
.assertRaises(Queue
.Full
, put
, 7, False, None)
388 self
.assertTimingAlmostEqual(put
.elapsed
, 0)
390 self
.assertRaises(Queue
.Full
, put_nowait
, 7)
391 self
.assertTimingAlmostEqual(put_nowait
.elapsed
, 0)
393 self
.assertRaises(Queue
.Full
, put
, 7, True, TIMEOUT1
)
394 self
.assertTimingAlmostEqual(put
.elapsed
, TIMEOUT1
)
396 self
.assertRaises(Queue
.Full
, put
, 7, False, TIMEOUT2
)
397 self
.assertTimingAlmostEqual(put
.elapsed
, 0)
399 self
.assertRaises(Queue
.Full
, put
, 7, True, timeout
=TIMEOUT3
)
400 self
.assertTimingAlmostEqual(put
.elapsed
, TIMEOUT3
)
402 child_can_start
.set()
403 parent_can_continue
.wait()
405 self
.assertEqual(queue_empty(queue
), True)
406 self
.assertEqual(queue_full(queue
, MAXSIZE
), False)
410 def _test_get(self
, queue
, child_can_start
, parent_can_continue
):
411 child_can_start
.wait()
417 parent_can_continue
.set()
421 child_can_start
= self
.Event()
422 parent_can_continue
= self
.Event()
425 target
=self
._test
_get
,
426 args
=(queue
, child_can_start
, parent_can_continue
)
431 self
.assertEqual(queue_empty(queue
), True)
433 child_can_start
.set()
434 parent_can_continue
.wait()
437 self
.assertEqual(queue_empty(queue
), False)
439 # Hangs unexpectedly, remove for now
440 #self.assertEqual(queue.get(), 1)
441 self
.assertEqual(queue
.get(True, None), 2)
442 self
.assertEqual(queue
.get(True), 3)
443 self
.assertEqual(queue
.get(timeout
=1), 4)
444 self
.assertEqual(queue
.get_nowait(), 5)
446 self
.assertEqual(queue_empty(queue
), True)
448 get
= TimingWrapper(queue
.get
)
449 get_nowait
= TimingWrapper(queue
.get_nowait
)
451 self
.assertRaises(Queue
.Empty
, get
, False)
452 self
.assertTimingAlmostEqual(get
.elapsed
, 0)
454 self
.assertRaises(Queue
.Empty
, get
, False, None)
455 self
.assertTimingAlmostEqual(get
.elapsed
, 0)
457 self
.assertRaises(Queue
.Empty
, get_nowait
)
458 self
.assertTimingAlmostEqual(get_nowait
.elapsed
, 0)
460 self
.assertRaises(Queue
.Empty
, get
, True, TIMEOUT1
)
461 self
.assertTimingAlmostEqual(get
.elapsed
, TIMEOUT1
)
463 self
.assertRaises(Queue
.Empty
, get
, False, TIMEOUT2
)
464 self
.assertTimingAlmostEqual(get
.elapsed
, 0)
466 self
.assertRaises(Queue
.Empty
, get
, timeout
=TIMEOUT3
)
467 self
.assertTimingAlmostEqual(get
.elapsed
, TIMEOUT3
)
471 def _test_fork(self
, queue
):
472 for i
in range(10, 20):
474 # note that at this point the items may only be buffered, so the
475 # process cannot shutdown until the feeder thread has finished
476 # pushing items onto the pipe.
479 # Old versions of Queue would fail to create a new feeder
480 # thread for a forked process if the original process had its
481 # own feeder thread. This test checks that this no longer
486 # put items on queue so that main process starts a feeder thread
490 # wait to make sure thread starts before we fork a new process
494 p
= self
.Process(target
=self
._test
_fork
, args
=(queue
,))
497 # check that all expected items are in the queue
499 self
.assertEqual(queue
.get(), i
)
500 self
.assertRaises(Queue
.Empty
, queue
.get
, False)
504 def test_qsize(self
):
507 self
.assertEqual(q
.qsize(), 0)
508 except NotImplementedError:
511 self
.assertEqual(q
.qsize(), 1)
513 self
.assertEqual(q
.qsize(), 2)
515 self
.assertEqual(q
.qsize(), 1)
517 self
.assertEqual(q
.qsize(), 0)
519 def _test_task_done(self
, q
):
520 for obj
in iter(q
.get
, None):
524 def test_task_done(self
):
525 queue
= self
.JoinableQueue()
527 if sys
.version_info
< (2, 5) and not hasattr(queue
, 'task_done'):
528 self
.skipTest("requires 'queue.task_done()' method")
530 workers
= [self
.Process(target
=self
._test
_task
_done
, args
=(queue
,))
551 class _TestLock(BaseTestCase
):
555 self
.assertEqual(lock
.acquire(), True)
556 self
.assertEqual(lock
.acquire(False), False)
557 self
.assertEqual(lock
.release(), None)
558 self
.assertRaises((ValueError, threading
.ThreadError
), lock
.release
)
560 def test_rlock(self
):
562 self
.assertEqual(lock
.acquire(), True)
563 self
.assertEqual(lock
.acquire(), True)
564 self
.assertEqual(lock
.acquire(), True)
565 self
.assertEqual(lock
.release(), None)
566 self
.assertEqual(lock
.release(), None)
567 self
.assertEqual(lock
.release(), None)
568 self
.assertRaises((AssertionError, RuntimeError), lock
.release
)
570 def test_lock_context(self
):
575 class _TestSemaphore(BaseTestCase
):
def _test_semaphore(self, sem):
    # Shared checks for a semaphore whose counter starts at 2: take both
    # units, confirm that acquire(False) then fails, and give both units
    # back.  The counter is compared after every step through
    # assertReturnsIfImplemented(), using get_value() to read it.
    self.assertReturnsIfImplemented(2, get_value, sem)
    for remaining in (1, 0):
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(remaining, get_value, sem)
    # Nothing left to take: acquire(False) must return False and the
    # counter must stay at 0.
    self.assertEqual(sem.acquire(False), False)
    self.assertReturnsIfImplemented(0, get_value, sem)
    for available in (1, 2):
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(available, get_value, sem)
def test_semaphore(self):
    # Run the shared acquire/release checks on a Semaphore(2), then check
    # that two further release() calls are accepted and raise the counter
    # to 3 and then 4.
    semaphore = self.Semaphore(2)
    self._test_semaphore(semaphore)
    for expected in (3, 4):
        self.assertEqual(semaphore.release(), None)
        self.assertReturnsIfImplemented(expected, get_value, semaphore)
598 def test_bounded_semaphore(self
):
599 sem
= self
.BoundedSemaphore(2)
600 self
._test
_semaphore
(sem
)
601 # Currently fails on OS/X
603 # self.assertRaises(ValueError, sem.release)
604 # self.assertReturnsIfImplemented(2, get_value, sem)
606 def test_timeout(self
):
607 if self
.TYPE
!= 'processes':
610 sem
= self
.Semaphore(0)
611 acquire
= TimingWrapper(sem
.acquire
)
613 self
.assertEqual(acquire(False), False)
614 self
.assertTimingAlmostEqual(acquire
.elapsed
, 0.0)
616 self
.assertEqual(acquire(False, None), False)
617 self
.assertTimingAlmostEqual(acquire
.elapsed
, 0.0)
619 self
.assertEqual(acquire(False, TIMEOUT1
), False)
620 self
.assertTimingAlmostEqual(acquire
.elapsed
, 0)
622 self
.assertEqual(acquire(True, TIMEOUT2
), False)
623 self
.assertTimingAlmostEqual(acquire
.elapsed
, TIMEOUT2
)
625 self
.assertEqual(acquire(timeout
=TIMEOUT3
), False)
626 self
.assertTimingAlmostEqual(acquire
.elapsed
, TIMEOUT3
)
629 class _TestCondition(BaseTestCase
):
631 def f(self
, cond
, sleeping
, woken
, timeout
=None):
638 def check_invariant(self
, cond
):
639 # this is only supposed to succeed when there are no sleepers
640 if self
.TYPE
== 'processes':
642 sleepers
= (cond
._sleeping
_count
.get_value() -
643 cond
._woken
_count
.get_value())
644 self
.assertEqual(sleepers
, 0)
645 self
.assertEqual(cond
._wait
_semaphore
.get_value(), 0)
646 except NotImplementedError:
649 def test_notify(self
):
650 cond
= self
.Condition()
651 sleeping
= self
.Semaphore(0)
652 woken
= self
.Semaphore(0)
654 p
= self
.Process(target
=self
.f
, args
=(cond
, sleeping
, woken
))
658 p
= threading
.Thread(target
=self
.f
, args
=(cond
, sleeping
, woken
))
662 # wait for both children to start sleeping
666 # check no process/thread has woken up
668 self
.assertReturnsIfImplemented(0, get_value
, woken
)
670 # wake up one process/thread
675 # check one process/thread has woken up
677 self
.assertReturnsIfImplemented(1, get_value
, woken
)
684 # check other has woken up
686 self
.assertReturnsIfImplemented(2, get_value
, woken
)
688 # check state is not mucked up
689 self
.check_invariant(cond
)
692 def test_notify_all(self
):
693 cond
= self
.Condition()
694 sleeping
= self
.Semaphore(0)
695 woken
= self
.Semaphore(0)
697 # start some threads/processes which will timeout
699 p
= self
.Process(target
=self
.f
,
700 args
=(cond
, sleeping
, woken
, TIMEOUT1
))
704 t
= threading
.Thread(target
=self
.f
,
705 args
=(cond
, sleeping
, woken
, TIMEOUT1
))
709 # wait for them all to sleep
713 # check they have all timed out
716 self
.assertReturnsIfImplemented(0, get_value
, woken
)
718 # check state is not mucked up
719 self
.check_invariant(cond
)
721 # start some more threads/processes
723 p
= self
.Process(target
=self
.f
, args
=(cond
, sleeping
, woken
))
727 t
= threading
.Thread(target
=self
.f
, args
=(cond
, sleeping
, woken
))
731 # wait for them to all sleep
735 # check no process/thread has woken up
737 self
.assertReturnsIfImplemented(0, get_value
, woken
)
744 # check they have all woken
746 self
.assertReturnsIfImplemented(6, get_value
, woken
)
748 # check state is not mucked up
749 self
.check_invariant(cond
)
751 def test_timeout(self
):
752 cond
= self
.Condition()
753 wait
= TimingWrapper(cond
.wait
)
757 self
.assertEqual(res
, None)
758 self
.assertTimingAlmostEqual(wait
.elapsed
, TIMEOUT1
)
761 class _TestEvent(BaseTestCase
):
763 def _test_event(self
, event
):
767 def test_event(self
):
769 wait
= TimingWrapper(event
.wait
)
771 # Removed temporarily, due to API shear, this does not
772 # work with threading._Event objects. is_set == isSet
773 self
.assertEqual(event
.is_set(), False)
775 # Removed, threading.Event.wait() will return the value of the __flag
776 # instead of None. API Shear with the semaphore backed mp.Event
777 self
.assertEqual(wait(0.0), False)
778 self
.assertTimingAlmostEqual(wait
.elapsed
, 0.0)
779 self
.assertEqual(wait(TIMEOUT1
), False)
780 self
.assertTimingAlmostEqual(wait
.elapsed
, TIMEOUT1
)
784 # See note above on the API differences
785 self
.assertEqual(event
.is_set(), True)
786 self
.assertEqual(wait(), True)
787 self
.assertTimingAlmostEqual(wait
.elapsed
, 0.0)
788 self
.assertEqual(wait(TIMEOUT1
), True)
789 self
.assertTimingAlmostEqual(wait
.elapsed
, 0.0)
790 # self.assertEqual(event.is_set(), True)
794 #self.assertEqual(event.is_set(), False)
796 self
.Process(target
=self
._test
_event
, args
=(event
,)).start()
797 self
.assertEqual(wait(), True)
803 class _TestValue(BaseTestCase
):
805 ALLOWED_TYPES
= ('processes',)
811 ('c', latin('x'), latin('y'))
814 def _test(self
, values
):
815 for sv
, cv
in zip(values
, self
.codes_values
):
819 @unittest.skipIf(c_int
is None, "requires _ctypes")
820 def test_value(self
, raw
=False):
822 values
= [self
.RawValue(code
, value
)
823 for code
, value
, _
in self
.codes_values
]
825 values
= [self
.Value(code
, value
)
826 for code
, value
, _
in self
.codes_values
]
828 for sv
, cv
in zip(values
, self
.codes_values
):
829 self
.assertEqual(sv
.value
, cv
[1])
831 proc
= self
.Process(target
=self
._test
, args
=(values
,))
835 for sv
, cv
in zip(values
, self
.codes_values
):
836 self
.assertEqual(sv
.value
, cv
[2])
@unittest.skipIf(c_int is None, "requires _ctypes")
def test_rawvalue(self):
    # Re-run test_value() with raw=True.  Skipped when c_int is None
    # (reported to the runner as "requires _ctypes").
    self.test_value(raw=True)
842 @unittest.skipIf(c_int
is None, "requires _ctypes")
843 def test_getobj_getlock(self
):
844 val1
= self
.Value('i', 5)
845 lock1
= val1
.get_lock()
846 obj1
= val1
.get_obj()
848 val2
= self
.Value('i', 5, lock
=None)
849 lock2
= val2
.get_lock()
850 obj2
= val2
.get_obj()
853 val3
= self
.Value('i', 5, lock
=lock
)
854 lock3
= val3
.get_lock()
855 obj3
= val3
.get_obj()
856 self
.assertEqual(lock
, lock3
)
858 arr4
= self
.Value('i', 5, lock
=False)
859 self
.assertFalse(hasattr(arr4
, 'get_lock'))
860 self
.assertFalse(hasattr(arr4
, 'get_obj'))
862 self
.assertRaises(AttributeError, self
.Value
, 'i', 5, lock
='navalue')
864 arr5
= self
.RawValue('i', 5)
865 self
.assertFalse(hasattr(arr5
, 'get_lock'))
866 self
.assertFalse(hasattr(arr5
, 'get_obj'))
869 class _TestArray(BaseTestCase
):
871 ALLOWED_TYPES
= ('processes',)
874 for i
in range(1, len(seq
)):
877 @unittest.skipIf(c_int
is None, "requires _ctypes")
878 def test_array(self
, raw
=False):
879 seq
= [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
881 arr
= self
.RawArray('i', seq
)
883 arr
= self
.Array('i', seq
)
885 self
.assertEqual(len(arr
), len(seq
))
886 self
.assertEqual(arr
[3], seq
[3])
887 self
.assertEqual(list(arr
[2:7]), list(seq
[2:7]))
889 arr
[4:8] = seq
[4:8] = array
.array('i', [1, 2, 3, 4])
891 self
.assertEqual(list(arr
[:]), seq
)
895 p
= self
.Process(target
=self
.f
, args
=(arr
,))
899 self
.assertEqual(list(arr
[:]), seq
)
@unittest.skipIf(c_int is None, "requires _ctypes")
def test_rawarray(self):
    # Re-run test_array() with raw=True.  Skipped when c_int is None
    # (reported to the runner as "requires _ctypes").
    self.test_array(raw=True)
905 @unittest.skipIf(c_int
is None, "requires _ctypes")
906 def test_getobj_getlock_obj(self
):
907 arr1
= self
.Array('i', range(10))
908 lock1
= arr1
.get_lock()
909 obj1
= arr1
.get_obj()
911 arr2
= self
.Array('i', range(10), lock
=None)
912 lock2
= arr2
.get_lock()
913 obj2
= arr2
.get_obj()
916 arr3
= self
.Array('i', range(10), lock
=lock
)
917 lock3
= arr3
.get_lock()
918 obj3
= arr3
.get_obj()
919 self
.assertEqual(lock
, lock3
)
921 arr4
= self
.Array('i', range(10), lock
=False)
922 self
.assertFalse(hasattr(arr4
, 'get_lock'))
923 self
.assertFalse(hasattr(arr4
, 'get_obj'))
924 self
.assertRaises(AttributeError,
925 self
.Array
, 'i', range(10), lock
='notalock')
927 arr5
= self
.RawArray('i', range(10))
928 self
.assertFalse(hasattr(arr5
, 'get_lock'))
929 self
.assertFalse(hasattr(arr5
, 'get_obj'))
935 class _TestContainers(BaseTestCase
):
937 ALLOWED_TYPES
= ('manager',)
940 a
= self
.list(range(10))
941 self
.assertEqual(a
[:], range(10))
944 self
.assertEqual(b
[:], [])
947 self
.assertEqual(b
[:], range(5))
949 self
.assertEqual(b
[2], 2)
950 self
.assertEqual(b
[2:10], [2,3,4])
953 self
.assertEqual(b
[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
955 self
.assertEqual(b
+ [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
957 self
.assertEqual(a
[:], range(10))
963 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
968 self
.assertEqual(f
[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
972 indices
= range(65, 70)
975 self
.assertEqual(d
.copy(), dict((i
, chr(i
)) for i
in indices
))
976 self
.assertEqual(sorted(d
.keys()), indices
)
977 self
.assertEqual(sorted(d
.values()), [chr(i
) for i
in indices
])
978 self
.assertEqual(sorted(d
.items()), [(i
, chr(i
)) for i
in indices
])
980 def test_namespace(self
):
985 self
.assertEqual((n
.name
, n
.job
), ('Bob', 'Builder'))
987 self
.assertEqual(str(n
), "Namespace(name='Bob')")
988 self
.assertTrue(hasattr(n
, 'name'))
989 self
.assertTrue(not hasattr(n
, 'job'))
995 def sqr(x
, wait
=0.0):
998 class _TestPool(BaseTestCase
):
def test_apply(self):
    # pool.apply() must hand back exactly what a direct call to sqr()
    # returns, for both ways of passing the argument.
    apply_in_pool = self.pool.apply

    # Argument supplied positionally.
    positional = apply_in_pool(sqr, (5,))
    self.assertEqual(positional, sqr(5))

    # Argument supplied by keyword, with an empty positional tuple.
    by_keyword = apply_in_pool(sqr, (), {'x': 3})
    self.assertEqual(by_keyword, sqr(x=3))
1006 pmap
= self
.pool
.map
1007 self
.assertEqual(pmap(sqr
, range(10)), map(sqr
, range(10)))
1008 self
.assertEqual(pmap(sqr
, range(100), chunksize
=20),
1009 map(sqr
, range(100)))
1011 def test_map_chunksize(self
):
1013 self
.pool
.map_async(sqr
, [], chunksize
=1).get(timeout
=TIMEOUT1
)
1014 except multiprocessing
.TimeoutError
:
1015 self
.fail("pool.map_async with chunksize stalled on null list")
def test_async(self):
    # Submit sqr(7, TIMEOUT1) asynchronously; TIMEOUT1 is passed as sqr's
    # second positional argument.  The result must be 49, and the time
    # recorded by TimingWrapper around get() is compared with TIMEOUT1.
    pending = self.pool.apply_async(sqr, (7, TIMEOUT1))
    timed_get = TimingWrapper(pending.get)
    outcome = timed_get()
    self.assertEqual(outcome, 49)
    self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)
def test_async_timeout(self):
    # Submit sqr(6, TIMEOUT2 + 0.2) and ask for the result with
    # timeout=TIMEOUT2.  get() must raise multiprocessing.TimeoutError,
    # and the time it took is compared with TIMEOUT2.
    second_argument = TIMEOUT2 + 0.2
    pending = self.pool.apply_async(sqr, (6, second_argument))
    timed_get = TimingWrapper(pending.get)
    self.assertRaises(multiprocessing.TimeoutError,
                      timed_get, timeout=TIMEOUT2)
    self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)
1029 def test_imap(self
):
1030 it
= self
.pool
.imap(sqr
, range(10))
1031 self
.assertEqual(list(it
), map(sqr
, range(10)))
1033 it
= self
.pool
.imap(sqr
, range(10))
1035 self
.assertEqual(it
.next(), i
*i
)
1036 self
.assertRaises(StopIteration, it
.next
)
1038 it
= self
.pool
.imap(sqr
, range(1000), chunksize
=100)
1039 for i
in range(1000):
1040 self
.assertEqual(it
.next(), i
*i
)
1041 self
.assertRaises(StopIteration, it
.next
)
def test_imap_unordered(self):
    # imap_unordered() makes no promise about result order, so the output
    # is sorted before being compared with map(sqr, range(1000)).  The
    # check runs once without extra options and once with chunksize=53.
    for options in ({}, {'chunksize': 53}):
        unordered = self.pool.imap_unordered(sqr, range(1000), **options)
        self.assertEqual(sorted(unordered), map(sqr, range(1000)))
1050 def test_make_pool(self
):
1051 p
= multiprocessing
.Pool(3)
1052 self
.assertEqual(3, len(p
._pool
))
1056 def test_terminate(self
):
1057 if self
.TYPE
== 'manager':
1058 # On Unix a forked process increfs each shared object to
1059 # which its parent process held a reference. If the
1060 # forked process gets terminated then there is likely to
1061 # be a reference leak. So to prevent
1062 # _TestZZZNumberOfObjects from failing we skip this test
1063 # when using a manager.
1066 result
= self
.pool
.map_async(
1067 time
.sleep
, [0.1 for i
in range(10000)], chunksize
=1
1069 self
.pool
.terminate()
1070 join
= TimingWrapper(self
.pool
.join
)
1072 self
.assertTrue(join
.elapsed
< 0.2)
1074 class _TestPoolWorkerLifetime(BaseTestCase
):
1076 ALLOWED_TYPES
= ('processes', )
1077 def test_pool_worker_lifetime(self
):
1078 p
= multiprocessing
.Pool(3, maxtasksperchild
=10)
1079 self
.assertEqual(3, len(p
._pool
))
1080 origworkerpids
= [w
.pid
for w
in p
._pool
]
1081 # Run many tasks so each worker gets replaced (hopefully)
1083 for i
in range(100):
1084 results
.append(p
.apply_async(sqr
, (i
, )))
1085 # Fetch the results and verify we got the right answers,
1086 # also ensuring all the tasks have completed.
1087 for (j
, res
) in enumerate(results
):
1088 self
.assertEqual(res
.get(), sqr(j
))
1090 p
._repopulate
_pool
()
1091 # Wait until all workers are alive
1093 while countdown
and not all(w
.is_alive() for w
in p
._pool
):
1096 finalworkerpids
= [w
.pid
for w
in p
._pool
]
1097 # All pids should be assigned. See issue #7805.
1098 self
.assertNotIn(None, origworkerpids
)
1099 self
.assertNotIn(None, finalworkerpids
)
1100 # Finally, check that the worker pids have changed
1101 self
.assertNotEqual(sorted(origworkerpids
), sorted(finalworkerpids
))
1106 # Test that manager has expected number of shared objects left
1109 class _TestZZZNumberOfObjects(BaseTestCase
):
1110 # Because test cases are sorted alphabetically, this one will get
1111 # run after all the other tests for the manager. It tests that
1112 # there have been no "reference leaks" for the manager's shared
1113 # objects. Note the comment in _TestPool.test_terminate().
1114 ALLOWED_TYPES
= ('manager',)
1116 def test_number_of_objects(self
):
1117 EXPECTED_NUMBER
= 1 # the pool object is still alive
1118 multiprocessing
.active_children() # discard dead process objs
1119 gc
.collect() # do garbage collection
1120 refs
= self
.manager
._number
_of
_objects
()
1121 debug_info
= self
.manager
._debug
_info
()
1122 if refs
!= EXPECTED_NUMBER
:
1123 print self
.manager
._debug
_info
()
1126 self
.assertEqual(refs
, EXPECTED_NUMBER
)
1129 # Test of creating a customized manager class
1132 from multiprocessing
.managers
import BaseManager
, BaseProxy
, RemoteError
1134 class FooBar(object):
1143 for i
in xrange(10):
1146 class IteratorProxy(BaseProxy
):
1147 _exposed_
= ('next', '__next__')
1151 return self
._callmethod
('next')
1153 return self
._callmethod
('__next__')
1155 class MyManager(BaseManager
):
1158 MyManager
.register('Foo', callable=FooBar
)
1159 MyManager
.register('Bar', callable=FooBar
, exposed
=('f', '_h'))
1160 MyManager
.register('baz', callable=baz
, proxytype
=IteratorProxy
)
1163 class _TestMyManager(BaseTestCase
):
1165 ALLOWED_TYPES
= ('manager',)
1167 def test_mymanager(self
):
1168 manager
= MyManager()
1175 foo_methods
= [name
for name
in ('f', 'g', '_h') if hasattr(foo
, name
)]
1176 bar_methods
= [name
for name
in ('f', 'g', '_h') if hasattr(bar
, name
)]
1178 self
.assertEqual(foo_methods
, ['f', 'g'])
1179 self
.assertEqual(bar_methods
, ['f', '_h'])
1181 self
.assertEqual(foo
.f(), 'f()')
1182 self
.assertRaises(ValueError, foo
.g
)
1183 self
.assertEqual(foo
._callmethod
('f'), 'f()')
1184 self
.assertRaises(RemoteError
, foo
._callmethod
, '_h')
1186 self
.assertEqual(bar
.f(), 'f()')
1187 self
.assertEqual(bar
._h
(), '_h()')
1188 self
.assertEqual(bar
._callmethod
('f'), 'f()')
1189 self
.assertEqual(bar
._callmethod
('_h'), '_h()')
1191 self
.assertEqual(list(baz
), [i
*i
for i
in range(10)])
1196 # Test of connecting to a remote server and using xmlrpclib for serialization
1199 _queue
= Queue
.Queue()
class QueueManager(BaseManager):
    """Manager class used by the server process."""
1205 QueueManager
.register('get_queue', callable=get_queue
)
class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""
1209 QueueManager2
.register('get_queue')
1212 SERIALIZER
= 'xmlrpclib'
1214 class _TestRemoteManager(BaseTestCase
):
1216 ALLOWED_TYPES
= ('manager',)
1218 def _putter(self
, address
, authkey
):
1219 manager
= QueueManager2(
1220 address
=address
, authkey
=authkey
, serializer
=SERIALIZER
1223 queue
= manager
.get_queue()
1224 queue
.put(('hello world', None, True, 2.25))
1226 def test_remote(self
):
1227 authkey
= os
.urandom(32)
1229 manager
= QueueManager(
1230 address
=('localhost', 0), authkey
=authkey
, serializer
=SERIALIZER
1234 p
= self
.Process(target
=self
._putter
, args
=(manager
.address
, authkey
))
1237 manager2
= QueueManager2(
1238 address
=manager
.address
, authkey
=authkey
, serializer
=SERIALIZER
1241 queue
= manager2
.get_queue()
1243 # Note that xmlrpclib deserializes the object as a list, not a tuple
1244 self
.assertEqual(queue
.get(), ['hello world', None, True, 2.25])
1246 # Because we are using xmlrpclib for serialization instead of
1247 # pickle this will cause a serialization error.
1248 self
.assertRaises(Exception, queue
.put
, time
.sleep
)
1250 # Make queue finalizer run before the server is stopped
1254 class _TestManagerRestart(BaseTestCase
):
1256 def _putter(self
, address
, authkey
):
1257 manager
= QueueManager(
1258 address
=address
, authkey
=authkey
, serializer
=SERIALIZER
)
1260 queue
= manager
.get_queue()
1261 queue
.put('hello world')
1263 def test_rapid_restart(self
):
1264 authkey
= os
.urandom(32)
1265 manager
= QueueManager(
1266 address
=('localhost', 0), authkey
=authkey
, serializer
=SERIALIZER
)
1267 addr
= manager
.get_server().address
1270 p
= self
.Process(target
=self
._putter
, args
=(manager
.address
, authkey
))
1272 queue
= manager
.get_queue()
1273 self
.assertEqual(queue
.get(), 'hello world')
1276 manager
= QueueManager(
1277 address
=addr
, authkey
=authkey
, serializer
=SERIALIZER
)
1285 SENTINEL
= latin('')
1287 class _TestConnection(BaseTestCase
):
1289 ALLOWED_TYPES
= ('processes', 'threads')
1291 def _echo(self
, conn
):
1292 for msg
in iter(conn
.recv_bytes
, SENTINEL
):
1293 conn
.send_bytes(msg
)
1296 def test_connection(self
):
1297 conn
, child_conn
= self
.Pipe()
1299 p
= self
.Process(target
=self
._echo
, args
=(child_conn
,))
1303 seq
= [1, 2.25, None]
1304 msg
= latin('hello world')
1306 arr
= array
.array('i', range(4))
1308 if self
.TYPE
== 'processes':
1309 self
.assertEqual(type(conn
.fileno()), int)
1311 self
.assertEqual(conn
.send(seq
), None)
1312 self
.assertEqual(conn
.recv(), seq
)
1314 self
.assertEqual(conn
.send_bytes(msg
), None)
1315 self
.assertEqual(conn
.recv_bytes(), msg
)
1317 if self
.TYPE
== 'processes':
1318 buffer = array
.array('i', [0]*10)
1319 expected
= list(arr
) + [0] * (10 - len(arr
))
1320 self
.assertEqual(conn
.send_bytes(arr
), None)
1321 self
.assertEqual(conn
.recv_bytes_into(buffer),
1322 len(arr
) * buffer.itemsize
)
1323 self
.assertEqual(list(buffer), expected
)
1325 buffer = array
.array('i', [0]*10)
1326 expected
= [0] * 3 + list(arr
) + [0] * (10 - 3 - len(arr
))
1327 self
.assertEqual(conn
.send_bytes(arr
), None)
1328 self
.assertEqual(conn
.recv_bytes_into(buffer, 3 * buffer.itemsize
),
1329 len(arr
) * buffer.itemsize
)
1330 self
.assertEqual(list(buffer), expected
)
1332 buffer = bytearray(latin(' ' * 40))
1333 self
.assertEqual(conn
.send_bytes(longmsg
), None)
1335 res
= conn
.recv_bytes_into(buffer)
1336 except multiprocessing
.BufferTooShort
, e
:
1337 self
.assertEqual(e
.args
, (longmsg
,))
1339 self
.fail('expected BufferTooShort, got %s' % res
)
1341 poll
= TimingWrapper(conn
.poll
)
1343 self
.assertEqual(poll(), False)
1344 self
.assertTimingAlmostEqual(poll
.elapsed
, 0)
1346 self
.assertEqual(poll(TIMEOUT1
), False)
1347 self
.assertTimingAlmostEqual(poll
.elapsed
, TIMEOUT1
)
1351 self
.assertEqual(poll(TIMEOUT1
), True)
1352 self
.assertTimingAlmostEqual(poll
.elapsed
, 0)
1354 self
.assertEqual(conn
.recv(), None)
1356 really_big_msg
= latin('X') * (1024 * 1024 * 16) # 16Mb
1357 conn
.send_bytes(really_big_msg
)
1358 self
.assertEqual(conn
.recv_bytes(), really_big_msg
)
1360 conn
.send_bytes(SENTINEL
) # tell child to quit
1363 if self
.TYPE
== 'processes':
1364 self
.assertEqual(conn
.readable
, True)
1365 self
.assertEqual(conn
.writable
, True)
1366 self
.assertRaises(EOFError, conn
.recv
)
1367 self
.assertRaises(EOFError, conn
.recv_bytes
)
1371 def test_duplex_false(self
):
1372 reader
, writer
= self
.Pipe(duplex
=False)
1373 self
.assertEqual(writer
.send(1), None)
1374 self
.assertEqual(reader
.recv(), 1)
1375 if self
.TYPE
== 'processes':
1376 self
.assertEqual(reader
.readable
, True)
1377 self
.assertEqual(reader
.writable
, False)
1378 self
.assertEqual(writer
.readable
, False)
1379 self
.assertEqual(writer
.writable
, True)
1380 self
.assertRaises(IOError, reader
.send
, 2)
1381 self
.assertRaises(IOError, writer
.recv
)
1382 self
.assertRaises(IOError, writer
.poll
)
1384 def test_spawn_close(self
):
1385 # We test that a pipe connection can be closed by parent
1386 # process immediately after child is spawned. On Windows this
1387 # would have sometimes failed on old versions because
1388 # child_conn would be closed before the child got a chance to
1390 conn
, child_conn
= self
.Pipe()
1392 p
= self
.Process(target
=self
._echo
, args
=(child_conn
,))
1394 child_conn
.close() # this might complete before child initializes
1396 msg
= latin('hello')
1397 conn
.send_bytes(msg
)
1398 self
.assertEqual(conn
.recv_bytes(), msg
)
1400 conn
.send_bytes(SENTINEL
)
1404 def test_sendbytes(self
):
1405 if self
.TYPE
!= 'processes':
1408 msg
= latin('abcdefghijklmnopqrstuvwxyz')
1412 self
.assertEqual(b
.recv_bytes(), msg
)
1414 a
.send_bytes(msg
, 5)
1415 self
.assertEqual(b
.recv_bytes(), msg
[5:])
1417 a
.send_bytes(msg
, 7, 8)
1418 self
.assertEqual(b
.recv_bytes(), msg
[7:7+8])
1420 a
.send_bytes(msg
, 26)
1421 self
.assertEqual(b
.recv_bytes(), latin(''))
1423 a
.send_bytes(msg
, 26, 0)
1424 self
.assertEqual(b
.recv_bytes(), latin(''))
1426 self
.assertRaises(ValueError, a
.send_bytes
, msg
, 27)
1428 self
.assertRaises(ValueError, a
.send_bytes
, msg
, 22, 5)
1430 self
.assertRaises(ValueError, a
.send_bytes
, msg
, 26, 1)
1432 self
.assertRaises(ValueError, a
.send_bytes
, msg
, -1)
1434 self
.assertRaises(ValueError, a
.send_bytes
, msg
, 4, -1)
1436 class _TestListenerClient(BaseTestCase
):
1438 ALLOWED_TYPES
= ('processes', 'threads')
1440 def _test(self
, address
):
1441 conn
= self
.connection
.Client(address
)
1445 def test_listener_client(self
):
1446 for family
in self
.connection
.families
:
1447 l
= self
.connection
.Listener(family
=family
)
1448 p
= self
.Process(target
=self
._test
, args
=(l
.address
,))
1452 self
.assertEqual(conn
.recv(), 'hello')
1456 # Test of sending connection and socket objects between processes
1459 class _TestPicklingConnections(BaseTestCase):
1461 ALLOWED_TYPES = ('processes',)
1463 def _listener(self, conn, families):
1464 for fam in families:
1465 l = self.connection.Listener(family=fam)
1466 conn.send(l.address)
1467 new_conn = l.accept()
1470 if self.TYPE == 'processes':
1472 l.bind(('localhost', 0))
1473 conn.send(l.getsockname())
1475 new_conn, addr = l.accept()
1480 def _remote(self, conn):
1481 for (address, msg) in iter(conn.recv, None):
1482 client = self.connection.Client(address)
1483 client.send(msg.upper())
1486 if self.TYPE == 'processes':
1487 address, msg = conn.recv()
1488 client = socket.socket()
1489 client.connect(address)
1490 client.sendall(msg.upper())
1495 def test_pickling(self):
1497 multiprocessing.allow_connection_pickling()
1501 families = self.connection.families
1503 lconn, lconn0 = self.Pipe()
1504 lp = self.Process(target=self._listener, args=(lconn0, families))
1508 rconn, rconn0 = self.Pipe()
1509 rp = self.Process(target=self._remote, args=(rconn0,))
1513 for fam in families:
1514 msg = ('This connection uses family %s' % fam).encode('ascii')
1515 address = lconn.recv()
1516 rconn.send((address, msg))
1517 new_conn = lconn.recv()
1518 self.assertEqual(new_conn.recv(), msg.upper())
1522 if self.TYPE == 'processes':
1523 msg = latin('This connection uses a normal socket')
1524 address = lconn.recv()
1525 rconn.send((address, msg))
1526 if hasattr(socket, 'fromfd'):
1527 new_conn = lconn.recv()
1528 self.assertEqual(new_conn.recv(100), msg.upper())
1530 # XXX On Windows with Py2.6 need to backport fromfd()
1531 discard = lconn.recv_bytes()
1545 class _TestHeap(BaseTestCase
):
1547 ALLOWED_TYPES
= ('processes',)
1549 def test_heap(self
):
1554 # create and destroy lots of blocks of different sizes
1555 for i
in xrange(iterations
):
1556 size
= int(random
.lognormvariate(0, 1) * 1000)
1557 b
= multiprocessing
.heap
.BufferWrapper(size
)
1559 if len(blocks
) > maxblocks
:
1560 i
= random
.randrange(maxblocks
)
1563 # get the heap object
1564 heap
= multiprocessing
.heap
.BufferWrapper
._heap
1566 # verify the state of the heap
1569 for L
in heap
._len
_to
_seq
.values():
1570 for arena
, start
, stop
in L
:
1571 all
.append((heap
._arenas
.index(arena
), start
, stop
,
1572 stop
-start
, 'free'))
1573 for arena
, start
, stop
in heap
._allocated
_blocks
:
1574 all
.append((heap
._arenas
.index(arena
), start
, stop
,
1575 stop
-start
, 'occupied'))
1576 occupied
+= (stop
-start
)
1580 for i
in range(len(all
)-1):
1581 (arena
, start
, stop
) = all
[i
][:3]
1582 (narena
, nstart
, nstop
) = all
[i
+1][:3]
1583 self
.assertTrue((arena
!= narena
and nstart
== 0) or
1590 class _Foo(Structure
):
1596 class _TestSharedCTypes(BaseTestCase
):
1598 ALLOWED_TYPES
= ('processes',)
1600 def _double(self
, x
, y
, foo
, arr
, string
):
1606 for i
in range(len(arr
)):
1609 @unittest.skipIf(Value
is None, "requires ctypes.Value")
1610 def test_sharedctypes(self
, lock
=False):
1611 x
= Value('i', 7, lock
=lock
)
1612 y
= Value(c_double
, 1.0/3.0, lock
=lock
)
1613 foo
= Value(_Foo
, 3, 2, lock
=lock
)
1614 arr
= self
.Array('d', range(10), lock
=lock
)
1615 string
= self
.Array('c', 20, lock
=lock
)
1616 string
.value
= 'hello'
1618 p
= self
.Process(target
=self
._double
, args
=(x
, y
, foo
, arr
, string
))
1622 self
.assertEqual(x
.value
, 14)
1623 self
.assertAlmostEqual(y
.value
, 2.0/3.0)
1624 self
.assertEqual(foo
.x
, 6)
1625 self
.assertAlmostEqual(foo
.y
, 4.0)
1627 self
.assertAlmostEqual(arr
[i
], i
*2)
1628 self
.assertEqual(string
.value
, latin('hellohello'))
1630 @unittest.skipIf(Value
is None, "requires ctypes.Value")
1631 def test_synchronize(self
):
1632 self
.test_sharedctypes(lock
=True)
1634 @unittest.skipIf(ctypes_copy
is None, "requires ctypes.copy")
1635 def test_copy(self
):
1637 bar
= ctypes_copy(foo
)
1640 self
.assertEqual(bar
.x
, 2)
1641 self
.assertAlmostEqual(bar
.y
, 5.0)
1647 class _TestFinalize(BaseTestCase
):
1649 ALLOWED_TYPES
= ('processes',)
1651 def _test_finalize(self
, conn
):
1656 util
.Finalize(a
, conn
.send
, args
=('a',))
1657 del a
# triggers callback for a
1660 close_b
= util
.Finalize(b
, conn
.send
, args
=('b',))
1661 close_b() # triggers callback for b
1662 close_b() # does nothing because callback has already been called
1663 del b
# does nothing because callback has already been called
1666 util
.Finalize(c
, conn
.send
, args
=('c',))
1669 util
.Finalize(d10
, conn
.send
, args
=('d10',), exitpriority
=1)
1672 util
.Finalize(d01
, conn
.send
, args
=('d01',), exitpriority
=0)
1674 util
.Finalize(d02
, conn
.send
, args
=('d02',), exitpriority
=0)
1676 util
.Finalize(d03
, conn
.send
, args
=('d03',), exitpriority
=0)
1678 util
.Finalize(None, conn
.send
, args
=('e',), exitpriority
=-10)
1680 util
.Finalize(None, conn
.send
, args
=('STOP',), exitpriority
=-100)
1682 # call multiprocessing's cleanup function then exit process without
1683 # garbage collecting locals
1684 util
._exit
_function
()
1688 def test_finalize(self
):
1689 conn
, child_conn
= self
.Pipe()
1691 p
= self
.Process(target
=self
._test
_finalize
, args
=(child_conn
,))
1695 result
= [obj
for obj
in iter(conn
.recv
, 'STOP')]
1696 self
.assertEqual(result
, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1699 # Test that from ... import * works for each module
1702 class _TestImportStar(BaseTestCase
):
1704 ALLOWED_TYPES
= ('processes',)
1706 def test_import(self
):
1708 'multiprocessing', 'multiprocessing.connection',
1709 'multiprocessing.heap', 'multiprocessing.managers',
1710 'multiprocessing.pool', 'multiprocessing.process',
1711 'multiprocessing.reduction',
1712 'multiprocessing.synchronize', 'multiprocessing.util'
1715 if c_int
is not None:
1716 # This module requires _ctypes
1717 modules
.append('multiprocessing.sharedctypes')
1719 for name
in modules
:
1721 mod
= sys
.modules
[name
]
1723 for attr
in getattr(mod
, '__all__', ()):
1726 '%r does not have attribute %r' % (mod
, attr
)
1730 # Quick test that logging works -- does not test logging output
1733 class _TestLogging(BaseTestCase
):
1735 ALLOWED_TYPES
= ('processes',)
def test_enable_logging(self):
    # Smoke test: emitting records below the configured level must not
    # fail.  The logging output itself is not inspected.
    logger = multiprocessing.get_logger()
    # Check the logger before it is first used; asserting afterwards
    # could never fail, because a None logger would already have raised
    # AttributeError on setLevel().
    self.assertIsNotNone(logger)
    logger.setLevel(util.SUBWARNING)
    try:
        logger.debug('this will not be printed')
        logger.info('nor will this')
    finally:
        # Always put the module-wide level back so a failure here does
        # not leak a changed log level into later tests.
        logger.setLevel(LOG_LEVEL)
def _test_level(self, conn):
    # Report the effective level of the multiprocessing logger over
    # `conn`, so whoever holds the other end can inspect it.
    conn.send(multiprocessing.get_logger().getEffectiveLevel())
1749 def test_level(self
):
1753 logger
= multiprocessing
.get_logger()
1754 root_logger
= logging
.getLogger()
1755 root_level
= root_logger
.level
1757 reader
, writer
= multiprocessing
.Pipe(duplex
=False)
1759 logger
.setLevel(LEVEL1
)
1760 self
.Process(target
=self
._test
_level
, args
=(writer
,)).start()
1761 self
.assertEqual(LEVEL1
, reader
.recv())
1763 logger
.setLevel(logging
.NOTSET
)
1764 root_logger
.setLevel(LEVEL2
)
1765 self
.Process(target
=self
._test
_level
, args
=(writer
,)).start()
1766 self
.assertEqual(LEVEL2
, reader
.recv())
1768 root_logger
.setLevel(root_level
)
1769 logger
.setLevel(level
=LOG_LEVEL
)
1772 # class _TestLoggingProcessName(BaseTestCase):
1774 # def handle(self, record):
1775 # assert record.processName == multiprocessing.current_process().name
1776 # self.__handled = True
1778 # def test_logging(self):
1779 # handler = logging.Handler()
1780 # handler.handle = self.handle
1781 # self.__handled = False
1782 # # Bypass getLogger() and side-effects
1783 # logger = logging.getLoggerClass()(
1784 # 'multiprocessing.test.TestLoggingProcessName')
1785 # logger.addHandler(handler)
1786 # logger.propagate = False
1788 # logger.warn('foo')
1789 # assert self.__handled
1792 # Test to verify handle verification, see issue 3321
class TestInvalidHandle(unittest.TestCase):
    # Checks that _multiprocessing.Connection validates its handle
    # (see issue 3321).

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # Arbitrary large value; presumably not an open descriptor in
        # the test process, so polling it is expected to fail.
        bogus_handle = 44977608
        connection = _multiprocessing.Connection(bogus_handle)
        self.assertRaises(IOError, connection.poll)
        # Constructing a connection from a negative handle must raise.
        make_connection = _multiprocessing.Connection
        self.assertRaises(IOError, make_connection, -1)
1804 # Functions used to create test cases from the base ones in this module
1807 def get_attributes(Source
, names
):
1810 obj
= getattr(Source
, name
)
1811 if type(obj
) == type(get_attributes
):
1812 obj
= staticmethod(obj
)
1816 def create_test_cases(Mixin
, type):
1819 Type
= type.capitalize()
1821 for name
in glob
.keys():
1822 if name
.startswith('_Test'):
1824 if type in base
.ALLOWED_TYPES
:
1825 newname
= 'With' + Type
+ name
[1:]
1826 class Temp(base
, unittest
.TestCase
, Mixin
):
1828 result
[newname
] = Temp
1829 Temp
.__name
__ = newname
1830 Temp
.__module
__ = Mixin
.__module
__
1837 class ProcessesMixin(object):
1839 Process
= multiprocessing
.Process
1840 locals().update(get_attributes(multiprocessing
, (
1841 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1842 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1843 'RawArray', 'current_process', 'active_children', 'Pipe',
1844 'connection', 'JoinableQueue'
1847 testcases_processes
= create_test_cases(ProcessesMixin
, type='processes')
1848 globals().update(testcases_processes
)
1851 class ManagerMixin(object):
1853 Process
= multiprocessing
.Process
1854 manager
= object.__new
__(multiprocessing
.managers
.SyncManager
)
1855 locals().update(get_attributes(manager
, (
1856 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1857 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1858 'Namespace', 'JoinableQueue'
1861 testcases_manager
= create_test_cases(ManagerMixin
, type='manager')
1862 globals().update(testcases_manager
)
1865 class ThreadsMixin(object):
1867 Process
= multiprocessing
.dummy
.Process
1868 locals().update(get_attributes(multiprocessing
.dummy
, (
1869 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1870 'Condition', 'Event', 'Value', 'Array', 'current_process',
1871 'active_children', 'Pipe', 'connection', 'dict', 'list',
1872 'Namespace', 'JoinableQueue'
1875 testcases_threads
= create_test_cases(ThreadsMixin
, type='threads')
1876 globals().update(testcases_threads
)
1878 class OtherTest(unittest
.TestCase
):
1879 # TODO: add more tests for deliver/answer challenge.
1880 def test_deliver_challenge_auth_failure(self
):
1881 class _FakeConnection(object):
1882 def recv_bytes(self
, size
):
1883 return b
'something bogus'
1884 def send_bytes(self
, data
):
1886 self
.assertRaises(multiprocessing
.AuthenticationError
,
1887 multiprocessing
.connection
.deliver_challenge
,
1888 _FakeConnection(), b
'abc')
1890 def test_answer_challenge_auth_failure(self
):
1891 class _FakeConnection(object):
1894 def recv_bytes(self
, size
):
1897 return multiprocessing
.connection
.CHALLENGE
1898 elif self
.count
== 2:
1899 return b
'something bogus'
1901 def send_bytes(self
, data
):
1903 self
.assertRaises(multiprocessing
.AuthenticationError
,
1904 multiprocessing
.connection
.answer_challenge
,
1905 _FakeConnection(), b
'abc')
1908 # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1911 def initializer(ns
):
1914 class TestInitializers(unittest
.TestCase
):
1916 self
.mgr
= multiprocessing
.Manager()
1917 self
.ns
= self
.mgr
.Namespace()
1923 def test_manager_initializer(self
):
1924 m
= multiprocessing
.managers
.SyncManager()
1925 self
.assertRaises(TypeError, m
.start
, 1)
1926 m
.start(initializer
, (self
.ns
,))
1927 self
.assertEqual(self
.ns
.test
, 1)
1930 def test_pool_initializer(self
):
1931 self
.assertRaises(TypeError, multiprocessing
.Pool
, initializer
=1)
1932 p
= multiprocessing
.Pool(1, initializer
, (self
.ns
,))
1935 self
.assertEqual(self
.ns
.test
, 1)
1938 # Issue 5155, 5313, 5331: Test process in processes
1939 # Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior
1942 def _ThisSubProcess(q
):
1944 item
= q
.get(block
=False)
1948 def _TestProcess(q
):
1949 queue
= multiprocessing
.Queue()
1950 subProc
= multiprocessing
.Process(target
=_ThisSubProcess
, args
=(queue
,))
1957 def pool_in_process():
1958 pool
= multiprocessing
.Pool(processes
=4)
1959 x
= pool
.map(_afunc
, [1, 2, 3, 4, 5, 6, 7])
1961 class _file_like(object):
1962 def __init__(self
, delegate
):
1963 self
._delegate
= delegate
1969 # There are no race conditions since fork keeps only the running thread
1970 if pid
!= self
._pid
:
1975 def write(self
, data
):
1976 self
.cache
.append(data
)
1979 self
._delegate
.write(''.join(self
.cache
))
1982 class TestStdinBadfiledescriptor(unittest
.TestCase
):
1984 def test_queue_in_process(self
):
1985 queue
= multiprocessing
.Queue()
1986 proc
= multiprocessing
.Process(target
=_TestProcess
, args
=(queue
,))
1990 def test_pool_in_process(self
):
1991 p
= multiprocessing
.Process(target
=pool_in_process
)
1995 def test_flushing(self
):
1997 flike
= _file_like(sio
)
1999 proc
= multiprocessing
.Process(target
=lambda: flike
.flush())
2001 assert sio
.getvalue() == 'foo'
2003 testcases_other
= [OtherTest
, TestInvalidHandle
, TestInitializers
,
2004 TestStdinBadfiledescriptor
]
2010 def test_main(run
=None):
2011 if sys
.platform
.startswith("linux"):
2013 lock
= multiprocessing
.RLock()
2015 raise unittest
.SkipTest("OSError raises on RLock creation, see issue 3111!")
2018 from test
.test_support
import run_unittest
as run
2020 util
.get_temp_dir() # creates temp directory for use by all processes
2022 multiprocessing
.get_logger().setLevel(LOG_LEVEL
)
2024 ProcessesMixin
.pool
= multiprocessing
.Pool(4)
2025 ThreadsMixin
.pool
= multiprocessing
.dummy
.Pool(4)
2026 ManagerMixin
.manager
.__init
__()
2027 ManagerMixin
.manager
.start()
2028 ManagerMixin
.pool
= ManagerMixin
.manager
.Pool(4)
2031 sorted(testcases_processes
.values(), key
=lambda tc
:tc
.__name
__) +
2032 sorted(testcases_threads
.values(), key
=lambda tc
:tc
.__name
__) +
2033 sorted(testcases_manager
.values(), key
=lambda tc
:tc
.__name
__) +
2037 loadTestsFromTestCase
= unittest
.defaultTestLoader
.loadTestsFromTestCase
2038 suite
= unittest
.TestSuite(loadTestsFromTestCase(tc
) for tc
in testcases
)
2039 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2040 # module during these tests is at least platform dependent and possibly
2041 # non-deterministic on any given platform. So we don't mind if the listed
2042 # warnings aren't actually raised.
2043 with test_support
.check_py3k_warnings(
2044 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2045 (r
"sys.exc_clear\(\) not supported", DeprecationWarning),
2049 ThreadsMixin
.pool
.terminate()
2050 ProcessesMixin
.pool
.terminate()
2051 ManagerMixin
.pool
.terminate()
2052 ManagerMixin
.manager
.shutdown()
2054 del ProcessesMixin
.pool
, ThreadsMixin
.pool
, ManagerMixin
.pool
2057 test_main(unittest
.TextTestRunner(verbosity
=2).run
)
2059 if __name__
== '__main__':