2 # Module to allow spawning of processes on foreign host
4 # Depends on `multiprocessing` package -- tested with `processing-0.60`
6 # Copyright (c) 2006-2008, R Oudkerk
10 __all__
= ['Cluster', 'Host', 'get_logger', 'current_process']
26 import cPickle
as pickle
30 from multiprocessing
import Process
, current_process
, cpu_count
31 from multiprocessing
import util
, managers
, connection
, forking
, pool
40 _logger
= logging
.getLogger('distributing')
43 _formatter
= logging
.Formatter(util
.DEFAULT_LOGGING_FORMAT
)
44 _handler
= logging
.StreamHandler()
45 _handler
.setFormatter(_formatter
)
46 _logger
.addHandler(_handler
)
# Default number of process slots offered by this machine.
#
# `cpu_count()` signals an undeterminable CPU count by raising
# `NotImplementedError`.  `NotImplemented` is the rich-comparison sentinel,
# not an exception class, so naming it in an `except` clause never catches
# that error.  Fall back to a single slot when the count is unknown.
try:
    slot_count = cpu_count()
except NotImplementedError:
    slot_count = 1
61 # Manager type which spawns subprocesses
64 class HostManager(managers
.SyncManager
):
66 Manager type used for spawning processes on a (presumably) foreign host
68 def __init__(self
, address
, authkey
):
69 managers
.SyncManager
.__init
__(self
, address
, authkey
)
70 self
._name
= 'Host-unknown'
72 def Process(self
, group
=None, target
=None, name
=None, args
=(), kwargs
={}):
73 if hasattr(sys
.modules
['__main__'], '__file__'):
74 main_path
= os
.path
.basename(sys
.modules
['__main__'].__file
__)
77 data
= pickle
.dumps((target
, args
, kwargs
))
78 p
= self
._RemoteProcess
(data
, main_path
)
80 temp
= self
._name
.split('Host-')[-1] + '/Process-%s'
81 name
= temp
% ':'.join(map(str, p
.get_identity()))
86 def from_address(cls
, address
, authkey
):
87 manager
= cls(address
, authkey
)
88 managers
.transact(address
, authkey
, 'dummy')
89 manager
._state
.value
= managers
.State
.STARTED
90 manager
._name
= 'Host-%s:%s' % manager
.address
91 manager
.shutdown
= util
.Finalize(
92 manager
, HostManager
._finalize
_host
,
93 args
=(manager
._address
, manager
._authkey
, manager
._name
),
def _finalize_host(address, authkey, name):
    '''
    Send a 'shutdown' request to the manager server at `address`.

    `name` is accepted but not used.
    '''
    request = 'shutdown'
    managers.transact(address, authkey, request)
103 return '<Host(%s)>' % self
._name
106 # Process subclass representing a process on (possibly) a remote machine
109 class RemoteProcess(Process
):
111 Represents a process started on a remote host
113 def __init__(self
, data
, main_path
):
114 assert not main_path
or os
.path
.basename(main_path
) == main_path
115 Process
.__init
__(self
)
117 self
._main
_path
= main_path
def _bootstrap(self):
    '''
    Prepare the interpreter, unpack the pickled call and run it.

    Returns whatever `Process._bootstrap()` returns.
    '''
    preparation = {'main_path': self._main_path}
    forking.prepare(preparation)

    # NOTE(review): `self._data` is unpickled as is; unpickling can run
    # arbitrary code, so confirm the payload only comes from a trusted peer.
    target, args, kwargs = pickle.loads(self._data)
    self._target, self._args, self._kwargs = target, args, kwargs

    return Process._bootstrap(self)
def get_identity(self):
    '''
    Return the `_identity` attribute of this process object.
    '''
    identity = self._identity
    return identity
127 HostManager
.register('_RemoteProcess', RemoteProcess
)
130 # A Pool class that uses a cluster
133 class DistributedPool(pool
.Pool
):
def __init__(self, cluster, processes=None, initializer=None, initargs=()):
    '''
    Create a pool whose worker processes are created by `cluster`.

    If `processes` is omitted (or otherwise false) the pool size defaults
    to `len(cluster)`.
    '''
    self._cluster = cluster
    # Replace the process factory before the base initialiser runs.
    self.Process = cluster.Process
    if not processes:
        processes = len(cluster)
    pool.Pool.__init__(self, processes, initializer, initargs)
141 def _setup_queues(self
):
142 self
._inqueue
= self
._cluster
._SettableQueue
()
143 self
._outqueue
= self
._cluster
._SettableQueue
()
144 self
._quick
_put
= self
._inqueue
.put
145 self
._quick
_get
= self
._outqueue
.get
148 def _help_stuff_finish(inqueue
, task_handler
, size
):
149 inqueue
.set_contents([None] * size
)
152 # Manager type which starts host managers on other machines
155 def LocalProcess(**kwds
):
157 p
.set_name('localhost/' + p
.name
)
160 class Cluster(managers
.SyncManager
):
162 Represents collection of slots running on various hosts.
164 `Cluster` is a subclass of `SyncManager` so it allows creation of
165 various types of shared objects.
167 def __init__(self
, hostlist
, modules
):
168 managers
.SyncManager
.__init
__(self
, address
=('localhost', 0))
169 self
._hostlist
= hostlist
170 self
._modules
= modules
171 if __name__
not in modules
:
172 modules
.append(__name__
)
173 files
= [sys
.modules
[name
].__file
__ for name
in modules
]
174 for i
, file in enumerate(files
):
175 if file.endswith('.pyc') or file.endswith('.pyo'):
176 files
[i
] = file[:-4] + '.py'
177 self
._files
= [os
.path
.abspath(file) for file in files
]
180 managers
.SyncManager
.start(self
)
182 l
= connection
.Listener(family
='AF_INET', authkey
=self
._authkey
)
184 for i
, host
in enumerate(self
._hostlist
):
185 host
._start
_manager
(i
, self
._authkey
, l
.address
, self
._files
)
187 for host
in self
._hostlist
:
188 if host
.hostname
!= 'localhost':
190 i
, address
, cpus
= conn
.recv()
192 other_host
= self
._hostlist
[i
]
193 other_host
.manager
= HostManager
.from_address(address
,
195 other_host
.slots
= other_host
.slots
or cpus
196 other_host
.Process
= other_host
.manager
.Process
198 host
.slots
= host
.slots
or slot_count
199 host
.Process
= LocalProcess
202 Slot(host
) for host
in self
._hostlist
for i
in range(host
.slots
)
204 self
._slot
_iterator
= itertools
.cycle(self
._slotlist
)
205 self
._base
_shutdown
= self
.shutdown
209 for host
in self
._hostlist
:
210 if host
.hostname
!= 'localhost':
211 host
.manager
.shutdown()
212 self
._base
_shutdown
()
214 def Process(self
, group
=None, target
=None, name
=None, args
=(), kwargs
={}):
215 slot
= self
._slot
_iterator
.next()
217 group
=group
, target
=target
, name
=name
, args
=args
, kwargs
=kwargs
def Pool(self, processes=None, initializer=None, initargs=()):
    '''
    Return a `DistributedPool` whose workers are created by this cluster.
    '''
    distributed = DistributedPool(
        cluster=self,
        processes=processes,
        initializer=initializer,
        initargs=initargs,
        )
    return distributed
223 def __getitem__(self
, i
):
224 return self
._slotlist
[i
]
227 return len(self
._slotlist
)
230 return iter(self
._slotlist
)
233 # Queue subclass used by distributed pool
236 class SettableQueue(Queue
.Queue
):
238 return not self
.queue
240 return self
.maxsize
> 0 and len(self
.queue
) == self
.maxsize
241 def set_contents(self
, contents
):
242 # length of contents must be at least as large as the number of
243 # threads which have potentially called get()
244 self
.not_empty
.acquire()
247 self
.queue
.extend(contents
)
248 self
.not_empty
.notifyAll()
250 self
.not_empty
.release()
252 Cluster
.register('_SettableQueue', SettableQueue
)
255 # Class representing a notional cpu in the cluster
259 def __init__(self
, host
):
261 self
.Process
= host
.Process
269 Represents a host to use as a node in a cluster.
271 `hostname` gives the name of the host. If hostname is not
272 "localhost" then ssh is used to log in to the host. To log in as
273 a different user use a host name of the form
274 "username@somewhere.org"
276 `slots` is used to specify the number of slots for processes on
277 the host. This affects how often processes will be allocated to
278 this host. Normally this should be equal to the number of cpus on
281 def __init__(self
, hostname
, slots
=None):
282 self
.hostname
= hostname
285 def _start_manager(self
, index
, authkey
, address
, files
):
286 if self
.hostname
!= 'localhost':
287 tempdir
= copy_to_remote_temporary_directory(self
.hostname
, files
)
288 debug('startup files copied to %s:%s', self
.hostname
, tempdir
)
289 p
= subprocess
.Popen(
290 ['ssh', self
.hostname
, 'python', '-c',
291 '"import os; os.chdir(%r); '
292 'from distributing import main; main()"' % tempdir
],
293 stdin
=subprocess
.PIPE
296 name
='BoostrappingHost', index
=index
,
297 dist_log_level
=_logger
.getEffectiveLevel(),
298 dir=tempdir
, authkey
=str(authkey
), parent_address
=address
300 pickle
.dump(data
, p
.stdin
, pickle
.HIGHEST_PROTOCOL
)
304 # Copy files to remote directory, returning name of directory
308 import tempfile, os, sys, tarfile
309 tempdir = tempfile.mkdtemp(prefix='distrib-')
311 tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
317 def copy_to_remote_temporary_directory(host
, files
):
318 p
= subprocess
.Popen(
319 ['ssh', host
, 'python', '-c', unzip_code
],
320 stdout
=subprocess
.PIPE
, stdin
=subprocess
.PIPE
322 tf
= tarfile
.open(fileobj
=p
.stdin
, mode
='w|gz')
324 tf
.add(name
, os
.path
.basename(name
))
327 return p
.stdout
.read().rstrip()
330 # Code which runs a host manager
334 # get data from parent over stdin
335 data
= pickle
.load(sys
.stdin
)
339 _logger
.setLevel(data
['dist_log_level'])
340 forking
.prepare(data
)
342 # create server for a `HostManager` object
343 server
= managers
.Server(HostManager
._registry
, ('', 0), data
['authkey'])
344 current_process()._server
= server
346 # report server address and number of cpus back to parent
347 conn
= connection
.Client(data
['parent_address'], authkey
=data
['authkey'])
348 conn
.send((data
['index'], server
.address
, slot_count
))
352 current_process().set_name('Host-%s:%s' % server
.address
)
353 util
._run
_after
_forkers
()
355 # register a cleanup function
356 def cleanup(directory
):
357 debug('removing directory %s', directory
)
358 shutil
.rmtree(directory
)
359 debug('shutting down host manager')
360 util
.Finalize(None, cleanup
, args
=[data
['dir']], exitpriority
=0)
363 debug('remote host manager starting in %s', data
['dir'])
364 server
.serve_forever()