1 #!/usr/bin/env python
4 # Unit tests for the multiprocessing package
7 import unittest
8 import threading
9 import Queue
10 import time
11 import sys
12 import os
13 import gc
14 import signal
15 import array
16 import copy
17 import socket
18 import random
19 import logging
20 from test import test_support
21 from StringIO import StringIO
24 _multiprocessing = test_support.import_module('_multiprocessing')
26 # Work around broken sem_open implementations
27 test_support.import_module('multiprocessing.synchronize')
29 import multiprocessing.dummy
30 import multiprocessing.connection
31 import multiprocessing.managers
32 import multiprocessing.heap
33 import multiprocessing.pool
35 from multiprocessing import util
41 latin = str
44 # Constants
47 LOG_LEVEL = util.SUBWARNING
48 #LOG_LEVEL = logging.WARNING
50 DELTA = 0.1
51 CHECK_TIMINGS = False # making true makes tests take a lot longer
52 # and can sometimes cause some non-serious
53 # failures because some calls block a bit
54 # longer than expected
55 if CHECK_TIMINGS:
56 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
57 else:
58 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
60 HAVE_GETVALUE = not getattr(_multiprocessing,
61 'HAVE_BROKEN_SEM_GETVALUE', False)
63 WIN32 = (sys.platform == "win32")
66 # Creates a wrapper for a function which records the time it takes to finish
69 class TimingWrapper(object):
71 def __init__(self, func):
72 self.func = func
73 self.elapsed = None
75 def __call__(self, *args, **kwds):
76 t = time.time()
77 try:
78 return self.func(*args, **kwds)
79 finally:
80 self.elapsed = time.time() - t
83 # Base class for test cases
86 class BaseTestCase(object):
88 ALLOWED_TYPES = ('processes', 'manager', 'threads')
90 def assertTimingAlmostEqual(self, a, b):
91 if CHECK_TIMINGS:
92 self.assertAlmostEqual(a, b, 1)
94 def assertReturnsIfImplemented(self, value, func, *args):
95 try:
96 res = func(*args)
97 except NotImplementedError:
98 pass
99 else:
100 return self.assertEqual(value, res)
103 # Return the value of a semaphore
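# (tries the object's own get_value() first, then the private attributes
# used by threading.Semaphore and other implementations, and raises
# NotImplementedError if none of them are available)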
106 def get_value(self):
107 try:
108 return self.get_value()
109 except AttributeError:
110 try:
111 return self._Semaphore__value
112 except AttributeError:
113 try:
114 return self._value
115 except AttributeError:
116 raise NotImplementedError
119 # Testcases
122 class _TestProcess(BaseTestCase):
124 ALLOWED_TYPES = ('processes', 'threads')
126 def test_current(self):
127 if self.TYPE == 'threads':
128 return
130 current = self.current_process()
131 authkey = current.authkey
133 self.assertTrue(current.is_alive())
134 self.assertTrue(not current.daemon)
135 self.assertTrue(isinstance(authkey, bytes))
136 self.assertTrue(len(authkey) > 0)
137 self.assertEqual(current.ident, os.getpid())
138 self.assertEqual(current.exitcode, None)
140 def _test(self, q, *args, **kwds):
141 current = self.current_process()
142 q.put(args)
143 q.put(kwds)
144 q.put(current.name)
145 if self.TYPE != 'threads':
146 q.put(bytes(current.authkey))
147 q.put(current.pid)
149 def test_process(self):
150 q = self.Queue(1)
151 e = self.Event()
152 args = (q, 1, 2)
153 kwargs = {'hello':23, 'bye':2.54}
154 name = 'SomeProcess'
155 p = self.Process(
156 target=self._test, args=args, kwargs=kwargs, name=name
157 )
158 p.daemon = True
159 current = self.current_process()
161 if self.TYPE != 'threads':
162 self.assertEquals(p.authkey, current.authkey)
163 self.assertEquals(p.is_alive(), False)
164 self.assertEquals(p.daemon, True)
165 self.assertTrue(p not in self.active_children())
166 self.assertTrue(type(self.active_children()) is list)
167 self.assertEqual(p.exitcode, None)
169 p.start()
171 self.assertEquals(p.exitcode, None)
172 self.assertEquals(p.is_alive(), True)
173 self.assertTrue(p in self.active_children())
175 self.assertEquals(q.get(), args[1:])
176 self.assertEquals(q.get(), kwargs)
177 self.assertEquals(q.get(), p.name)
178 if self.TYPE != 'threads':
179 self.assertEquals(q.get(), current.authkey)
180 self.assertEquals(q.get(), p.pid)
182 p.join()
184 self.assertEquals(p.exitcode, 0)
185 self.assertEquals(p.is_alive(), False)
186 self.assertTrue(p not in self.active_children())
188 def _test_terminate(self):
189 time.sleep(1000)
191 def test_terminate(self):
192 if self.TYPE == 'threads':
193 return
195 p = self.Process(target=self._test_terminate)
196 p.daemon = True
197 p.start()
199 self.assertEqual(p.is_alive(), True)
200 self.assertTrue(p in self.active_children())
201 self.assertEqual(p.exitcode, None)
203 p.terminate()
205 join = TimingWrapper(p.join)
206 self.assertEqual(join(), None)
207 self.assertTimingAlmostEqual(join.elapsed, 0.0)
209 self.assertEqual(p.is_alive(), False)
210 self.assertTrue(p not in self.active_children())
212 p.join()
214 # XXX sometimes get p.exitcode == 0 on Windows ...
215 #self.assertEqual(p.exitcode, -signal.SIGTERM)
217 def test_cpu_count(self):
218 try:
219 cpus = multiprocessing.cpu_count()
220 except NotImplementedError:
221 cpus = 1
222 self.assertTrue(type(cpus) is int)
223 self.assertTrue(cpus >= 1)
225 def test_active_children(self):
226 self.assertEqual(type(self.active_children()), list)
228 p = self.Process(target=time.sleep, args=(DELTA,))
229 self.assertTrue(p not in self.active_children())
231 p.start()
232 self.assertTrue(p in self.active_children())
234 p.join()
235 self.assertTrue(p not in self.active_children())
237 def _test_recursion(self, wconn, id):
238 from multiprocessing import forking
239 wconn.send(id)
240 if len(id) < 2:
241 for i in range(2):
242 p = self.Process(
243 target=self._test_recursion, args=(wconn, id+[i])
244 )
245 p.start()
246 p.join()
248 def test_recursion(self):
249 rconn, wconn = self.Pipe(duplex=False)
250 self._test_recursion(wconn, [])
252 time.sleep(DELTA)
253 result = []
254 while rconn.poll():
255 result.append(rconn.recv())
257 expected = [
258 [],
259 [0],
260 [0, 0],
261 [0, 1],
262 [1],
263 [1, 0],
264 [1, 1]
265 ]
266 self.assertEqual(result, expected)
272 class _UpperCaser(multiprocessing.Process):
274 def __init__(self):
275 multiprocessing.Process.__init__(self)
276 self.child_conn, self.parent_conn = multiprocessing.Pipe()
278 def run(self):
279 self.parent_conn.close()
280 for s in iter(self.child_conn.recv, None):
281 self.child_conn.send(s.upper())
282 self.child_conn.close()
284 def submit(self, s):
285 assert type(s) is str
286 self.parent_conn.send(s)
287 return self.parent_conn.recv()
289 def stop(self):
290 self.parent_conn.send(None)
291 self.parent_conn.close()
292 self.child_conn.close()
294 class _TestSubclassingProcess(BaseTestCase):
296 ALLOWED_TYPES = ('processes',)
298 def test_subclassing(self):
299 uppercaser = _UpperCaser()
300 uppercaser.start()
301 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
302 self.assertEqual(uppercaser.submit('world'), 'WORLD')
303 uppercaser.stop()
304 uppercaser.join()
310 def queue_empty(q):
311 if hasattr(q, 'empty'):
312 return q.empty()
313 else:
314 return q.qsize() == 0
316 def queue_full(q, maxsize):
317 if hasattr(q, 'full'):
318 return q.full()
319 else:
320 return q.qsize() == maxsize
323 class _TestQueue(BaseTestCase):
326 def _test_put(self, queue, child_can_start, parent_can_continue):
327 child_can_start.wait()
328 for i in range(6):
329 queue.get()
330 parent_can_continue.set()
332 def test_put(self):
333 MAXSIZE = 6
334 queue = self.Queue(maxsize=MAXSIZE)
335 child_can_start = self.Event()
336 parent_can_continue = self.Event()
338 proc = self.Process(
339 target=self._test_put,
340 args=(queue, child_can_start, parent_can_continue)
341 )
342 proc.daemon = True
343 proc.start()
345 self.assertEqual(queue_empty(queue), True)
346 self.assertEqual(queue_full(queue, MAXSIZE), False)
348 queue.put(1)
349 queue.put(2, True)
350 queue.put(3, True, None)
351 queue.put(4, False)
352 queue.put(5, False, None)
353 queue.put_nowait(6)
355 # the values may be in buffer but not yet in pipe so sleep a bit
356 time.sleep(DELTA)
358 self.assertEqual(queue_empty(queue), False)
359 self.assertEqual(queue_full(queue, MAXSIZE), True)
361 put = TimingWrapper(queue.put)
362 put_nowait = TimingWrapper(queue.put_nowait)
364 self.assertRaises(Queue.Full, put, 7, False)
365 self.assertTimingAlmostEqual(put.elapsed, 0)
367 self.assertRaises(Queue.Full, put, 7, False, None)
368 self.assertTimingAlmostEqual(put.elapsed, 0)
370 self.assertRaises(Queue.Full, put_nowait, 7)
371 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
373 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
374 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
376 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
377 self.assertTimingAlmostEqual(put.elapsed, 0)
379 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
380 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
382 child_can_start.set()
383 parent_can_continue.wait()
385 self.assertEqual(queue_empty(queue), True)
386 self.assertEqual(queue_full(queue, MAXSIZE), False)
388 proc.join()
390 def _test_get(self, queue, child_can_start, parent_can_continue):
391 child_can_start.wait()
392 #queue.put(1)
393 queue.put(2)
394 queue.put(3)
395 queue.put(4)
396 queue.put(5)
397 parent_can_continue.set()
399 def test_get(self):
400 queue = self.Queue()
401 child_can_start = self.Event()
402 parent_can_continue = self.Event()
404 proc = self.Process(
405 target=self._test_get,
406 args=(queue, child_can_start, parent_can_continue)
407 )
408 proc.daemon = True
409 proc.start()
411 self.assertEqual(queue_empty(queue), True)
413 child_can_start.set()
414 parent_can_continue.wait()
416 time.sleep(DELTA)
417 self.assertEqual(queue_empty(queue), False)
419 # Hangs unexpectedly, remove for now
420 #self.assertEqual(queue.get(), 1)
421 self.assertEqual(queue.get(True, None), 2)
422 self.assertEqual(queue.get(True), 3)
423 self.assertEqual(queue.get(timeout=1), 4)
424 self.assertEqual(queue.get_nowait(), 5)
426 self.assertEqual(queue_empty(queue), True)
428 get = TimingWrapper(queue.get)
429 get_nowait = TimingWrapper(queue.get_nowait)
431 self.assertRaises(Queue.Empty, get, False)
432 self.assertTimingAlmostEqual(get.elapsed, 0)
434 self.assertRaises(Queue.Empty, get, False, None)
435 self.assertTimingAlmostEqual(get.elapsed, 0)
437 self.assertRaises(Queue.Empty, get_nowait)
438 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
440 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
441 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
443 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
444 self.assertTimingAlmostEqual(get.elapsed, 0)
446 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
447 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
449 proc.join()
451 def _test_fork(self, queue):
452 for i in range(10, 20):
453 queue.put(i)
454 # note that at this point the items may only be buffered, so the
455 # process cannot shutdown until the feeder thread has finished
456 # pushing items onto the pipe.
458 def test_fork(self):
459 # Old versions of Queue would fail to create a new feeder
460 # thread for a forked process if the original process had its
461 # own feeder thread. This test checks that this no longer
462 # happens.
464 queue = self.Queue()
466 # put items on queue so that main process starts a feeder thread
467 for i in range(10):
468 queue.put(i)
470 # wait to make sure thread starts before we fork a new process
471 time.sleep(DELTA)
473 # fork process
474 p = self.Process(target=self._test_fork, args=(queue,))
475 p.start()
477 # check that all expected items are in the queue
478 for i in range(20):
479 self.assertEqual(queue.get(), i)
480 self.assertRaises(Queue.Empty, queue.get, False)
482 p.join()
484 def test_qsize(self):
485 q = self.Queue()
486 try:
487 self.assertEqual(q.qsize(), 0)
488 except NotImplementedError:
489 return
490 q.put(1)
491 self.assertEqual(q.qsize(), 1)
492 q.put(5)
493 self.assertEqual(q.qsize(), 2)
494 q.get()
495 self.assertEqual(q.qsize(), 1)
496 q.get()
497 self.assertEqual(q.qsize(), 0)
499 def _test_task_done(self, q):
500 for obj in iter(q.get, None):
501 time.sleep(DELTA)
502 q.task_done()
504 def test_task_done(self):
505 queue = self.JoinableQueue()
507 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
508 return
510 workers = [self.Process(target=self._test_task_done, args=(queue,))
511 for i in xrange(4)]
513 for p in workers:
514 p.start()
516 for i in xrange(10):
517 queue.put(i)
519 queue.join()
521 for p in workers:
522 queue.put(None)
524 for p in workers:
525 p.join()
531 class _TestLock(BaseTestCase):
533 def test_lock(self):
534 lock = self.Lock()
535 self.assertEqual(lock.acquire(), True)
536 self.assertEqual(lock.acquire(False), False)
537 self.assertEqual(lock.release(), None)
538 self.assertRaises((ValueError, threading.ThreadError), lock.release)
540 def test_rlock(self):
541 lock = self.RLock()
542 self.assertEqual(lock.acquire(), True)
543 self.assertEqual(lock.acquire(), True)
544 self.assertEqual(lock.acquire(), True)
545 self.assertEqual(lock.release(), None)
546 self.assertEqual(lock.release(), None)
547 self.assertEqual(lock.release(), None)
548 self.assertRaises((AssertionError, RuntimeError), lock.release)
550 def test_lock_context(self):
551 with self.Lock():
552 pass
555 class _TestSemaphore(BaseTestCase):
557 def _test_semaphore(self, sem):
558 self.assertReturnsIfImplemented(2, get_value, sem)
559 self.assertEqual(sem.acquire(), True)
560 self.assertReturnsIfImplemented(1, get_value, sem)
561 self.assertEqual(sem.acquire(), True)
562 self.assertReturnsIfImplemented(0, get_value, sem)
563 self.assertEqual(sem.acquire(False), False)
564 self.assertReturnsIfImplemented(0, get_value, sem)
565 self.assertEqual(sem.release(), None)
566 self.assertReturnsIfImplemented(1, get_value, sem)
567 self.assertEqual(sem.release(), None)
568 self.assertReturnsIfImplemented(2, get_value, sem)
570 def test_semaphore(self):
571 sem = self.Semaphore(2)
572 self._test_semaphore(sem)
573 self.assertEqual(sem.release(), None)
574 self.assertReturnsIfImplemented(3, get_value, sem)
575 self.assertEqual(sem.release(), None)
576 self.assertReturnsIfImplemented(4, get_value, sem)
578 def test_bounded_semaphore(self):
579 sem = self.BoundedSemaphore(2)
580 self._test_semaphore(sem)
581 # Currently fails on OS/X
582 #if HAVE_GETVALUE:
583 # self.assertRaises(ValueError, sem.release)
584 # self.assertReturnsIfImplemented(2, get_value, sem)
586 def test_timeout(self):
587 if self.TYPE != 'processes':
588 return
590 sem = self.Semaphore(0)
591 acquire = TimingWrapper(sem.acquire)
593 self.assertEqual(acquire(False), False)
594 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
596 self.assertEqual(acquire(False, None), False)
597 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
599 self.assertEqual(acquire(False, TIMEOUT1), False)
600 self.assertTimingAlmostEqual(acquire.elapsed, 0)
602 self.assertEqual(acquire(True, TIMEOUT2), False)
603 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
605 self.assertEqual(acquire(timeout=TIMEOUT3), False)
606 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
609 class _TestCondition(BaseTestCase):
611 def f(self, cond, sleeping, woken, timeout=None):
612 cond.acquire()
613 sleeping.release()
614 cond.wait(timeout)
615 woken.release()
616 cond.release()
618 def check_invariant(self, cond):
619 # this is only supposed to succeed when there are no sleepers
620 if self.TYPE == 'processes':
621 try:
622 sleepers = (cond._sleeping_count.get_value() -
623 cond._woken_count.get_value())
624 self.assertEqual(sleepers, 0)
625 self.assertEqual(cond._wait_semaphore.get_value(), 0)
626 except NotImplementedError:
627 pass
629 def test_notify(self):
630 cond = self.Condition()
631 sleeping = self.Semaphore(0)
632 woken = self.Semaphore(0)
634 p = self.Process(target=self.f, args=(cond, sleeping, woken))
635 p.daemon = True
636 p.start()
638 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
639 p.daemon = True
640 p.start()
642 # wait for both children to start sleeping
643 sleeping.acquire()
644 sleeping.acquire()
646 # check no process/thread has woken up
647 time.sleep(DELTA)
648 self.assertReturnsIfImplemented(0, get_value, woken)
650 # wake up one process/thread
651 cond.acquire()
652 cond.notify()
653 cond.release()
655 # check one process/thread has woken up
656 time.sleep(DELTA)
657 self.assertReturnsIfImplemented(1, get_value, woken)
659 # wake up another
660 cond.acquire()
661 cond.notify()
662 cond.release()
664 # check other has woken up
665 time.sleep(DELTA)
666 self.assertReturnsIfImplemented(2, get_value, woken)
668 # check state is not mucked up
669 self.check_invariant(cond)
670 p.join()
672 def test_notify_all(self):
673 cond = self.Condition()
674 sleeping = self.Semaphore(0)
675 woken = self.Semaphore(0)
677 # start some threads/processes which will timeout
678 for i in range(3):
679 p = self.Process(target=self.f,
680 args=(cond, sleeping, woken, TIMEOUT1))
681 p.daemon = True
682 p.start()
684 t = threading.Thread(target=self.f,
685 args=(cond, sleeping, woken, TIMEOUT1))
686 t.daemon = True
687 t.start()
689 # wait for them all to sleep
690 for i in xrange(6):
691 sleeping.acquire()
693 # check they have all timed out
694 for i in xrange(6):
695 woken.acquire()
696 self.assertReturnsIfImplemented(0, get_value, woken)
698 # check state is not mucked up
699 self.check_invariant(cond)
701 # start some more threads/processes
702 for i in range(3):
703 p = self.Process(target=self.f, args=(cond, sleeping, woken))
704 p.daemon = True
705 p.start()
707 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
708 t.daemon = True
709 t.start()
711 # wait for them to all sleep
712 for i in xrange(6):
713 sleeping.acquire()
715 # check no process/thread has woken up
716 time.sleep(DELTA)
717 self.assertReturnsIfImplemented(0, get_value, woken)
719 # wake them all up
720 cond.acquire()
721 cond.notify_all()
722 cond.release()
724 # check they have all woken
725 time.sleep(DELTA)
726 self.assertReturnsIfImplemented(6, get_value, woken)
728 # check state is not mucked up
729 self.check_invariant(cond)
731 def test_timeout(self):
732 cond = self.Condition()
733 wait = TimingWrapper(cond.wait)
734 cond.acquire()
735 res = wait(TIMEOUT1)
736 cond.release()
737 self.assertEqual(res, None)
738 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
741 class _TestEvent(BaseTestCase):
743 def _test_event(self, event):
744 time.sleep(TIMEOUT2)
745 event.set()
747 def test_event(self):
748 event = self.Event()
749 wait = TimingWrapper(event.wait)
751 # Removed temporarily, due to API shear, this does not
752 # work with threading._Event objects. is_set == isSet
753 self.assertEqual(event.is_set(), False)
755 # Removed, threading.Event.wait() will return the value of the __flag
756 # instead of None. API Shear with the semaphore backed mp.Event
757 self.assertEqual(wait(0.0), False)
758 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
759 self.assertEqual(wait(TIMEOUT1), False)
760 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
762 event.set()
764 # See note above on the API differences
765 self.assertEqual(event.is_set(), True)
766 self.assertEqual(wait(), True)
767 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
768 self.assertEqual(wait(TIMEOUT1), True)
769 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
770 # self.assertEqual(event.is_set(), True)
772 event.clear()
774 #self.assertEqual(event.is_set(), False)
776 self.Process(target=self._test_event, args=(event,)).start()
777 self.assertEqual(wait(), True)
783 class _TestValue(BaseTestCase):
785 codes_values = [
786 ('i', 4343, 24234),
787 ('d', 3.625, -4.25),
788 ('h', -232, 234),
789 ('c', latin('x'), latin('y'))
790 ]
792 def _test(self, values):
793 for sv, cv in zip(values, self.codes_values):
794 sv.value = cv[2]
797 def test_value(self, raw=False):
798 if self.TYPE != 'processes':
799 return
801 if raw:
802 values = [self.RawValue(code, value)
803 for code, value, _ in self.codes_values]
804 else:
805 values = [self.Value(code, value)
806 for code, value, _ in self.codes_values]
808 for sv, cv in zip(values, self.codes_values):
809 self.assertEqual(sv.value, cv[1])
811 proc = self.Process(target=self._test, args=(values,))
812 proc.start()
813 proc.join()
815 for sv, cv in zip(values, self.codes_values):
816 self.assertEqual(sv.value, cv[2])
818 def test_rawvalue(self):
819 self.test_value(raw=True)
821 def test_getobj_getlock(self):
822 if self.TYPE != 'processes':
823 return
825 val1 = self.Value('i', 5)
826 lock1 = val1.get_lock()
827 obj1 = val1.get_obj()
829 val2 = self.Value('i', 5, lock=None)
830 lock2 = val2.get_lock()
831 obj2 = val2.get_obj()
833 lock = self.Lock()
834 val3 = self.Value('i', 5, lock=lock)
835 lock3 = val3.get_lock()
836 obj3 = val3.get_obj()
837 self.assertEqual(lock, lock3)
839 arr4 = self.Value('i', 5, lock=False)
840 self.assertFalse(hasattr(arr4, 'get_lock'))
841 self.assertFalse(hasattr(arr4, 'get_obj'))
843 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
845 arr5 = self.RawValue('i', 5)
846 self.assertFalse(hasattr(arr5, 'get_lock'))
847 self.assertFalse(hasattr(arr5, 'get_obj'))
850 class _TestArray(BaseTestCase):
852 def f(self, seq):
853 for i in range(1, len(seq)):
854 seq[i] += seq[i-1]
856 def test_array(self, raw=False):
857 if self.TYPE != 'processes':
858 return
860 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
861 if raw:
862 arr = self.RawArray('i', seq)
863 else:
864 arr = self.Array('i', seq)
866 self.assertEqual(len(arr), len(seq))
867 self.assertEqual(arr[3], seq[3])
868 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
870 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
872 self.assertEqual(list(arr[:]), seq)
874 self.f(seq)
876 p = self.Process(target=self.f, args=(arr,))
877 p.start()
878 p.join()
880 self.assertEqual(list(arr[:]), seq)
882 def test_rawarray(self):
883 self.test_array(raw=True)
885 def test_getobj_getlock_obj(self):
886 if self.TYPE != 'processes':
887 return
889 arr1 = self.Array('i', range(10))
890 lock1 = arr1.get_lock()
891 obj1 = arr1.get_obj()
893 arr2 = self.Array('i', range(10), lock=None)
894 lock2 = arr2.get_lock()
895 obj2 = arr2.get_obj()
897 lock = self.Lock()
898 arr3 = self.Array('i', range(10), lock=lock)
899 lock3 = arr3.get_lock()
900 obj3 = arr3.get_obj()
901 self.assertEqual(lock, lock3)
903 arr4 = self.Array('i', range(10), lock=False)
904 self.assertFalse(hasattr(arr4, 'get_lock'))
905 self.assertFalse(hasattr(arr4, 'get_obj'))
906 self.assertRaises(AttributeError,
907 self.Array, 'i', range(10), lock='notalock')
909 arr5 = self.RawArray('i', range(10))
910 self.assertFalse(hasattr(arr5, 'get_lock'))
911 self.assertFalse(hasattr(arr5, 'get_obj'))
917 class _TestContainers(BaseTestCase):
919 ALLOWED_TYPES = ('manager',)
921 def test_list(self):
922 a = self.list(range(10))
923 self.assertEqual(a[:], range(10))
925 b = self.list()
926 self.assertEqual(b[:], [])
928 b.extend(range(5))
929 self.assertEqual(b[:], range(5))
931 self.assertEqual(b[2], 2)
932 self.assertEqual(b[2:10], [2,3,4])
934 b *= 2
935 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
937 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
939 self.assertEqual(a[:], range(10))
941 d = [a, b]
942 e = self.list(d)
943 self.assertEqual(
944 e[:],
945 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
946 )
948 f = self.list([a])
949 a.append('hello')
950 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
952 def test_dict(self):
953 d = self.dict()
954 indices = range(65, 70)
955 for i in indices:
956 d[i] = chr(i)
957 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
958 self.assertEqual(sorted(d.keys()), indices)
959 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
960 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
962 def test_namespace(self):
963 n = self.Namespace()
964 n.name = 'Bob'
965 n.job = 'Builder'
966 n._hidden = 'hidden'
967 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
968 del n.job
969 self.assertEqual(str(n), "Namespace(name='Bob')")
970 self.assertTrue(hasattr(n, 'name'))
971 self.assertTrue(not hasattr(n, 'job'))
977 def sqr(x, wait=0.0):
978 time.sleep(wait)
979 return x*x
980 class _TestPool(BaseTestCase):
982 def test_apply(self):
983 papply = self.pool.apply
984 self.assertEqual(papply(sqr, (5,)), sqr(5))
985 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
987 def test_map(self):
988 pmap = self.pool.map
989 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
990 self.assertEqual(pmap(sqr, range(100), chunksize=20),
991 map(sqr, range(100)))
993 def test_map_chunksize(self):
994 try:
995 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
996 except multiprocessing.TimeoutError:
997 self.fail("pool.map_async with chunksize stalled on null list")
999 def test_async(self):
1000 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1001 get = TimingWrapper(res.get)
1002 self.assertEqual(get(), 49)
1003 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1005 def test_async_timeout(self):
1006 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1007 get = TimingWrapper(res.get)
1008 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1009 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1011 def test_imap(self):
1012 it = self.pool.imap(sqr, range(10))
1013 self.assertEqual(list(it), map(sqr, range(10)))
1015 it = self.pool.imap(sqr, range(10))
1016 for i in range(10):
1017 self.assertEqual(it.next(), i*i)
1018 self.assertRaises(StopIteration, it.next)
1020 it = self.pool.imap(sqr, range(1000), chunksize=100)
1021 for i in range(1000):
1022 self.assertEqual(it.next(), i*i)
1023 self.assertRaises(StopIteration, it.next)
1025 def test_imap_unordered(self):
1026 it = self.pool.imap_unordered(sqr, range(1000))
1027 self.assertEqual(sorted(it), map(sqr, range(1000)))
1029 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1030 self.assertEqual(sorted(it), map(sqr, range(1000)))
1032 def test_make_pool(self):
1033 p = multiprocessing.Pool(3)
1034 self.assertEqual(3, len(p._pool))
1035 p.close()
1036 p.join()
1038 def test_terminate(self):
1039 if self.TYPE == 'manager':
1040 # On Unix a forked process increfs each shared object to
1041 # which its parent process held a reference. If the
1042 # forked process gets terminated then there is likely to
1043 # be a reference leak. So to prevent
1044 # _TestZZZNumberOfObjects from failing we skip this test
1045 # when using a manager.
1046 return
1048 result = self.pool.map_async(
1049 time.sleep, [0.1 for i in range(10000)], chunksize=1
1050 )
1051 self.pool.terminate()
1052 join = TimingWrapper(self.pool.join)
1053 join()
1054 self.assertTrue(join.elapsed < 0.2)
1056 # Test that manager has expected number of shared objects left
1059 class _TestZZZNumberOfObjects(BaseTestCase):
1060 # Because test cases are sorted alphabetically, this one will get
1061 # run after all the other tests for the manager. It tests that
1062 # there have been no "reference leaks" for the manager's shared
1063 # objects. Note the comment in _TestPool.test_terminate().
1064 ALLOWED_TYPES = ('manager',)
1066 def test_number_of_objects(self):
1067 EXPECTED_NUMBER = 1 # the pool object is still alive
1068 multiprocessing.active_children() # discard dead process objs
1069 gc.collect() # do garbage collection
1070 refs = self.manager._number_of_objects()
1071 debug_info = self.manager._debug_info()
1072 if refs != EXPECTED_NUMBER:
1073 print self.manager._debug_info()
1074 print debug_info
1076 self.assertEqual(refs, EXPECTED_NUMBER)
1079 # Test of creating a customized manager class
1082 from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1084 class FooBar(object):
1085 def f(self):
1086 return 'f()'
1087 def g(self):
1088 raise ValueError
1089 def _h(self):
1090 return '_h()'
1092 def baz():
1093 for i in xrange(10):
1094 yield i*i
1096 class IteratorProxy(BaseProxy):
1097 _exposed_ = ('next', '__next__')
1098 def __iter__(self):
1099 return self
1100 def next(self):
1101 return self._callmethod('next')
1102 def __next__(self):
1103 return self._callmethod('__next__')
1105 class MyManager(BaseManager):
1106 pass
1108 MyManager.register('Foo', callable=FooBar)
1109 MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1110 MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1113 class _TestMyManager(BaseTestCase):
1115 ALLOWED_TYPES = ('manager',)
1117 def test_mymanager(self):
1118 manager = MyManager()
1119 manager.start()
1121 foo = manager.Foo()
1122 bar = manager.Bar()
1123 baz = manager.baz()
1125 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1126 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1128 self.assertEqual(foo_methods, ['f', 'g'])
1129 self.assertEqual(bar_methods, ['f', '_h'])
1131 self.assertEqual(foo.f(), 'f()')
1132 self.assertRaises(ValueError, foo.g)
1133 self.assertEqual(foo._callmethod('f'), 'f()')
1134 self.assertRaises(RemoteError, foo._callmethod, '_h')
1136 self.assertEqual(bar.f(), 'f()')
1137 self.assertEqual(bar._h(), '_h()')
1138 self.assertEqual(bar._callmethod('f'), 'f()')
1139 self.assertEqual(bar._callmethod('_h'), '_h()')
1141 self.assertEqual(list(baz), [i*i for i in range(10)])
1143 manager.shutdown()
1146 # Test of connecting to a remote server and using xmlrpclib for serialization
1149 _queue = Queue.Queue()
1150 def get_queue():
1151 return _queue
1153 class QueueManager(BaseManager):
1154 '''manager class used by server process'''
1155 QueueManager.register('get_queue', callable=get_queue)
1157 class QueueManager2(BaseManager):
1158 '''manager class which specifies the same interface as QueueManager'''
1159 QueueManager2.register('get_queue')
1162 SERIALIZER = 'xmlrpclib'
1164 class _TestRemoteManager(BaseTestCase):
1166 ALLOWED_TYPES = ('manager',)
1168 def _putter(self, address, authkey):
1169 manager = QueueManager2(
1170 address=address, authkey=authkey, serializer=SERIALIZER
1171 )
1172 manager.connect()
1173 queue = manager.get_queue()
1174 queue.put(('hello world', None, True, 2.25))
1176 def test_remote(self):
1177 authkey = os.urandom(32)
1179 manager = QueueManager(
1180 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1181 )
1182 manager.start()
1184 p = self.Process(target=self._putter, args=(manager.address, authkey))
1185 p.start()
1187 manager2 = QueueManager2(
1188 address=manager.address, authkey=authkey, serializer=SERIALIZER
1189 )
1190 manager2.connect()
1191 queue = manager2.get_queue()
1193 # Note that xmlrpclib will deserialize object as a list not a tuple
1194 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1196 # Because we are using xmlrpclib for serialization instead of
1197 # pickle this will cause a serialization error.
1198 self.assertRaises(Exception, queue.put, time.sleep)
1200 # Make queue finalizer run before the server is stopped
1201 del queue
1202 manager.shutdown()
1204 class _TestManagerRestart(BaseTestCase):
1206 def _putter(self, address, authkey):
1207 manager = QueueManager(
1208 address=address, authkey=authkey, serializer=SERIALIZER)
1209 manager.connect()
1210 queue = manager.get_queue()
1211 queue.put('hello world')
1213 def test_rapid_restart(self):
1214 authkey = os.urandom(32)
1215 manager = QueueManager(
1216 address=('localhost', 9999), authkey=authkey, serializer=SERIALIZER)
1217 manager.start()
1219 p = self.Process(target=self._putter, args=(manager.address, authkey))
1220 p.start()
1221 queue = manager.get_queue()
1222 self.assertEqual(queue.get(), 'hello world')
1223 del queue
1224 manager.shutdown()
1225 manager = QueueManager(
1226 address=('localhost', 9999), authkey=authkey, serializer=SERIALIZER)
1227 manager.start()
1228 manager.shutdown()
1234 SENTINEL = latin('')
1236 class _TestConnection(BaseTestCase):
1238 ALLOWED_TYPES = ('processes', 'threads')
1240 def _echo(self, conn):
1241 for msg in iter(conn.recv_bytes, SENTINEL):
1242 conn.send_bytes(msg)
1243 conn.close()
1245 def test_connection(self):
1246 conn, child_conn = self.Pipe()
1248 p = self.Process(target=self._echo, args=(child_conn,))
1249 p.daemon = True
1250 p.start()
1252 seq = [1, 2.25, None]
1253 msg = latin('hello world')
1254 longmsg = msg * 10
1255 arr = array.array('i', range(4))
1257 if self.TYPE == 'processes':
1258 self.assertEqual(type(conn.fileno()), int)
1260 self.assertEqual(conn.send(seq), None)
1261 self.assertEqual(conn.recv(), seq)
1263 self.assertEqual(conn.send_bytes(msg), None)
1264 self.assertEqual(conn.recv_bytes(), msg)
1266 if self.TYPE == 'processes':
1267 buffer = array.array('i', [0]*10)
1268 expected = list(arr) + [0] * (10 - len(arr))
1269 self.assertEqual(conn.send_bytes(arr), None)
1270 self.assertEqual(conn.recv_bytes_into(buffer),
1271 len(arr) * buffer.itemsize)
1272 self.assertEqual(list(buffer), expected)
1274 buffer = array.array('i', [0]*10)
1275 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1276 self.assertEqual(conn.send_bytes(arr), None)
1277 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1278 len(arr) * buffer.itemsize)
1279 self.assertEqual(list(buffer), expected)
1281 buffer = bytearray(latin(' ' * 40))
1282 self.assertEqual(conn.send_bytes(longmsg), None)
1283 try:
1284 res = conn.recv_bytes_into(buffer)
1285 except multiprocessing.BufferTooShort, e:
1286 self.assertEqual(e.args, (longmsg,))
1287 else:
1288 self.fail('expected BufferTooShort, got %s' % res)
1290 poll = TimingWrapper(conn.poll)
1292 self.assertEqual(poll(), False)
1293 self.assertTimingAlmostEqual(poll.elapsed, 0)
1295 self.assertEqual(poll(TIMEOUT1), False)
1296 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1298 conn.send(None)
1300 self.assertEqual(poll(TIMEOUT1), True)
1301 self.assertTimingAlmostEqual(poll.elapsed, 0)
1303 self.assertEqual(conn.recv(), None)
1305 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1306 conn.send_bytes(really_big_msg)
1307 self.assertEqual(conn.recv_bytes(), really_big_msg)
1309 conn.send_bytes(SENTINEL) # tell child to quit
1310 child_conn.close()
1312 if self.TYPE == 'processes':
1313 self.assertEqual(conn.readable, True)
1314 self.assertEqual(conn.writable, True)
1315 self.assertRaises(EOFError, conn.recv)
1316 self.assertRaises(EOFError, conn.recv_bytes)
1318 p.join()
1320 def test_duplex_false(self):
1321 reader, writer = self.Pipe(duplex=False)
1322 self.assertEqual(writer.send(1), None)
1323 self.assertEqual(reader.recv(), 1)
1324 if self.TYPE == 'processes':
1325 self.assertEqual(reader.readable, True)
1326 self.assertEqual(reader.writable, False)
1327 self.assertEqual(writer.readable, False)
1328 self.assertEqual(writer.writable, True)
1329 self.assertRaises(IOError, reader.send, 2)
1330 self.assertRaises(IOError, writer.recv)
1331 self.assertRaises(IOError, writer.poll)
1333 def test_spawn_close(self):
1334 # We test that a pipe connection can be closed by parent
1335 # process immediately after child is spawned. On Windows this
1336 # would have sometimes failed on old versions because
1337 # child_conn would be closed before the child got a chance to
1338 # duplicate it.
1339 conn, child_conn = self.Pipe()
1341 p = self.Process(target=self._echo, args=(child_conn,))
1342 p.start()
1343 child_conn.close() # this might complete before child initializes
1345 msg = latin('hello')
1346 conn.send_bytes(msg)
1347 self.assertEqual(conn.recv_bytes(), msg)
1349 conn.send_bytes(SENTINEL)
1350 conn.close()
1351 p.join()
1353 def test_sendbytes(self):
1354 if self.TYPE != 'processes':
1355 return
1357 msg = latin('abcdefghijklmnopqrstuvwxyz')
1358 a, b = self.Pipe()
1360 a.send_bytes(msg)
1361 self.assertEqual(b.recv_bytes(), msg)
1363 a.send_bytes(msg, 5)
1364 self.assertEqual(b.recv_bytes(), msg[5:])
1366 a.send_bytes(msg, 7, 8)
1367 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1369 a.send_bytes(msg, 26)
1370 self.assertEqual(b.recv_bytes(), latin(''))
1372 a.send_bytes(msg, 26, 0)
1373 self.assertEqual(b.recv_bytes(), latin(''))
1375 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1377 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1379 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1381 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1383 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1385 class _TestListenerClient(BaseTestCase):
1387 ALLOWED_TYPES = ('processes', 'threads')
1389 def _test(self, address):
1390 conn = self.connection.Client(address)
1391 conn.send('hello')
1392 conn.close()
1394 def test_listener_client(self):
1395 for family in self.connection.families:
1396 l = self.connection.Listener(family=family)
1397 p = self.Process(target=self._test, args=(l.address,))
1398 p.daemon = True
1399 p.start()
1400 conn = l.accept()
1401 self.assertEqual(conn.recv(), 'hello')
1402 p.join()
1403 l.close()
1405 # Test of sending connection and socket objects between processes
1408 class _TestPicklingConnections(BaseTestCase):
1410 ALLOWED_TYPES = ('processes',)
1412 def _listener(self, conn, families):
1413 for fam in families:
1414 l = self.connection.Listener(family=fam)
1415 conn.send(l.address)
1416 new_conn = l.accept()
1417 conn.send(new_conn)
1419 if self.TYPE == 'processes':
1420 l = socket.socket()
1421 l.bind(('localhost', 0))
1422 conn.send(l.getsockname())
1423 l.listen(1)
1424 new_conn, addr = l.accept()
1425 conn.send(new_conn)
1427 conn.recv()
1429 def _remote(self, conn):
1430 for (address, msg) in iter(conn.recv, None):
1431 client = self.connection.Client(address)
1432 client.send(msg.upper())
1433 client.close()
1435 if self.TYPE == 'processes':
1436 address, msg = conn.recv()
1437 client = socket.socket()
1438 client.connect(address)
1439 client.sendall(msg.upper())
1440 client.close()
1442 conn.close()
1444 def test_pickling(self):
1445 try:
1446 multiprocessing.allow_connection_pickling()
1447 except ImportError:
1448 return
1450 families = self.connection.families
1452 lconn, lconn0 = self.Pipe()
1453 lp = self.Process(target=self._listener, args=(lconn0, families))
1454 lp.start()
1455 lconn0.close()
1457 rconn, rconn0 = self.Pipe()
1458 rp = self.Process(target=self._remote, args=(rconn0,))
1459 rp.start()
1460 rconn0.close()
1462 for fam in families:
1463 msg = ('This connection uses family %s' % fam).encode('ascii')
1464 address = lconn.recv()
1465 rconn.send((address, msg))
1466 new_conn = lconn.recv()
1467 self.assertEqual(new_conn.recv(), msg.upper())
1469 rconn.send(None)
1471 if self.TYPE == 'processes':
1472 msg = latin('This connection uses a normal socket')
1473 address = lconn.recv()
1474 rconn.send((address, msg))
1475 if hasattr(socket, 'fromfd'):
1476 new_conn = lconn.recv()
1477 self.assertEqual(new_conn.recv(100), msg.upper())
1478 else:
1479 # XXX On Windows with Py2.6 need to backport fromfd()
1480 discard = lconn.recv_bytes()
1482 lconn.send(None)
1484 rconn.close()
1485 lconn.close()
1487 lp.join()
1488 rp.join()
1494 class _TestHeap(BaseTestCase):
1496 ALLOWED_TYPES = ('processes',)
1498 def test_heap(self):
1499 iterations = 5000
1500 maxblocks = 50
1501 blocks = []
1503 # create and destroy lots of blocks of different sizes
1504 for i in xrange(iterations):
1505 size = int(random.lognormvariate(0, 1) * 1000)
1506 b = multiprocessing.heap.BufferWrapper(size)
1507 blocks.append(b)
1508 if len(blocks) > maxblocks:
1509 i = random.randrange(maxblocks)
1510 del blocks[i]
1512 # get the heap object
1513 heap = multiprocessing.heap.BufferWrapper._heap
1515 # verify the state of the heap
1516 all = []
1517 occupied = 0
1518 for L in heap._len_to_seq.values():
1519 for arena, start, stop in L:
1520 all.append((heap._arenas.index(arena), start, stop,
1521 stop-start, 'free'))
1522 for arena, start, stop in heap._allocated_blocks:
1523 all.append((heap._arenas.index(arena), start, stop,
1524 stop-start, 'occupied'))
1525 occupied += (stop-start)
1527 all.sort()
1529 for i in range(len(all)-1):
1530 (arena, start, stop) = all[i][:3]
1531 (narena, nstart, nstop) = all[i+1][:3]
1532 self.assertTrue((arena != narena and nstart == 0) or
1533 (stop == nstart))
1539 try:
1540 from ctypes import Structure, Value, copy, c_int, c_double
1541 except ImportError:
1542 Structure = object
1543 c_int = c_double = None
1545 class _Foo(Structure):
1546 _fields_ = [
1547 ('x', c_int),
1548 ('y', c_double)
1549 ]
1551 class _TestSharedCTypes(BaseTestCase):
1553 ALLOWED_TYPES = ('processes',)
1555 def _double(self, x, y, foo, arr, string):
1556 x.value *= 2
1557 y.value *= 2
1558 foo.x *= 2
1559 foo.y *= 2
1560 string.value *= 2
1561 for i in range(len(arr)):
1562 arr[i] *= 2
1564 def test_sharedctypes(self, lock=False):
1565 if c_int is None:
1566 return
1568 x = Value('i', 7, lock=lock)
1569 y = Value(c_double, 1.0/3.0, lock=lock)  # only c_double is imported; the ctypes module itself is not
1570 foo = Value(_Foo, 3, 2, lock=lock)
1571 arr = Array('d', range(10), lock=lock)
1572 string = Array('c', 20, lock=lock)
1573 string.value = 'hello'
1575 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1576 p.start()
1577 p.join()
1579 self.assertEqual(x.value, 14)
1580 self.assertAlmostEqual(y.value, 2.0/3.0)
1581 self.assertEqual(foo.x, 6)
1582 self.assertAlmostEqual(foo.y, 4.0)
1583 for i in range(10):
1584 self.assertAlmostEqual(arr[i], i*2)
1585 self.assertEqual(string.value, latin('hellohello'))
1587 def test_synchronize(self):
1588 self.test_sharedctypes(lock=True)
1590 def test_copy(self):
1591 if c_int is None:
1592 return
1594 foo = _Foo(2, 5.0)
1595 bar = copy(foo)
1596 foo.x = 0
1597 foo.y = 0
1598 self.assertEqual(bar.x, 2)
1599 self.assertAlmostEqual(bar.y, 5.0)
1605 class _TestFinalize(BaseTestCase):
1607 ALLOWED_TYPES = ('processes',)
1609 def _test_finalize(self, conn):
1610 class Foo(object):
1611 pass
1613 a = Foo()
1614 util.Finalize(a, conn.send, args=('a',))
1615 del a # triggers callback for a
1617 b = Foo()
1618 close_b = util.Finalize(b, conn.send, args=('b',))
1619 close_b() # triggers callback for b
1620 close_b() # does nothing because callback has already been called
1621 del b # does nothing because callback has already been called
1623 c = Foo()
1624 util.Finalize(c, conn.send, args=('c',))
1626 d10 = Foo()
1627 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1629 d01 = Foo()
1630 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1631 d02 = Foo()
1632 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1633 d03 = Foo()
1634 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1636 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1638 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1640 # call multiprocessing's cleanup function then exit process without
1641 # garbage collecting locals
1642 util._exit_function()
1643 conn.close()
1644 os._exit(0)
1646 def test_finalize(self):
1647 conn, child_conn = self.Pipe()
1649 p = self.Process(target=self._test_finalize, args=(child_conn,))
1650 p.start()
1651 p.join()
1653 result = [obj for obj in iter(conn.recv, 'STOP')]
1654 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1657 # Test that from ... import * works for each module
1660 class _TestImportStar(BaseTestCase):
1662 ALLOWED_TYPES = ('processes',)
1664 def test_import(self):
1665 modules = (
1666 'multiprocessing', 'multiprocessing.connection',
1667 'multiprocessing.heap', 'multiprocessing.managers',
1668 'multiprocessing.pool', 'multiprocessing.process',
1669 'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1670 'multiprocessing.synchronize', 'multiprocessing.util'
1671 )
1673 for name in modules:
1674 __import__(name)
1675 mod = sys.modules[name]
1677 for attr in getattr(mod, '__all__', ()):
1678 self.assertTrue(
1679 hasattr(mod, attr),
1680 '%r does not have attribute %r' % (mod, attr)
1681 )
1684 # Quick test that logging works -- does not test logging output
1687 class _TestLogging(BaseTestCase):
1689 ALLOWED_TYPES = ('processes',)
1691 def test_enable_logging(self):
1692 logger = multiprocessing.get_logger()
1693 logger.setLevel(util.SUBWARNING)
1694 self.assertTrue(logger is not None)
1695 logger.debug('this will not be printed')
1696 logger.info('nor will this')
1697 logger.setLevel(LOG_LEVEL)
1699 def _test_level(self, conn):
1700 logger = multiprocessing.get_logger()
1701 conn.send(logger.getEffectiveLevel())
1703 def test_level(self):
1704 LEVEL1 = 32
1705 LEVEL2 = 37
1707 logger = multiprocessing.get_logger()
1708 root_logger = logging.getLogger()
1709 root_level = root_logger.level
1711 reader, writer = multiprocessing.Pipe(duplex=False)
1713 logger.setLevel(LEVEL1)
1714 self.Process(target=self._test_level, args=(writer,)).start()
1715 self.assertEqual(LEVEL1, reader.recv())
1717 logger.setLevel(logging.NOTSET)
1718 root_logger.setLevel(LEVEL2)
1719 self.Process(target=self._test_level, args=(writer,)).start()
1720 self.assertEqual(LEVEL2, reader.recv())
1722 root_logger.setLevel(root_level)
1723 logger.setLevel(level=LOG_LEVEL)
1726 # class _TestLoggingProcessName(BaseTestCase):
1728 # def handle(self, record):
1729 # assert record.processName == multiprocessing.current_process().name
1730 # self.__handled = True
1732 # def test_logging(self):
1733 # handler = logging.Handler()
1734 # handler.handle = self.handle
1735 # self.__handled = False
1736 # # Bypass getLogger() and side-effects
1737 # logger = logging.getLoggerClass()(
1738 # 'multiprocessing.test.TestLoggingProcessName')
1739 # logger.addHandler(handler)
1740 # logger.propagate = False
1742 # logger.warn('foo')
1743 # assert self.__handled
1746 # Test to verify handle verification, see issue 3321
1749 class TestInvalidHandle(unittest.TestCase):
1751 def test_invalid_handles(self):
1752 if WIN32:
1753 return
1754 conn = _multiprocessing.Connection(44977608)
1755 self.assertRaises(IOError, conn.poll)
1756 self.assertRaises(IOError, _multiprocessing.Connection, -1)
1758 # Functions used to create test cases from the base ones in this module
1761 def get_attributes(Source, names):
1762 d = {}
1763 for name in names:
1764 obj = getattr(Source, name)
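# wrap plain functions as staticmethods so they are not converted into
# bound methods when copied onto the mixin classes below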
1765 if type(obj) == type(get_attributes):
1766 obj = staticmethod(obj)
1767 d[name] = obj
1768 return d
1770 def create_test_cases(Mixin, type):
1771 result = {}
1772 glob = globals()
1773 Type = type[0].upper() + type[1:]
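# for every _TestXxx class that allows this type, synthesise a concrete
# unittest.TestCase subclass named e.g. 'WithProcessesTestXxx'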
1775 for name in glob.keys():
1776 if name.startswith('_Test'):
1777 base = glob[name]
1778 if type in base.ALLOWED_TYPES:
1779 newname = 'With' + Type + name[1:]
1780 class Temp(base, unittest.TestCase, Mixin):
1781 pass
1782 result[newname] = Temp
1783 Temp.__name__ = newname
1784 Temp.__module__ = Mixin.__module__
1785 return result
1788 # Create test cases
1791 class ProcessesMixin(object):
1792 TYPE = 'processes'
1793 Process = multiprocessing.Process
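# copy the listed multiprocessing helpers onto the mixin so tests can
# call self.Queue(), self.Lock(), etc. for the 'processes' backend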
1794 locals().update(get_attributes(multiprocessing, (
1795 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1796 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1797 'RawArray', 'current_process', 'active_children', 'Pipe',
1798 'connection', 'JoinableQueue'
1799 )))
1801 testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1802 globals().update(testcases_processes)
1805 class ManagerMixin(object):
1806 TYPE = 'manager'
1807 Process = multiprocessing.Process
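# create the SyncManager without running __init__; test_main() below
# initialises and starts it before the manager-based tests run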
1808 manager = object.__new__(multiprocessing.managers.SyncManager)
1809 locals().update(get_attributes(manager, (
1810 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1811 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1812 'Namespace', 'JoinableQueue'
1813 )))
1815 testcases_manager = create_test_cases(ManagerMixin, type='manager')
1816 globals().update(testcases_manager)
1819 class ThreadsMixin(object):
1820 TYPE = 'threads'
1821 Process = multiprocessing.dummy.Process
1822 locals().update(get_attributes(multiprocessing.dummy, (
1823 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1824 'Condition', 'Event', 'Value', 'Array', 'current_process',
1825 'active_children', 'Pipe', 'connection', 'dict', 'list',
1826 'Namespace', 'JoinableQueue'
1827 )))
1829 testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1830 globals().update(testcases_threads)
1832 class OtherTest(unittest.TestCase):
1833 # TODO: add more tests for deliver/answer challenge.
1834 def test_deliver_challenge_auth_failure(self):
1835 class _FakeConnection(object):
1836 def recv_bytes(self, size):
1837 return b'something bogus'
1838 def send_bytes(self, data):
1839 pass
1840 self.assertRaises(multiprocessing.AuthenticationError,
1841 multiprocessing.connection.deliver_challenge,
1842 _FakeConnection(), b'abc')
1844 def test_answer_challenge_auth_failure(self):
1845 class _FakeConnection(object):
1846 def __init__(self):
1847 self.count = 0
1848 def recv_bytes(self, size):
1849 self.count += 1
1850 if self.count == 1:
1851 return multiprocessing.connection.CHALLENGE
1852 elif self.count == 2:
1853 return b'something bogus'
1854 return b''
1855 def send_bytes(self, data):
1856 pass
1857 self.assertRaises(multiprocessing.AuthenticationError,
1858 multiprocessing.connection.answer_challenge,
1859 _FakeConnection(), b'abc')
1862 # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1865 def initializer(ns):
1866 ns.test += 1
1868 class TestInitializers(unittest.TestCase):
1869 def setUp(self):
1870 self.mgr = multiprocessing.Manager()
1871 self.ns = self.mgr.Namespace()
1872 self.ns.test = 0
1874 def tearDown(self):
1875 self.mgr.shutdown()
1877 def test_manager_initializer(self):
1878 m = multiprocessing.managers.SyncManager()
1879 self.assertRaises(TypeError, m.start, 1)
1880 m.start(initializer, (self.ns,))
1881 self.assertEqual(self.ns.test, 1)
1882 m.shutdown()
1884 def test_pool_initializer(self):
1885 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
1886 p = multiprocessing.Pool(1, initializer, (self.ns,))
1887 p.close()
1888 p.join()
1889 self.assertEqual(self.ns.test, 1)
1892 # Issue 5155, 5313, 5331: Test process in processes
1893 # Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1896 def _ThisSubProcess(q):
1897 try:
1898 item = q.get(block=False)
1899 except Queue.Empty:
1900 pass
1902 def _TestProcess(q):
1903 queue = multiprocessing.Queue()
1904 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
1905 subProc.start()
1906 subProc.join()
1908 def _afunc(x):
1909 return x*x
1911 def pool_in_process():
1912 pool = multiprocessing.Pool(processes=4)
1913 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
1915 class _file_like(object):
1916 def __init__(self, delegate):
1917 self._delegate = delegate
1918 self._pid = None
1920 @property
1921 def cache(self):
1922 pid = os.getpid()
1923 # There are no race conditions since fork keeps only the running thread
1924 if pid != self._pid:
1925 self._pid = pid
1926 self._cache = []
1927 return self._cache
1929 def write(self, data):
1930 self.cache.append(data)
1932 def flush(self):
1933 self._delegate.write(''.join(self.cache))
1934 self._cache = []
1936 class TestStdinBadfiledescriptor(unittest.TestCase):
1938 def test_queue_in_process(self):
1939 queue = multiprocessing.Queue()
1940 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
1941 proc.start()
1942 proc.join()
1944 def test_pool_in_process(self):
1945 p = multiprocessing.Process(target=pool_in_process)
1946 p.start()
1947 p.join()
1949 def test_flushing(self):
1950 sio = StringIO()
1951 flike = _file_like(sio)
1952 flike.write('foo')
1953 proc = multiprocessing.Process(target=lambda: flike.flush())
1954 flike.flush()
1955 assert sio.getvalue() == 'foo'
1957 testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
1958 TestStdinBadfiledescriptor]
1964 def test_main(run=None):
1965 if sys.platform.startswith("linux"):
1966 try:
1967 lock = multiprocessing.RLock()
1968 except OSError:
1969 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
1971 if run is None:
1972 from test.test_support import run_unittest as run
1974 util.get_temp_dir() # creates temp directory for use by all processes
1976 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1978 ProcessesMixin.pool = multiprocessing.Pool(4)
1979 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1980 ManagerMixin.manager.__init__()
1981 ManagerMixin.manager.start()
1982 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
1984 testcases = (
1985 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1986 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
1987 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
1988 testcases_other
1989 )
1991 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1992 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1993 run(suite)
1995 ThreadsMixin.pool.terminate()
1996 ProcessesMixin.pool.terminate()
1997 ManagerMixin.pool.terminate()
1998 ManagerMixin.manager.shutdown()
2000 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
2002 def main():
2003 test_main(unittest.TextTestRunner(verbosity=2).run)
2005 if __name__ == '__main__':
2006 main()