Require implementations for warnings.showwarning() support the 'line' argument.
[python.git] / Lib / test / test_multiprocessing.py
blob33debfb3ada3c2f2a3dde8d64413a10cd80b3b47
1 #!/usr/bin/env python
4 # Unit tests for the multiprocessing package
7 import unittest
8 import threading
9 import Queue
10 import time
11 import sys
12 import os
13 import gc
14 import signal
15 import array
16 import copy
17 import socket
18 import random
19 import logging
# Work around broken sem_open implementations: if the synchronization
# primitives cannot be imported, skip this whole test module.
try:
    import multiprocessing.synchronize
except ImportError as e:
    # "except ... as" is accepted by Python 2.6+, the first release that
    # ships multiprocessing, so this stays compatible with this file.
    from test.test_support import TestSkipped
    raise TestSkipped(e)
29 import multiprocessing.dummy
30 import multiprocessing.connection
31 import multiprocessing.managers
32 import multiprocessing.heap
33 import multiprocessing.pool
34 import _multiprocessing
36 from multiprocessing import util
# Alias used by the tests below to build their string payloads.
latin = str

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

DELTA = 0.1

# Setting CHECK_TIMINGS to True makes the tests take a lot longer and can
# sometimes cause non-serious failures, because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False

TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False when _multiprocessing reports a broken sem_getvalue implementation.
_BROKEN_GETVALUE = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _BROKEN_GETVALUE

WIN32 = sys.platform == "win32"
67 # Creates a wrapper for a function which records the time it takes to finish
class TimingWrapper(object):
    """Callable wrapper that records how long each call to ``func`` takes.

    ``elapsed`` is ``None`` until the first call.  After every call, whether
    it returned or raised, it holds that call's duration in seconds as
    measured with ``time.time()``.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Runs on both the normal and the exceptional path.
            self.elapsed = time.time() - started
        return result
84 # Base class for test cases
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test-case classes below."""

    # Values of ``TYPE`` this test case applies to; subclasses override it.
    # NOTE(review): presumably consumed by the code that builds the concrete
    # test classes, which is not part of this section -- confirm.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place if CHECK_TIMINGS is set."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError."""
        try:
            actual = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, actual)
104 # Return the value of a semaphore
def get_value(self):
    """Return the current value of the semaphore-like object ``self``.

    Tries ``self.get_value()`` first, then the ``_Semaphore__value`` and
    ``_value`` attributes in that order.

    Raises NotImplementedError if none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            pass
    raise NotImplementedError
120 # Testcases
class _TestProcess(BaseTestCase):
    # Tests for Process objects: attributes of the current process, the
    # start/join life cycle, terminate(), cpu_count(), active_children()
    # and processes that start further processes.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Target run by test_process: report what this process received and
        # who it is by putting the values on q, in a fixed order.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before start()
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # Values reported by _test(); q itself (args[0]) is not echoed back
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after the process has finished
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        # Target for test_terminate: sleep long enough to get terminated.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        # Send our id, then (up to depth 2) run two children one after the
        # other, each with the id extended by its index.
        # NOTE(review): the imported name is unused in this method; the
        # import is kept in case it is wanted for its side effects -- confirm.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Each child is joined before the next one starts, so the ids
        # arrive in depth-first order, beginning with the root id [].
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
class _UpperCaser(multiprocessing.Process):
    """Process that upper-cases the strings sent to it over a pipe.

    The parent uses submit() and stop(); the child loop in run() answers
    each received string and exits when it receives ``None``.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        ends = multiprocessing.Pipe()
        self.child_conn, self.parent_conn = ends

    def run(self):
        # Close the end this side does not use.
        self.parent_conn.close()
        receive = self.child_conn.recv
        for text in iter(receive, None):
            self.child_conn.send(text.upper())
        self.child_conn.close()

    def submit(self, s):
        """Send the str ``s`` to the child and return its reply."""
        assert type(s) is str
        conn = self.parent_conn
        conn.send(s)
        return conn.recv()

    def stop(self):
        """Tell the child loop to finish and close both pipe ends."""
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
class _TestSubclassingProcess(BaseTestCase):
    # Checks that a Process subclass (_UpperCaser) can be started, talked
    # to and stopped.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        self.assertEqual(worker.submit('hello'), 'HELLO')
        self.assertEqual(worker.submit('world'), 'WORLD')
        worker.stop()
        worker.join()
def queue_empty(q):
    """Return whether ``q`` is empty.

    Uses ``q.empty()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
def queue_full(q, maxsize):
    """Return whether ``q`` is full.

    Uses ``q.full()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with ``maxsize``.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
class _TestQueue(BaseTestCase):
    # Exercises Queue.put()/get() in blocking, non-blocking and timed
    # form, queue use across a new process, qsize() and
    # JoinableQueue.task_done().

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain the six queued
        # items and signal the parent.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        start_child = self.Event()
        child_done = self.Event()

        consumer = self.Process(
            target=self._test_put,
            args=(queue, start_child, child_done)
            )
        consumer.daemon = True
        consumer.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put()
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        timed_put = TimingWrapper(queue.put)
        timed_put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must fail; only blocking ones take time
        self.assertRaises(Queue.Full, timed_put, 7, False)
        self.assertTimingAlmostEqual(timed_put.elapsed, 0)

        self.assertRaises(Queue.Full, timed_put, 7, False, None)
        self.assertTimingAlmostEqual(timed_put.elapsed, 0)

        self.assertRaises(Queue.Full, timed_put_nowait, 7)
        self.assertTimingAlmostEqual(timed_put_nowait.elapsed, 0)

        self.assertRaises(Queue.Full, timed_put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(timed_put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, timed_put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(timed_put.elapsed, 0)

        self.assertRaises(Queue.Full, timed_put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(timed_put.elapsed, TIMEOUT3)

        # Let the child empty the queue again
        start_child.set()
        child_done.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        consumer.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue 2..5 and signal
        # the parent.
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        start_child = self.Event()
        child_done = self.Event()

        producer = self.Process(
            target=self._test_get,
            args=(queue, start_child, child_done)
            )
        producer.daemon = True
        producer.start()

        self.assertEqual(queue_empty(queue), True)

        start_child.set()
        child_done.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        timed_get = TimingWrapper(queue.get)
        timed_get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must fail; only blocking ones take time
        self.assertRaises(Queue.Empty, timed_get, False)
        self.assertTimingAlmostEqual(timed_get.elapsed, 0)

        self.assertRaises(Queue.Empty, timed_get, False, None)
        self.assertTimingAlmostEqual(timed_get.elapsed, 0)

        self.assertRaises(Queue.Empty, timed_get_nowait)
        self.assertTimingAlmostEqual(timed_get_nowait.elapsed, 0)

        self.assertRaises(Queue.Empty, timed_get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, timed_get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, 0)

        self.assertRaises(Queue.Empty, timed_get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT3)

        producer.join()

    def _test_fork(self, queue):
        # Child side of test_fork: queue the numbers 10..19.
        for value in range(10, 20):
            queue.put(value)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for value in range(10):
            queue.put(value)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.start()

        # check that all expected items are in the queue
        for value in range(20):
            self.assertEqual(queue.get(), value)
        self.assertRaises(Queue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not available here, so there is nothing to check
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker for test_task_done: acknowledge every item until the
        # None sentinel arrives.
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = []
        for _ in xrange(4):
            workers.append(
                self.Process(target=self._test_task_done, args=(queue,))
                )

        for worker in workers:
            worker.start()

        for item in xrange(10):
            queue.put(item)

        queue.join()

        # One sentinel per worker so that each of them exits
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()
class _TestLock(BaseTestCase):
    # Basic acquire/release behaviour of Lock and RLock.

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A non-blocking second acquire must fail
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing an unlocked lock is an error
        release_errors = (ValueError, threading.ThreadError)
        self.assertRaises(release_errors, lock.release)

    def test_rlock(self):
        lock = self.RLock()
        # The same owner may acquire three times ...
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        # ... and must then release three times
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        release_errors = (AssertionError, RuntimeError)
        self.assertRaises(release_errors, lock.release)
class _TestSemaphore(BaseTestCase):
    # Counting behaviour of Semaphore/BoundedSemaphore and acquire timeouts.

    def _test_semaphore(self, sem):
        # Shared check for a semaphore that starts at 2: take it down to
        # 0, fail one non-blocking acquire, then bring it back up to 2.
        check_value = self.assertReturnsIfImplemented

        check_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        timed_acquire = TimingWrapper(sem.acquire)

        # Non-blocking acquires return at once, with or without timeout
        self.assertEqual(timed_acquire(False), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, None), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0)

        # Blocking acquires give up after the timeout
        self.assertEqual(timed_acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT2)

        self.assertEqual(timed_acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT3)
class _TestCondition(BaseTestCase):
    # Tests Condition.notify(), notify_all() and wait() with a timeout,
    # using one semaphore to count sleepers and one to count wake-ups.

    def f(self, cond, sleeping, woken, timeout=None):
        # Waiter run in a child process or thread: announce that we are
        # about to sleep, wait on the condition, then announce the wake-up.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate handles for the process and the thread so that
        # both can be joined at the end of the test.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
class _TestEvent(BaseTestCase):
    # Tests Event.wait(), set() and clear(), including an event that is
    # set from a child process.

    def _test_event(self, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily: due to API shear this does not work with
        # threading._Event objects (is_set == isSet).
        #self.assertEqual(event.is_set(), False)

        self.assertEqual(wait(0.0), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a handle on the child so it can be joined; the join also
        # runs when the assertion fails.
        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        try:
            self.assertEqual(wait(), None)
        finally:
            p.join()
class _TestValue(BaseTestCase):
    # Tests Value/RawValue objects and the get_lock()/get_obj() accessors.

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child side of test_value: store the third entry of each row of
        # codes_values into the matching shared value.
        for shared, row in zip(values, self.codes_values):
            shared.value = row[2]

    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        if raw:
            make_value = self.RawValue
        else:
            make_value = self.Value
        values = [make_value(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        # The child's assignments must be visible in this process
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        # Default lock and lock=None: both accessors must be callable
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicitly supplied lock is the one returned by get_lock()
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without the accessors
        arr4 = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        arr5 = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
class _TestArray(BaseTestCase):
    # Tests Array/RawArray objects and the get_lock()/get_obj() accessors.

    def f(self, seq):
        # Replace seq in place by its running sums.
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos - 1]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment from an array.array, applied to both copies
        replacement = array.array('i', [1, 2, 3, 4])
        arr[4:8] = replacement
        seq[4:8] = replacement

        self.assertEqual(list(arr[:]), seq)

        # Apply f locally to the list and in a child to the shared array
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        # Default lock and lock=None: both accessors must be callable
        arr1 = self.Array('i', range(10))
        arr1.get_lock()
        arr1.get_obj()

        arr2 = self.Array('i', range(10), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicitly supplied lock is the one returned by get_lock()
        lock = self.Lock()
        arr3 = self.Array('i', range(10), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without the accessors
        arr4 = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
class _TestContainers(BaseTestCase):
    # Tests the list, dict and Namespace objects provided by a manager.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = range(10)
        a = self.list(digits)
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        self.assertEqual(a[:], digits)

        # A shared list holding other shared lists
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A change to a is visible through the list that contains it
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        pairs = [(code, chr(code)) for code in indices]
        self.assertEqual(d.copy(), dict(pairs))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), pairs)

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # Only 'name' is expected in the string form at this point
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
def sqr(x, wait=0.0):
    """Sleep for ``wait`` seconds, then return ``x`` squared."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Tests the Pool API: apply, map, the async variants, imap,
    # imap_unordered, pool creation and terminate().

    def test_apply(self):
        apply_in_pool = self.pool.apply
        self.assertEqual(apply_in_pool(sqr, (5,)), sqr(5))
        self.assertEqual(apply_in_pool(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        map_in_pool = self.pool.map
        self.assertEqual(map_in_pool(sqr, range(10)),
                         [sqr(i) for i in range(10)])
        self.assertEqual(map_in_pool(sqr, range(100), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(res.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get()
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), [sqr(i) for i in range(10)])

        # Step through the iterator by hand, default chunksize
        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

        # The same with an explicit chunksize
        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

    def test_imap_unordered(self):
        squares = [sqr(i) for i in range(1000)]

        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), squares)

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), squares)

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish, then terminate at once
        result = self.pool.map_async(
            time.sleep, [0.1] * 10000, chunksize=1
            )
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertTrue(timed_join.elapsed < 0.2)
1045 # Test that manager has expected number of shared objects left
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Show the manager's debug info once to help diagnose the
            # leak.  A single-argument print(...) behaves the same as the
            # print statement on Python 2.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1068 # Test of creating a customized manager class
1071 from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
class FooBar(object):
    """Plain class whose methods are exposed through MyManager proxies."""

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the string '_h()'."""
        return '_h()'
def baz():
    """Yield the squares of the integers 0 through 9."""
    for n in range(10):
        yield n * n
class IteratorProxy(BaseProxy):
    """Proxy that lets a remote iterator be iterated over locally.

    Both spellings of the iterator-advance method are exposed and each is
    forwarded to the referent through ``_callmethod``.
    """

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def next(self):
        result = self._callmethod('next')
        return result

    def __next__(self):
        result = self._callmethod('__next__')
        return result
class MyManager(BaseManager):
    """Customized manager class used by _TestMyManager."""

# 'Foo' uses the default set of exposed methods, 'Bar' exposes exactly
# f and _h, and 'baz' returns its result through an IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
class _TestMyManager(BaseTestCase):
    # Tests the proxies created by the customized MyManager class.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        # Which of the candidate methods does each proxy offer?
        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # Foo: _h is not exposed, so calling it remotely must fail
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # Bar: both exposed methods work directly and via _callmethod
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])

        manager.shutdown()
1135 # Test of connecting to a remote server and using xmlrpclib for serialization
# Queue served by QueueManager; get_queue() is the registered callable
# that returns it.
_queue = Queue.Queue()

def get_queue():
    """Return the module-level queue object."""
    return _queue


class QueueManager(BaseManager):
    '''manager class used by server process'''

QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''

# No callable is given for this registration.
QueueManager2.register('get_queue')


# Serializer name passed to the managers in _TestRemoteManager
SERIALIZER = 'xmlrpclib'
class _TestRemoteManager(BaseTestCase):
    # Tests connecting to a manager by address, with xmlrpclib as the
    # serializer.

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        # Child side of test_remote: connect to the manager and put one
        # tuple on its queue.
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        putter = self.Process(
            target=self._putter, args=(manager.address, authkey)
            )
        putter.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        received = queue.get()
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
# Empty message that tells _TestConnection._echo to stop echoing.
SENTINEL = latin('')
1199 class _TestConnection(BaseTestCase):
1201 ALLOWED_TYPES = ('processes', 'threads')
def _echo(self, conn):
    # Child side of test_connection: send every received byte message
    # straight back until SENTINEL arrives, then close the connection.
    next_message = conn.recv_bytes
    for payload in iter(next_message, SENTINEL):
        conn.send_bytes(payload)
    conn.close()
def test_connection(self):
    # Round-trips objects and byte messages through an echoing child and
    # checks poll(), the buffer-filling receives and the end-of-file
    # behaviour of the connection.
    conn, child_conn = self.Pipe()

    echo = self.Process(target=self._echo, args=(child_conn,))
    echo.daemon = True
    echo.start()

    seq = [1, 2.25, None]
    msg = latin('hello world')
    longmsg = msg * 10
    arr = array.array('i', range(4))

    if self.TYPE == 'processes':
        self.assertEqual(type(conn.fileno()), int)

    self.assertEqual(conn.send(seq), None)
    self.assertEqual(conn.recv(), seq)

    self.assertEqual(conn.send_bytes(msg), None)
    self.assertEqual(conn.recv_bytes(), msg)

    if self.TYPE == 'processes':
        # Receive into the start of a zeroed buffer
        buf = array.array('i', [0]*10)
        expected = list(arr) + [0] * (10 - len(arr))
        self.assertEqual(conn.send_bytes(arr), None)
        self.assertEqual(conn.recv_bytes_into(buf),
                         len(arr) * buf.itemsize)
        self.assertEqual(list(buf), expected)

        # Receive at an offset of three items
        buf = array.array('i', [0]*10)
        expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
        self.assertEqual(conn.send_bytes(arr), None)
        self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                         len(arr) * buf.itemsize)
        self.assertEqual(list(buf), expected)

        # A buffer that is too small must raise BufferTooShort, with
        # the complete message as the exception's only argument
        buf = bytearray(latin(' ' * 40))
        self.assertEqual(conn.send_bytes(longmsg), None)
        try:
            res = conn.recv_bytes_into(buf)
        except multiprocessing.BufferTooShort as e:
            self.assertEqual(e.args, (longmsg,))
        else:
            self.fail('expected BufferTooShort, got %s' % res)

    timed_poll = TimingWrapper(conn.poll)

    self.assertEqual(timed_poll(), False)
    self.assertTimingAlmostEqual(timed_poll.elapsed, 0)

    self.assertEqual(timed_poll(TIMEOUT1), False)
    self.assertTimingAlmostEqual(timed_poll.elapsed, TIMEOUT1)

    conn.send(None)

    self.assertEqual(timed_poll(TIMEOUT1), True)
    self.assertTimingAlmostEqual(timed_poll.elapsed, 0)

    self.assertEqual(conn.recv(), None)

    really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
    conn.send_bytes(really_big_msg)
    self.assertEqual(conn.recv_bytes(), really_big_msg)

    conn.send_bytes(SENTINEL)                          # tell child to quit
    child_conn.close()

    if self.TYPE == 'processes':
        self.assertEqual(conn.readable, True)
        self.assertEqual(conn.writable, True)
        self.assertRaises(EOFError, conn.recv)
        self.assertRaises(EOFError, conn.recv_bytes)

    echo.join()
1283 def test_duplex_false(self):
1284 reader, writer = self.Pipe(duplex=False)
1285 self.assertEqual(writer.send(1), None)
1286 self.assertEqual(reader.recv(), 1)
1287 if self.TYPE == 'processes':
1288 self.assertEqual(reader.readable, True)
1289 self.assertEqual(reader.writable, False)
1290 self.assertEqual(writer.readable, False)
1291 self.assertEqual(writer.writable, True)
1292 self.assertRaises(IOError, reader.send, 2)
1293 self.assertRaises(IOError, writer.recv)
1294 self.assertRaises(IOError, writer.poll)
1296 def test_spawn_close(self):
1297 # We test that a pipe connection can be closed by parent
1298 # process immediately after child is spawned. On Windows this
1299 # would have sometimes failed on old versions because
1300 # child_conn would be closed before the child got a chance to
1301 # duplicate it.
1302 conn, child_conn = self.Pipe()
1304 p = self.Process(target=self._echo, args=(child_conn,))
1305 p.start()
1306 child_conn.close() # this might complete before child initializes
1308 msg = latin('hello')
1309 conn.send_bytes(msg)
1310 self.assertEqual(conn.recv_bytes(), msg)
1312 conn.send_bytes(SENTINEL)
1313 conn.close()
1314 p.join()
1316 def test_sendbytes(self):
1317 if self.TYPE != 'processes':
1318 return
1320 msg = latin('abcdefghijklmnopqrstuvwxyz')
1321 a, b = self.Pipe()
1323 a.send_bytes(msg)
1324 self.assertEqual(b.recv_bytes(), msg)
1326 a.send_bytes(msg, 5)
1327 self.assertEqual(b.recv_bytes(), msg[5:])
1329 a.send_bytes(msg, 7, 8)
1330 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1332 a.send_bytes(msg, 26)
1333 self.assertEqual(b.recv_bytes(), latin(''))
1335 a.send_bytes(msg, 26, 0)
1336 self.assertEqual(b.recv_bytes(), latin(''))
1338 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1340 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1342 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1344 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1346 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
class _TestListenerClient(BaseTestCase):
    """Connect a Client to a Listener for every available address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        """Child body: connect to address, send 'hello' and disconnect."""
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        # One listener/client round trip per family in connection.families.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
#
# Test of sending connection and socket objects between processes
#

class _TestPicklingConnections(BaseTestCase):
    """Send accepted connections and sockets from one process to another.

    Three processes take part: the test itself, a "listener" child that
    accepts connections and ships the accepted end back to the test, and a
    "remote" child that connects to whatever address the test forwards to it.
    """

    ALLOWED_TYPES = ('processes',)

    def _listener(self, conn, families):
        """Child body: accept one connection per family and send it to conn.

        For each family the listener address is sent first, then the accepted
        connection object.  With real processes the same is repeated with a
        plain socket.
        """
        for fam in families:
            l = self.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)

        if self.TYPE == 'processes':
            l = socket.socket()
            l.bind(('localhost', 0))
            # Start listening *before* the address is published: otherwise
            # the remote process could try to connect to a socket that is
            # bound but not yet listening and be refused.
            l.listen(1)
            conn.send(l.getsockname())
            new_conn, _ = l.accept()
            conn.send(new_conn)

        # Block until the parent signals that it is done (it sends None).
        conn.recv()

    def _remote(self, conn):
        """Child body: connect to each received address and send msg.upper().

        (address, msg) pairs are read from conn until None is received; with
        real processes one further pair is then served over a plain socket.
        """
        for (address, msg) in iter(conn.recv, None):
            client = self.connection.Client(address)
            client.send(msg.upper())
            client.close()

        if self.TYPE == 'processes':
            address, msg = conn.recv()
            client = socket.socket()
            client.connect(address)
            client.sendall(msg.upper())
            client.close()

        conn.close()

    def test_pickling(self):
        # Skip silently when connection pickling is not available.
        try:
            multiprocessing.allow_connection_pickling()
        except ImportError:
            return

        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        # ends the first loop in _remote
        rconn.send(None)

        if self.TYPE == 'processes':
            msg = latin('This connection uses a normal socket')
            address = lconn.recv()
            rconn.send((address, msg))
            if hasattr(socket, 'fromfd'):
                new_conn = lconn.recv()
                self.assertEqual(new_conn.recv(100), msg.upper())
            else:
                # XXX On Windows with Py2.6 need to backport fromfd()
                # The pickled socket is read off the pipe and thrown away.
                lconn.recv_bytes()

        # releases the listener child from its final conn.recv()
        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()
class _TestHeap(BaseTestCase):
    """Stress multiprocessing.heap and check its bookkeeping is contiguous."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        # Allocate and free many randomly sized blocks, then verify that the
        # free and occupied blocks tile every arena without gaps.
        iterations = 5000
        maxblocks = 50
        live = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            live.append(multiprocessing.heap.BufferWrapper(size))
            if len(live) > maxblocks:
                del live[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        arena_index = heap._arenas.index
        layout = []
        occupied = 0
        for free_seq in heap._len_to_seq.values():
            for arena, start, stop in free_seq:
                layout.append(
                    (arena_index(arena), start, stop, stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append(
                (arena_index(arena), start, stop, stop-start, 'occupied'))
            occupied += (stop-start)

        layout.sort()

        # each block must end where the next one starts, unless the next
        # one is the first block of a different arena
        for current, following in zip(layout, layout[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
# ctypes provides Structure and the C types; Value() and copy() live in
# multiprocessing.sharedctypes (importing it fails when ctypes is missing).
# Asking ctypes itself for Value/copy always raised ImportError, which
# silently disabled every test in _TestSharedCTypes.
try:
    from ctypes import Structure, c_int, c_double
    import multiprocessing.sharedctypes
except ImportError:
    Structure = object
    c_int = c_double = None

class _Foo(Structure):
    """Two-field structure used to test sharing of ctypes structures."""
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]

class _TestSharedCTypes(BaseTestCase):
    """Tests for shared ctypes objects (skipped when ctypes is unavailable)."""

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        """Child body: double every shared value in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        # Values doubled by a child process must be visible in the parent.
        # `lock` is forwarded to every Value()/Array() constructor.
        if c_int is None:
            return

        Value = multiprocessing.sharedctypes.Value

        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = 'hello'

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with lock=True passed to the constructors.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # sharedctypes.copy() must produce an independent structure.  The
        # module is referenced by its full name so that the file-level
        # `copy` module import is not shadowed.
        if c_int is None:
            return

        foo = _Foo(2, 5.0)
        bar = multiprocessing.sharedctypes.copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
class _TestFinalize(BaseTestCase):
    """Check when, and in which order, util.Finalize callbacks run."""

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        """Child body: register finalizers that report through conn, then exit.

        Every callback sends its tag over conn.  The local references must
        stay alive until util._exit_function() has run.
        """
        class Token(object):
            pass

        def watch(obj, tag, **options):
            # register a finalizer for obj which sends tag to the parent
            return util.Finalize(obj, conn.send, args=(tag,), **options)

        a = Token()
        watch(a, 'a')
        del a           # triggers callback for a

        b = Token()
        close_b = watch(b, 'b')
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Token()
        watch(c, 'c')

        d10 = Token()
        watch(d10, 'd10', exitpriority=1)

        d01 = Token()
        watch(d01, 'd01', exitpriority=0)
        d02 = Token()
        watch(d02, 'd02', exitpriority=0)
        d03 = Token()
        watch(d03, 'd03', exitpriority=0)

        watch(None, 'e', exitpriority=-10)

        watch(None, 'STOP', exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        # Collect the tags reported by the child up to the 'STOP' marker.
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        reported = list(iter(conn.recv, 'STOP'))
        self.assertEqual(reported,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
#
# Test that from ... import * works for each module
#

class _TestImportStar(BaseTestCase):
    """Every name listed in a module's __all__ must exist in that module."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        # Import the package and its submodules, then check __all__.
        submodules = (
            'connection', 'heap', 'managers', 'pool', 'process',
            'reduction', 'sharedctypes', 'synchronize', 'util'
            )
        modules = ['multiprocessing']
        modules.extend('multiprocessing.' + sub for sub in submodules)

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            # modules without __all__ have nothing to check
            for attr in getattr(mod, '__all__', ()):
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
#
# Quick test that logging works -- does not test logging output
#

class _TestLogging(BaseTestCase):
    """Smoke tests for the multiprocessing logger."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        # Logging below the configured level must simply be accepted.
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        try:
            self.assertTrue(logger is not None)
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # restore the level used by the rest of the test run
            logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        """Child body: report the logger's effective level through conn."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        # The effective log level must be the same in a child process,
        # whether set on the multiprocessing logger or on the root logger.
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        children = []

        try:
            logger.setLevel(LEVEL1)
            child = self.Process(target=self._test_level, args=(writer,))
            child.start()
            children.append(child)
            self.assertEqual(LEVEL1, reader.recv())

            # with the level unset the child falls back to the root level
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            child = self.Process(target=self._test_level, args=(writer,))
            child.start()
            children.append(child)
            self.assertEqual(LEVEL2, reader.recv())
        finally:
            # Restore the global logging state even when an assertion
            # fails, so later tests are unaffected, and reap the children.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            for child in children:
                child.join()
            reader.close()
            writer.close()
#
# Test to verify handle verification, see issue 3321
#

class TestInvalidHandle(unittest.TestCase):
    """Connection objects built on invalid handles must raise IOError."""

    def test_invalid_handles(self):
        # Nothing is checked on Windows.
        if not WIN32:
            bogus = _multiprocessing.Connection(44977608)
            self.assertRaises(IOError, bogus.poll)
            self.assertRaises(IOError, _multiprocessing.Connection, -1)
1701 # Functions used to create test cases from the base ones in this module
def get_attributes(Source, names):
    """Return a dict mapping each name to the attribute of that name on Source.

    Plain Python functions are wrapped in staticmethod() so that they can be
    stored in a class namespace without turning into methods; every other
    object is returned unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        is_function = (type(value) == function_type)
        attributes[name] = staticmethod(value) if is_function else value
    return attributes
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's '_Test*' bases.

    For every module-level name starting with '_Test' whose ALLOWED_TYPES
    contains `type`, a class deriving from that base, unittest.TestCase and
    Mixin is created.  It is named 'With' + capitalised type + the base
    name without its leading underscore, and reports Mixin's module as its
    own.  Returns a dict mapping the new names to the new classes.
    """
    namespace = globals()
    prefix = 'With' + type[0].upper() + type[1:]
    created = {}

    for name in namespace.keys():
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = prefix + name[1:]
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        created[newname] = Temp

    return created
1731 # Create test cases
class ProcessesMixin(object):
    """Mixin binding the test API to the real multiprocessing primitives."""
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes into the class namespace;
    # the helper tuple is deleted again so it does not become an attribute.
    _names = (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )
    locals().update(get_attributes(multiprocessing, _names))
    del _names

# concrete 'WithProcesses...' test cases, also published as module globals
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
class ManagerMixin(object):
    """Mixin binding the test API to proxies served by a SyncManager."""
    TYPE = 'manager'
    Process = multiprocessing.Process
    # The manager is allocated without running __init__; test_main()
    # initialises and starts it before any test runs.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes into the class namespace; the
    # helper tuple is deleted again so it does not become an attribute.
    _names = (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )
    locals().update(get_attributes(manager, _names))
    del _names

# concrete 'WithManager...' test cases, also published as module globals
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
class ThreadsMixin(object):
    """Mixin binding the test API to the thread-based multiprocessing.dummy."""
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes into the class
    # namespace; the helper tuple is deleted again so it does not become
    # an attribute.
    _names = (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )
    locals().update(get_attributes(multiprocessing.dummy, _names))
    del _names

# concrete 'WithThreads...' test cases, also published as module globals
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
class OtherTest(unittest.TestCase):
    """Authentication handshake failures in multiprocessing.connection."""
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # A peer answering the challenge with garbage must be rejected.
        class _BogusPeer(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        deliver = multiprocessing.connection.deliver_challenge
        self.assertRaises(multiprocessing.AuthenticationError,
                          deliver, _BogusPeer(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # A peer that sends a challenge and then garbage instead of the
        # welcome message must be rejected.
        class _BogusPeer(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                # 1st call: the challenge, 2nd call: garbage, then nothing
                self.count += 1
                if self.count > 2:
                    return b''
                if self.count == 2:
                    return b'something bogus'
                return multiprocessing.connection.CHALLENGE
            def send_bytes(self, data):
                pass

        answer = multiprocessing.connection.answer_challenge
        self.assertRaises(multiprocessing.AuthenticationError,
                          answer, _BogusPeer(), b'abc')
testcases_other = [OtherTest, TestInvalidHandle]

def test_main(run=None):
    """Run every generated test case plus the stand-alone ones.

    `run` is a callable taking a unittest.TestSuite; when omitted,
    test.test_support.run_unittest is used.  Raises TestSkipped on Linux
    when an RLock cannot be created (see issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            from test.test_support import TestSkipped
            raise TestSkipped("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # shared worker pools and the manager used by the generated test cases
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # The runner may raise (for instance to report failures); the pools and
    # the manager process must be torn down in that case as well.
    try:
        by_name = lambda tc: tc.__name__
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
def main():
    """Run the whole test module with a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()