# Doc/includes/mp_distributing.py
#
# Module to allow spawning of processes on foreign host
#
# Depends on `multiprocessing` package -- tested with `processing-0.60`
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
#
# Imports
#

import sys
import os
import tarfile
import shutil
import subprocess
import logging
import itertools
import Queue

try:
    import cPickle as pickle
except ImportError:
    import pickle

from multiprocessing import Process, current_process, cpu_count
from multiprocessing import util, managers, connection, forking, pool
#
# Logging
#

def get_logger():
    return _logger

_logger = logging.getLogger('distributing')
_logger.propagate = 0

_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
_logger.addHandler(_handler)

info = _logger.info
debug = _logger.debug
#
# Get number of cpus
#

try:
    slot_count = cpu_count()
except NotImplementedError:
    slot_count = 1
#
# Manager type which spawns subprocesses
#

class HostManager(managers.SyncManager):
    '''
    Manager type used for spawning processes on a (presumably) foreign host
    '''
    def __init__(self, address, authkey):
        managers.SyncManager.__init__(self, address, authkey)
        self._name = 'Host-unknown'

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        if hasattr(sys.modules['__main__'], '__file__'):
            main_path = os.path.basename(sys.modules['__main__'].__file__)
        else:
            main_path = None
        data = pickle.dumps((target, args, kwargs))
        p = self._RemoteProcess(data, main_path)
        if name is None:
            temp = self._name.split('Host-')[-1] + '/Process-%s'
            name = temp % ':'.join(map(str, p.get_identity()))
        p.set_name(name)
        return p

    @classmethod
    def from_address(cls, address, authkey):
        manager = cls(address, authkey)
        managers.transact(address, authkey, 'dummy')
        manager._state.value = managers.State.STARTED
        manager._name = 'Host-%s:%s' % manager.address
        manager.shutdown = util.Finalize(
            manager, HostManager._finalize_host,
            args=(manager._address, manager._authkey, manager._name),
            exitpriority=-10
            )
        return manager

    @staticmethod
    def _finalize_host(address, authkey, name):
        managers.transact(address, authkey, 'shutdown')

    def __repr__(self):
        return '<Host(%s)>' % self._name
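
# Illustrative sketch: a connected `HostManager` hands out remote processes
# through the same interface as `multiprocessing.Process`.  Here `address`
# and `authkey` would come from a host manager started by `Cluster.start()`
# below, and `some_function` is a placeholder:
#
#     manager = HostManager.from_address(address, authkey)
#     p = manager.Process(target=some_function, args=(1, 2))
#     p.start()
#     p.join()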
#
# Process subclass representing a process on (possibly) a remote machine
#

class RemoteProcess(Process):
    '''
    Represents a process started on a remote host
    '''
    def __init__(self, data, main_path):
        assert not main_path or os.path.basename(main_path) == main_path
        Process.__init__(self)
        self._data = data
        self._main_path = main_path

    def _bootstrap(self):
        forking.prepare({'main_path': self._main_path})
        self._target, self._args, self._kwargs = pickle.loads(self._data)
        return Process._bootstrap(self)

    def get_identity(self):
        return self._identity

HostManager.register('_RemoteProcess', RemoteProcess)
#
# A Pool class that uses a cluster
#

class DistributedPool(pool.Pool):

    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
        self._cluster = cluster
        self.Process = cluster.Process
        pool.Pool.__init__(self, processes or len(cluster),
                           initializer, initargs)

    def _setup_queues(self):
        self._inqueue = self._cluster._SettableQueue()
        self._outqueue = self._cluster._SettableQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        inqueue.set_contents([None] * size)
#
# Manager type which starts host managers on other machines
#

def LocalProcess(**kwds):
    p = Process(**kwds)
    p.set_name('localhost/' + p.name)
    return p
class Cluster(managers.SyncManager):
    '''
    Represents a collection of slots running on various hosts.

    `Cluster` is a subclass of `SyncManager` so it allows creation of
    various types of shared objects.
    '''
    def __init__(self, hostlist, modules):
        managers.SyncManager.__init__(self, address=('localhost', 0))
        self._hostlist = hostlist
        self._modules = modules
        if __name__ not in modules:
            modules.append(__name__)
        files = [sys.modules[name].__file__ for name in modules]
        for i, file in enumerate(files):
            if file.endswith('.pyc') or file.endswith('.pyo'):
                files[i] = file[:-4] + '.py'
        self._files = [os.path.abspath(file) for file in files]
    def start(self):
        managers.SyncManager.start(self)

        l = connection.Listener(family='AF_INET', authkey=self._authkey)

        # start a host manager on every host in the list
        for i, host in enumerate(self._hostlist):
            host._start_manager(i, self._authkey, l.address, self._files)

        # each remote host connects back and reports (index, address, cpus)
        for host in self._hostlist:
            if host.hostname != 'localhost':
                conn = l.accept()
                i, address, cpus = conn.recv()
                conn.close()
                other_host = self._hostlist[i]
                other_host.manager = HostManager.from_address(address,
                                                              self._authkey)
                other_host.slots = other_host.slots or cpus
                other_host.Process = other_host.manager.Process
            else:
                host.slots = host.slots or slot_count
                host.Process = LocalProcess

        # one slot per cpu (or per requested slot) on each host
        self._slotlist = [
            Slot(host) for host in self._hostlist for i in range(host.slots)
            ]
        self._slot_iterator = itertools.cycle(self._slotlist)
        self._base_shutdown = self.shutdown
        del self.shutdown
    def shutdown(self):
        for host in self._hostlist:
            if host.hostname != 'localhost':
                host.manager.shutdown()
        self._base_shutdown()

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        slot = self._slot_iterator.next()
        return slot.Process(
            group=group, target=target, name=name, args=args, kwargs=kwargs
            )

    def Pool(self, processes=None, initializer=None, initargs=()):
        return DistributedPool(self, processes, initializer, initargs)

    def __getitem__(self, i):
        return self._slotlist[i]

    def __len__(self):
        return len(self._slotlist)

    def __iter__(self):
        return iter(self._slotlist)
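
# Illustrative sketch: once started, a cluster acts as a round-robin factory
# of processes and pools and, being a `SyncManager`, can also create shared
# objects.  The hostnames and the `worker` function are placeholders:
#
#     cluster = Cluster([Host('localhost'), Host('foo.example.org')], [])
#     cluster.start()
#     q = cluster.Queue()                        # shared queue proxy
#     p = cluster.Process(target=worker, args=(q,))
#     p.start()
#     p.join()
#     results = cluster.Pool(8).map(worker, range(100))
#     cluster.shutdown()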
#
# Queue subclass used by distributed pool
#

class SettableQueue(Queue.Queue):
    def empty(self):
        return not self.queue
    def full(self):
        return self.maxsize > 0 and len(self.queue) == self.maxsize
    def set_contents(self, contents):
        # length of contents must be at least as large as the number of
        # threads which have potentially called get()
        self.not_empty.acquire()
        try:
            self.queue.clear()
            self.queue.extend(contents)
            self.not_empty.notifyAll()
        finally:
            self.not_empty.release()

Cluster.register('_SettableQueue', SettableQueue)
#
# Class representing a notional cpu in the cluster
#

class Slot(object):
    def __init__(self, host):
        self.host = host
        self.Process = host.Process
#
# Host
#

class Host(object):
    '''
    Represents a host to use as a node in a cluster.

    `hostname` gives the name of the host.  If hostname is not
    "localhost" then ssh is used to log in to the host.  To log in as
    a different user use a host name of the form
    "username@somewhere.org"

    `slots` is used to specify the number of slots for processes on
    the host.  This affects how often processes will be allocated to
    this host.  Normally this should be equal to the number of cpus on
    that host.
    '''
    def __init__(self, hostname, slots=None):
        self.hostname = hostname
        self.slots = slots
    def _start_manager(self, index, authkey, address, files):
        if self.hostname != 'localhost':
            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
            debug('startup files copied to %s:%s', self.hostname, tempdir)
            # run main() from the copy of this module on the remote host
            p = subprocess.Popen(
                ['ssh', self.hostname, 'python', '-c',
                 '"import os; os.chdir(%r); '
                 'from distributing import main; main()"' % tempdir],
                stdin=subprocess.PIPE
                )
            # pickle the bootstrap data to the remote process over stdin
            data = dict(
                name='BootstrappingHost', index=index,
                dist_log_level=_logger.getEffectiveLevel(),
                dir=tempdir, authkey=str(authkey), parent_address=address
                )
            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
            p.stdin.close()
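
# Illustrative sketch (hostnames are placeholders): `slots` controls how many
# entries a host gets in the cluster's slot list, and therefore how often
# processes are allocated to it; leaving it unset lets the remote side report
# its cpu count:
#
#     hosts = [
#         Host('localhost'),                       # run locally
#         Host('user@node1.example.org', slots=4), # log in as `user`
#         Host('node2.example.org'),               # slots = remote cpu count
#         ]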
#
# Copy files to remote directory, returning name of directory
#

unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
for ti in tf:
    tf.extract(ti)
print tempdir
"'''
def copy_to_remote_temporary_directory(host, files):
    # stream a gzipped tar of the files to the unpacking snippet over ssh's
    # stdin; the snippet prints the temporary directory it unpacked into
    p = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
    for name in files:
        tf.add(name, os.path.basename(name))
    tf.close()
    p.stdin.close()
    return p.stdout.read().rstrip()
#
# Code which runs a host manager
#

def main():
    # get data from parent over stdin
    data = pickle.load(sys.stdin)
    sys.stdin.close()

    # set some stuff
    _logger.setLevel(data['dist_log_level'])
    forking.prepare(data)

    # create server for a `HostManager` object
    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
    current_process()._server = server

    # report server address and number of cpus back to parent
    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
    conn.send((data['index'], server.address, slot_count))
    conn.close()

    # set name etc
    current_process().set_name('Host-%s:%s' % server.address)
    util._run_after_forkers()

    # register a cleanup function
    def cleanup(directory):
        debug('removing directory %s', directory)
        shutil.rmtree(directory)
        debug('shutting down host manager')

    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)

    # start host manager
    debug('remote host manager starting in %s', data['dir'])
    server.serve_forever()