2 # Module providing the `SyncManager` class for dealing
5 # multiprocessing/managers.py
7 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
10 __all__
= [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
23 from traceback
import format_exc
24 from multiprocessing
import Process
, current_process
, active_children
, Pool
, util
, connection
25 from multiprocessing
.process
import AuthenticationString
26 from multiprocessing
.forking
import exit
, Popen
, assert_spawning
, ForkingPickler
27 from multiprocessing
.util
import Finalize
, info
30 from cPickle
import PicklingError
32 from pickle
import PicklingError
35 # Register some things for pickling
39 return array
.array
, (a
.typecode
, a
.tostring())
40 ForkingPickler
.register(array
.array
, reduce_array
)
42 view_types
= [type(getattr({}, name
)()) for name
in ('items','keys','values')]
45 # Type for identifying shared objects
50 Type to uniquely indentify a shared object
52 __slots__
= ('typeid', 'address', 'id')
54 def __init__(self
, typeid
, address
, id):
55 (self
.typeid
, self
.address
, self
.id) = (typeid
, address
, id)
57 def __getstate__(self
):
58 return (self
.typeid
, self
.address
, self
.id)
60 def __setstate__(self
, state
):
61 (self
.typeid
, self
.address
, self
.id) = state
64 return 'Token(typeid=%r, address=%r, id=%r)' % \
65 (self
.typeid
, self
.address
, self
.id)
68 # Function for communication with a manager's server process
71 def dispatch(c
, id, methodname
, args
=(), kwds
={}):
73 Send a message to manager using connection `c` and return response
75 c
.send((id, methodname
, args
, kwds
))
76 kind
, result
= c
.recv()
79 raise convert_to_error(kind
, result
)
81 def convert_to_error(kind
, result
):
84 elif kind
== '#TRACEBACK':
85 assert type(result
) is str
86 return RemoteError(result
)
87 elif kind
== '#UNSERIALIZABLE':
88 assert type(result
) is str
89 return RemoteError('Unserializable message: %s\n' % result
)
91 return ValueError('Unrecognized message type')
93 class RemoteError(Exception):
95 return ('\n' + '-'*75 + '\n' + str(self
.args
[0]) + '-'*75)
98 # Functions for finding the method names of an object
101 def all_methods(obj
):
103 Return a list of names of methods of `obj`
106 for name
in dir(obj
):
107 func
= getattr(obj
, name
)
108 if hasattr(func
, '__call__'):
112 def public_methods(obj
):
114 Return a list of names of methods of `obj` which do not start with '_'
116 return [name
for name
in all_methods(obj
) if name
[0] != '_']
119 # Server which is run in a process controlled by a manager
122 class Server(object):
124 Server class which runs in a process controlled by a manager object
126 public
= ['shutdown', 'create', 'accept_connection', 'get_methods',
127 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
129 def __init__(self
, registry
, address
, authkey
, serializer
):
130 assert isinstance(authkey
, bytes
)
131 self
.registry
= registry
132 self
.authkey
= AuthenticationString(authkey
)
133 Listener
, Client
= listener_client
[serializer
]
135 # do authentication later
136 self
.listener
= Listener(address
=address
, backlog
=5)
137 self
.address
= self
.listener
.address
139 self
.id_to_obj
= {'0': (None, ())}
140 self
.id_to_refcount
= {}
141 self
.mutex
= threading
.RLock()
144 def serve_forever(self
):
146 Run the server forever
148 current_process()._manager
_server
= self
153 c
= self
.listener
.accept()
154 except (OSError, IOError):
156 t
= threading
.Thread(target
=self
.handle_request
, args
=(c
,))
159 except (KeyboardInterrupt, SystemExit):
163 self
.listener
.close()
165 def handle_request(self
, c
):
167 Handle a new connection
169 funcname
= result
= request
= None
171 connection
.deliver_challenge(c
, self
.authkey
)
172 connection
.answer_challenge(c
, self
.authkey
)
174 ignore
, funcname
, args
, kwds
= request
175 assert funcname
in self
.public
, '%r unrecognized' % funcname
176 func
= getattr(self
, funcname
)
178 msg
= ('#TRACEBACK', format_exc())
181 result
= func(c
, *args
, **kwds
)
183 msg
= ('#TRACEBACK', format_exc())
185 msg
= ('#RETURN', result
)
190 c
.send(('#TRACEBACK', format_exc()))
193 util
.info('Failure to send message: %r', msg
)
194 util
.info(' ... request was %r', request
)
195 util
.info(' ... exception was %r', e
)
199 def serve_client(self
, conn
):
201 Handle requests from the proxies in a particular process/thread
203 util
.debug('starting server thread to service %r',
204 threading
.current_thread().name
)
208 id_to_obj
= self
.id_to_obj
213 methodname
= obj
= None
215 ident
, methodname
, args
, kwds
= request
216 obj
, exposed
, gettypeid
= id_to_obj
[ident
]
218 if methodname
not in exposed
:
219 raise AttributeError(
220 'method %r of %r object is not in exposed=%r' %
221 (methodname
, type(obj
), exposed
)
224 function
= getattr(obj
, methodname
)
227 res
= function(*args
, **kwds
)
231 typeid
= gettypeid
and gettypeid
.get(methodname
, None)
233 rident
, rexposed
= self
.create(conn
, typeid
, res
)
234 token
= Token(typeid
, self
.address
, rident
)
235 msg
= ('#PROXY', (rexposed
, token
))
237 msg
= ('#RETURN', res
)
239 except AttributeError:
240 if methodname
is None:
241 msg
= ('#TRACEBACK', format_exc())
244 fallback_func
= self
.fallback_mapping
[methodname
]
245 result
= fallback_func(
246 self
, conn
, ident
, obj
, *args
, **kwds
248 msg
= ('#RETURN', result
)
250 msg
= ('#TRACEBACK', format_exc())
253 util
.debug('got EOF -- exiting thread serving %r',
254 threading
.current_thread().name
)
258 msg
= ('#TRACEBACK', format_exc())
264 send(('#UNSERIALIZABLE', repr(msg
)))
266 util
.info('exception in thread serving %r',
267 threading
.current_thread().name
)
268 util
.info(' ... message was %r', msg
)
269 util
.info(' ... exception was %r', e
)
273 def fallback_getvalue(self
, conn
, ident
, obj
):
276 def fallback_str(self
, conn
, ident
, obj
):
279 def fallback_repr(self
, conn
, ident
, obj
):
283 '__str__':fallback_str
,
284 '__repr__':fallback_repr
,
285 '#GETVALUE':fallback_getvalue
291 def debug_info(self
, c
):
293 Return some info --- useful to spot problems with refcounting
298 keys
= self
.id_to_obj
.keys()
302 result
.append(' %s: refcount=%s\n %s' %
303 (ident
, self
.id_to_refcount
[ident
],
304 str(self
.id_to_obj
[ident
][0])[:75]))
305 return '\n'.join(result
)
309 def number_of_objects(self
, c
):
311 Number of shared objects
313 return len(self
.id_to_obj
) - 1 # don't count ident='0'
315 def shutdown(self
, c
):
317 Shutdown this process
321 util
.debug('manager received shutdown message')
322 c
.send(('#RETURN', None))
324 if sys
.stdout
!= sys
.__stdout
__:
325 util
.debug('resetting stdout, stderr')
326 sys
.stdout
= sys
.__stdout
__
327 sys
.stderr
= sys
.__stderr
__
329 util
._run
_finalizers
(0)
331 for p
in active_children():
332 util
.debug('terminating a child process of manager')
335 for p
in active_children():
336 util
.debug('terminating a child process of manager')
339 util
._run
_finalizers
()
340 util
.info('manager exiting with exitcode 0')
343 traceback
.print_exc()
347 def create(self
, c
, typeid
, *args
, **kwds
):
349 Create a new shared object and return its id
353 callable, exposed
, method_to_typeid
, proxytype
= \
354 self
.registry
[typeid
]
357 assert len(args
) == 1 and not kwds
360 obj
= callable(*args
, **kwds
)
363 exposed
= public_methods(obj
)
364 if method_to_typeid
is not None:
365 assert type(method_to_typeid
) is dict
366 exposed
= list(exposed
) + list(method_to_typeid
)
368 ident
= '%x' % id(obj
) # convert to string because xmlrpclib
369 # only has 32 bit signed integers
370 util
.debug('%r callable returned object with id %r', typeid
, ident
)
372 self
.id_to_obj
[ident
] = (obj
, set(exposed
), method_to_typeid
)
373 if ident
not in self
.id_to_refcount
:
374 self
.id_to_refcount
[ident
] = 0
375 # increment the reference count immediately, to avoid
376 # this object being garbage collected before a Proxy
377 # object for it can be created. The caller of create()
378 # is responsible for doing a decref once the Proxy object
380 self
.incref(c
, ident
)
381 return ident
, tuple(exposed
)
385 def get_methods(self
, c
, token
):
387 Return the methods of the shared object indicated by token
389 return tuple(self
.id_to_obj
[token
.id][1])
391 def accept_connection(self
, c
, name
):
393 Spawn a new thread to serve this connection
395 threading
.current_thread().name
= name
396 c
.send(('#RETURN', None))
399 def incref(self
, c
, ident
):
402 self
.id_to_refcount
[ident
] += 1
406 def decref(self
, c
, ident
):
409 assert self
.id_to_refcount
[ident
] >= 1
410 self
.id_to_refcount
[ident
] -= 1
411 if self
.id_to_refcount
[ident
] == 0:
412 del self
.id_to_obj
[ident
], self
.id_to_refcount
[ident
]
413 util
.debug('disposing of obj with id %r', ident
)
418 # Class to represent state of a manager
422 __slots__
= ['value']
428 # Mapping from serializer name to Listener and Client types
432 'pickle' : (connection
.Listener
, connection
.Client
),
433 'xmlrpclib' : (connection
.XmlListener
, connection
.XmlClient
)
437 # Definition of BaseManager
440 class BaseManager(object):
442 Base class for managers
447 def __init__(self
, address
=None, authkey
=None, serializer
='pickle'):
449 authkey
= current_process().authkey
450 self
._address
= address
# XXX not final address if eg ('', 0)
451 self
._authkey
= AuthenticationString(authkey
)
452 self
._state
= State()
453 self
._state
.value
= State
.INITIAL
454 self
._serializer
= serializer
455 self
._Listener
, self
._Client
= listener_client
[serializer
]
457 def __reduce__(self
):
458 return type(self
).from_address
, \
459 (self
._address
, self
._authkey
, self
._serializer
)
461 def get_server(self
):
463 Return server object with serve_forever() method and address attribute
465 assert self
._state
.value
== State
.INITIAL
466 return Server(self
._registry
, self
._address
,
467 self
._authkey
, self
._serializer
)
471 Connect manager object to the server process
473 Listener
, Client
= listener_client
[self
._serializer
]
474 conn
= Client(self
._address
, authkey
=self
._authkey
)
475 dispatch(conn
, None, 'dummy')
476 self
._state
.value
= State
.STARTED
478 def start(self
, initializer
=None, initargs
=()):
480 Spawn a server process for this manager object
482 assert self
._state
.value
== State
.INITIAL
484 if initializer
is not None and not hasattr(initializer
, '__call__'):
485 raise TypeError('initializer must be a callable')
487 # pipe over which we will retrieve address of server
488 reader
, writer
= connection
.Pipe(duplex
=False)
490 # spawn process which runs a server
491 self
._process
= Process(
492 target
=type(self
)._run
_server
,
493 args
=(self
._registry
, self
._address
, self
._authkey
,
494 self
._serializer
, writer
, initializer
, initargs
),
496 ident
= ':'.join(str(i
) for i
in self
._process
._identity
)
497 self
._process
.name
= type(self
).__name
__ + '-' + ident
498 self
._process
.start()
500 # get address of server
502 self
._address
= reader
.recv()
505 # register a finalizer
506 self
._state
.value
= State
.STARTED
507 self
.shutdown
= util
.Finalize(
508 self
, type(self
)._finalize
_manager
,
509 args
=(self
._process
, self
._address
, self
._authkey
,
510 self
._state
, self
._Client
),
515 def _run_server(cls
, registry
, address
, authkey
, serializer
, writer
,
516 initializer
=None, initargs
=()):
518 Create a server, report its address and run it
520 if initializer
is not None:
521 initializer(*initargs
)
524 server
= cls
._Server
(registry
, address
, authkey
, serializer
)
526 # inform parent process of the server's address
527 writer
.send(server
.address
)
531 util
.info('manager serving at %r', server
.address
)
532 server
.serve_forever()
534 def _create(self
, typeid
, *args
, **kwds
):
536 Create a new shared object; return the token and exposed tuple
538 assert self
._state
.value
== State
.STARTED
, 'server not yet started'
539 conn
= self
._Client
(self
._address
, authkey
=self
._authkey
)
541 id, exposed
= dispatch(conn
, None, 'create', (typeid
,)+args
, kwds
)
544 return Token(typeid
, self
._address
, id), exposed
546 def join(self
, timeout
=None):
548 Join the manager process (if it has been spawned)
550 self
._process
.join(timeout
)
552 def _debug_info(self
):
554 Return some info about the servers shared objects and connections
556 conn
= self
._Client
(self
._address
, authkey
=self
._authkey
)
558 return dispatch(conn
, None, 'debug_info')
562 def _number_of_objects(self
):
564 Return the number of shared objects
566 conn
= self
._Client
(self
._address
, authkey
=self
._authkey
)
568 return dispatch(conn
, None, 'number_of_objects')
575 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
579 def _finalize_manager(process
, address
, authkey
, state
, _Client
):
581 Shutdown the manager process; will be registered as a finalizer
583 if process
.is_alive():
584 util
.info('sending shutdown message to manager')
586 conn
= _Client(address
, authkey
=authkey
)
588 dispatch(conn
, None, 'shutdown')
594 process
.join(timeout
=0.2)
595 if process
.is_alive():
596 util
.info('manager still alive')
597 if hasattr(process
, 'terminate'):
598 util
.info('trying to `terminate()` manager process')
600 process
.join(timeout
=0.1)
601 if process
.is_alive():
602 util
.info('manager still alive after terminate')
604 state
.value
= State
.SHUTDOWN
606 del BaseProxy
._address
_to
_local
[address
]
610 address
= property(lambda self
: self
._address
)
613 def register(cls
, typeid
, callable=None, proxytype
=None, exposed
=None,
614 method_to_typeid
=None, create_method
=True):
616 Register a typeid with the manager type
618 if '_registry' not in cls
.__dict
__:
619 cls
._registry
= cls
._registry
.copy()
621 if proxytype
is None:
622 proxytype
= AutoProxy
624 exposed
= exposed
or getattr(proxytype
, '_exposed_', None)
626 method_to_typeid
= method_to_typeid
or \
627 getattr(proxytype
, '_method_to_typeid_', None)
630 for key
, value
in method_to_typeid
.items():
631 assert type(key
) is str, '%r is not a string' % key
632 assert type(value
) is str, '%r is not a string' % value
634 cls
._registry
[typeid
] = (
635 callable, exposed
, method_to_typeid
, proxytype
639 def temp(self
, *args
, **kwds
):
640 util
.debug('requesting creation of a shared %r object', typeid
)
641 token
, exp
= self
._create
(typeid
, *args
, **kwds
)
643 token
, self
._serializer
, manager
=self
,
644 authkey
=self
._authkey
, exposed
=exp
646 conn
= self
._Client
(token
.address
, authkey
=self
._authkey
)
647 dispatch(conn
, None, 'decref', (token
.id,))
649 temp
.__name
__ = typeid
650 setattr(cls
, typeid
, temp
)
653 # Subclass of set which get cleared after a fork
656 class ProcessLocalSet(set):
658 util
.register_after_fork(self
, lambda obj
: obj
.clear())
659 def __reduce__(self
):
660 return type(self
), ()
663 # Definition of BaseProxy
666 class BaseProxy(object):
668 A base for proxies of shared objects
670 _address_to_local
= {}
671 _mutex
= util
.ForkAwareThreadLock()
673 def __init__(self
, token
, serializer
, manager
=None,
674 authkey
=None, exposed
=None, incref
=True):
675 BaseProxy
._mutex
.acquire()
677 tls_idset
= BaseProxy
._address
_to
_local
.get(token
.address
, None)
678 if tls_idset
is None:
679 tls_idset
= util
.ForkAwareLocal(), ProcessLocalSet()
680 BaseProxy
._address
_to
_local
[token
.address
] = tls_idset
682 BaseProxy
._mutex
.release()
684 # self._tls is used to record the connection used by this
685 # thread to communicate with the manager at token.address
686 self
._tls
= tls_idset
[0]
688 # self._idset is used to record the identities of all shared
689 # objects for which the current process owns references and
690 # which are in the manager at token.address
691 self
._idset
= tls_idset
[1]
694 self
._id
= self
._token
.id
695 self
._manager
= manager
696 self
._serializer
= serializer
697 self
._Client
= listener_client
[serializer
][1]
699 if authkey
is not None:
700 self
._authkey
= AuthenticationString(authkey
)
701 elif self
._manager
is not None:
702 self
._authkey
= self
._manager
._authkey
704 self
._authkey
= current_process().authkey
709 util
.register_after_fork(self
, BaseProxy
._after
_fork
)
712 util
.debug('making connection to manager')
713 name
= current_process().name
714 if threading
.current_thread().name
!= 'MainThread':
715 name
+= '|' + threading
.current_thread().name
716 conn
= self
._Client
(self
._token
.address
, authkey
=self
._authkey
)
717 dispatch(conn
, None, 'accept_connection', (name
,))
718 self
._tls
.connection
= conn
720 def _callmethod(self
, methodname
, args
=(), kwds
={}):
722 Try to call a method of the referrent and return a copy of the result
725 conn
= self
._tls
.connection
726 except AttributeError:
727 util
.debug('thread %r does not own a connection',
728 threading
.current_thread().name
)
730 conn
= self
._tls
.connection
732 conn
.send((self
._id
, methodname
, args
, kwds
))
733 kind
, result
= conn
.recv()
735 if kind
== '#RETURN':
737 elif kind
== '#PROXY':
738 exposed
, token
= result
739 proxytype
= self
._manager
._registry
[token
.typeid
][-1]
741 token
, self
._serializer
, manager
=self
._manager
,
742 authkey
=self
._authkey
, exposed
=exposed
744 conn
= self
._Client
(token
.address
, authkey
=self
._authkey
)
745 dispatch(conn
, None, 'decref', (token
.id,))
747 raise convert_to_error(kind
, result
)
751 Get a copy of the value of the referent
753 return self
._callmethod
('#GETVALUE')
756 conn
= self
._Client
(self
._token
.address
, authkey
=self
._authkey
)
757 dispatch(conn
, None, 'incref', (self
._id
,))
758 util
.debug('INCREF %r', self
._token
.id)
760 self
._idset
.add(self
._id
)
762 state
= self
._manager
and self
._manager
._state
764 self
._close
= util
.Finalize(
765 self
, BaseProxy
._decref
,
766 args
=(self
._token
, self
._authkey
, state
,
767 self
._tls
, self
._idset
, self
._Client
),
772 def _decref(token
, authkey
, state
, tls
, idset
, _Client
):
773 idset
.discard(token
.id)
775 # check whether manager is still alive
776 if state
is None or state
.value
== State
.STARTED
:
777 # tell manager this process no longer cares about referent
779 util
.debug('DECREF %r', token
.id)
780 conn
= _Client(token
.address
, authkey
=authkey
)
781 dispatch(conn
, None, 'decref', (token
.id,))
783 util
.debug('... decref failed %s', e
)
786 util
.debug('DECREF %r -- manager already shutdown', token
.id)
788 # check whether we can close this thread's connection because
789 # the process owns no more references to objects for this manager
790 if not idset
and hasattr(tls
, 'connection'):
791 util
.debug('thread %r has no more proxies so closing conn',
792 threading
.current_thread().name
)
793 tls
.connection
.close()
796 def _after_fork(self
):
801 # the proxy may just be for a manager which has shutdown
802 util
.info('incref failed: %s' % e
)
804 def __reduce__(self
):
806 if Popen
.thread_is_spawning():
807 kwds
['authkey'] = self
._authkey
809 if getattr(self
, '_isauto', False):
810 kwds
['exposed'] = self
._exposed
_
811 return (RebuildProxy
,
812 (AutoProxy
, self
._token
, self
._serializer
, kwds
))
814 return (RebuildProxy
,
815 (type(self
), self
._token
, self
._serializer
, kwds
))
817 def __deepcopy__(self
, memo
):
818 return self
._getvalue
()
821 return '<%s object, typeid %r at %s>' % \
822 (type(self
).__name
__, self
._token
.typeid
, '0x%x' % id(self
))
826 Return representation of the referent (or a fall-back if that fails)
829 return self
._callmethod
('__repr__')
831 return repr(self
)[:-1] + "; '__str__()' failed>"
834 # Function used for unpickling
837 def RebuildProxy(func
, token
, serializer
, kwds
):
839 Function used for unpickling proxy objects.
841 If possible the shared object is returned, or otherwise a proxy for it.
843 server
= getattr(current_process(), '_manager_server', None)
845 if server
and server
.address
== token
.address
:
846 return server
.id_to_obj
[token
.id][0]
849 kwds
.pop('incref', True) and
850 not getattr(current_process(), '_inheriting', False)
852 return func(token
, serializer
, incref
=incref
, **kwds
)
855 # Functions to create proxies and proxy types
858 def MakeProxyType(name
, exposed
, _cache
={}):
860 Return an proxy type whose methods are given by `exposed`
862 exposed
= tuple(exposed
)
864 return _cache
[(name
, exposed
)]
871 exec '''def %s(self, *args, **kwds):
872 return self._callmethod(%r, args, kwds)''' % (meth
, meth
) in dic
874 ProxyType
= type(name
, (BaseProxy
,), dic
)
875 ProxyType
._exposed
_ = exposed
876 _cache
[(name
, exposed
)] = ProxyType
880 def AutoProxy(token
, serializer
, manager
=None, authkey
=None,
881 exposed
=None, incref
=True):
883 Return an auto-proxy for `token`
885 _Client
= listener_client
[serializer
][1]
888 conn
= _Client(token
.address
, authkey
=authkey
)
890 exposed
= dispatch(conn
, None, 'get_methods', (token
,))
894 if authkey
is None and manager
is not None:
895 authkey
= manager
._authkey
897 authkey
= current_process().authkey
899 ProxyType
= MakeProxyType('AutoProxy[%s]' % token
.typeid
, exposed
)
900 proxy
= ProxyType(token
, serializer
, manager
=manager
, authkey
=authkey
,
906 # Types/callables which we will register with SyncManager
909 class Namespace(object):
910 def __init__(self
, **kwds
):
911 self
.__dict
__.update(kwds
)
913 items
= self
.__dict
__.items()
915 for name
, value
in items
:
916 if not name
.startswith('_'):
917 temp
.append('%s=%r' % (name
, value
))
919 return 'Namespace(%s)' % str.join(', ', temp
)
922 def __init__(self
, typecode
, value
, lock
=True):
923 self
._typecode
= typecode
927 def set(self
, value
):
930 return '%s(%r, %r)'%(type(self
).__name
__, self
._typecode
, self
._value
)
931 value
= property(get
, set)
933 def Array(typecode
, sequence
, lock
=True):
934 return array
.array(typecode
, sequence
)
937 # Proxy types used by SyncManager
940 class IteratorProxy(BaseProxy
):
941 # XXX remove methods for Py3.0 and Py2.6
942 _exposed_
= ('__next__', 'next', 'send', 'throw', 'close')
945 def __next__(self
, *args
):
946 return self
._callmethod
('__next__', args
)
947 def next(self
, *args
):
948 return self
._callmethod
('next', args
)
949 def send(self
, *args
):
950 return self
._callmethod
('send', args
)
951 def throw(self
, *args
):
952 return self
._callmethod
('throw', args
)
953 def close(self
, *args
):
954 return self
._callmethod
('close', args
)
957 class AcquirerProxy(BaseProxy
):
958 _exposed_
= ('acquire', 'release')
959 def acquire(self
, blocking
=True):
960 return self
._callmethod
('acquire', (blocking
,))
962 return self
._callmethod
('release')
964 return self
._callmethod
('acquire')
965 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
966 return self
._callmethod
('release')
969 class ConditionProxy(AcquirerProxy
):
970 # XXX will Condition.notfyAll() name be available in Py3.0?
971 _exposed_
= ('acquire', 'release', 'wait', 'notify', 'notify_all')
972 def wait(self
, timeout
=None):
973 return self
._callmethod
('wait', (timeout
,))
975 return self
._callmethod
('notify')
976 def notify_all(self
):
977 return self
._callmethod
('notify_all')
979 class EventProxy(BaseProxy
):
980 _exposed_
= ('is_set', 'set', 'clear', 'wait')
982 return self
._callmethod
('is_set')
984 return self
._callmethod
('set')
986 return self
._callmethod
('clear')
987 def wait(self
, timeout
=None):
988 return self
._callmethod
('wait', (timeout
,))
990 class NamespaceProxy(BaseProxy
):
991 _exposed_
= ('__getattribute__', '__setattr__', '__delattr__')
992 def __getattr__(self
, key
):
994 return object.__getattribute
__(self
, key
)
995 callmethod
= object.__getattribute
__(self
, '_callmethod')
996 return callmethod('__getattribute__', (key
,))
997 def __setattr__(self
, key
, value
):
999 return object.__setattr
__(self
, key
, value
)
1000 callmethod
= object.__getattribute
__(self
, '_callmethod')
1001 return callmethod('__setattr__', (key
, value
))
1002 def __delattr__(self
, key
):
1004 return object.__delattr
__(self
, key
)
1005 callmethod
= object.__getattribute
__(self
, '_callmethod')
1006 return callmethod('__delattr__', (key
,))
1009 class ValueProxy(BaseProxy
):
1010 _exposed_
= ('get', 'set')
1012 return self
._callmethod
('get')
1013 def set(self
, value
):
1014 return self
._callmethod
('set', (value
,))
1015 value
= property(get
, set)
1018 BaseListProxy
= MakeProxyType('BaseListProxy', (
1019 '__add__', '__contains__', '__delitem__', '__delslice__',
1020 '__getitem__', '__getslice__', '__len__', '__mul__',
1021 '__reversed__', '__rmul__', '__setitem__', '__setslice__',
1022 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1023 'reverse', 'sort', '__imul__'
1024 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1025 class ListProxy(BaseListProxy
):
1026 def __iadd__(self
, value
):
1027 self
._callmethod
('extend', (value
,))
1029 def __imul__(self
, value
):
1030 self
._callmethod
('__imul__', (value
,))
1034 DictProxy
= MakeProxyType('DictProxy', (
1035 '__contains__', '__delitem__', '__getitem__', '__len__',
1036 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1037 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1041 ArrayProxy
= MakeProxyType('ArrayProxy', (
1042 '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
1043 )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
1046 PoolProxy
= MakeProxyType('PoolProxy', (
1047 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1048 'map', 'map_async', 'terminate'
1050 PoolProxy
._method
_to
_typeid
_ = {
1051 'apply_async': 'AsyncResult',
1052 'map_async': 'AsyncResult',
1054 'imap_unordered': 'Iterator'
1058 # Definition of SyncManager
1061 class SyncManager(BaseManager
):
1063 Subclass of `BaseManager` which supports a number of shared object types.
1065 The types registered are those intended for the synchronization
1066 of threads, plus `dict`, `list` and `Namespace`.
1068 The `multiprocessing.Manager()` function creates started instances of
1072 SyncManager
.register('Queue', Queue
.Queue
)
1073 SyncManager
.register('JoinableQueue', Queue
.Queue
)
1074 SyncManager
.register('Event', threading
.Event
, EventProxy
)
1075 SyncManager
.register('Lock', threading
.Lock
, AcquirerProxy
)
1076 SyncManager
.register('RLock', threading
.RLock
, AcquirerProxy
)
1077 SyncManager
.register('Semaphore', threading
.Semaphore
, AcquirerProxy
)
1078 SyncManager
.register('BoundedSemaphore', threading
.BoundedSemaphore
,
1080 SyncManager
.register('Condition', threading
.Condition
, ConditionProxy
)
1081 SyncManager
.register('Pool', Pool
, PoolProxy
)
1082 SyncManager
.register('list', list, ListProxy
)
1083 SyncManager
.register('dict', dict, DictProxy
)
1084 SyncManager
.register('Value', Value
, ValueProxy
)
1085 SyncManager
.register('Array', Array
, ArrayProxy
)
1086 SyncManager
.register('Namespace', Namespace
, NamespaceProxy
)
1088 # types returned by methods of PoolProxy
1089 SyncManager
.register('Iterator', proxytype
=IteratorProxy
, create_method
=False)
1090 SyncManager
.register('AsyncResult', create_method
=False)