2 # Module for starting a process object using os.fork() or CreateProcess()
4 # multiprocessing/forking.py
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
13 from multiprocessing
import util
, process
15 __all__
= ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
18 # Check that the current thread is spawning a child process
21 def assert_spawning(self
):
22 if not Popen
.thread_is_spawning():
24 '%s objects should only be shared between processes'
25 ' through inheritance' % type(self
).__name
__
29 # Try making some callable types picklable
32 from pickle
import Pickler
33 class ForkingPickler(Pickler
):
34 dispatch
= Pickler
.dispatch
.copy()
37 def register(cls
, type, reduce):
38 def dispatcher(self
, obj
):
40 self
.save_reduce(obj
=obj
, *rv
)
41 cls
.dispatch
[type] = dispatcher
43 def _reduce_method(m
):
45 return getattr, (m
.im_class
, m
.im_func
.func_name
)
47 return getattr, (m
.im_self
, m
.im_func
.func_name
)
48 ForkingPickler
.register(type(ForkingPickler
.save
), _reduce_method
)
50 def _reduce_method_descriptor(m
):
51 return getattr, (m
.__objclass
__, m
.__name
__)
52 ForkingPickler
.register(type(list.append
), _reduce_method_descriptor
)
53 ForkingPickler
.register(type(int.__add
__), _reduce_method_descriptor
)
55 #def _reduce_builtin_function_or_method(m):
56 # return getattr, (m.__self__, m.__name__)
57 #ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method)
58 #ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method)
61 from functools
import partial
65 def _reduce_partial(p
):
66 return _rebuild_partial
, (p
.func
, p
.args
, p
.keywords
or {})
67 def _rebuild_partial(func
, args
, keywords
):
68 return partial(func
, *args
, **keywords
)
69 ForkingPickler
.register(partial
, _reduce_partial
)
75 if sys
.platform
!= 'win32':
83 # We define a Popen class similar to the one from subprocess, but
84 # whose constructor takes a process object as its argument.
89 def __init__(self
, process_obj
):
92 self
.returncode
= None
96 if 'random' in sys
.modules
:
99 code
= process_obj
._bootstrap
()
104 def poll(self
, flag
=os
.WNOHANG
):
105 if self
.returncode
is None:
106 pid
, sts
= os
.waitpid(self
.pid
, flag
)
108 if os
.WIFSIGNALED(sts
):
109 self
.returncode
= -os
.WTERMSIG(sts
)
111 assert os
.WIFEXITED(sts
)
112 self
.returncode
= os
.WEXITSTATUS(sts
)
113 return self
.returncode
115 def wait(self
, timeout
=None):
118 deadline
= time
.time() + timeout
124 remaining
= deadline
- time
.time()
127 delay
= min(delay
* 2, remaining
, 0.05)
132 if self
.returncode
is None:
134 os
.kill(self
.pid
, signal
.SIGTERM
)
136 if self
.wait(timeout
=0.1) is None:
140 def thread_is_spawning():
153 from ._multiprocessing
import win32
, Connection
, PipeConnection
154 from .util
import Finalize
157 # from cPickle import dump, load, HIGHEST_PROTOCOL
159 from pickle
import load
, HIGHEST_PROTOCOL
161 def dump(obj
, file, protocol
=None):
162 ForkingPickler(file, protocol
).dump(obj
)
169 WINEXE
= (sys
.platform
== 'win32' and getattr(sys
, 'frozen', False))
171 exit
= win32
.ExitProcess
172 close
= win32
.CloseHandle
175 # _python_exe is the assumed path to the python executable.
176 # People embedding Python want to modify it.
179 if sys
.executable
.lower().endswith('pythonservice.exe'):
180 _python_exe
= os
.path
.join(sys
.exec_prefix
, 'python.exe')
182 _python_exe
= sys
.executable
184 def set_executable(exe
):
192 def duplicate(handle
, target_process
=None, inheritable
=False):
193 if target_process
is None:
194 target_process
= _subprocess
.GetCurrentProcess()
195 return _subprocess
.DuplicateHandle(
196 _subprocess
.GetCurrentProcess(), handle
, target_process
,
197 0, inheritable
, _subprocess
.DUPLICATE_SAME_ACCESS
201 # We define a Popen class similar to the one from subprocess, but
202 # whose constructor takes a process object as its argument.
207 Start a subprocess to run the code of a process object
209 _tls
= thread
._local
()
211 def __init__(self
, process_obj
):
212 # create pipe for communication with child
215 # get handle for read end of the pipe and make it inheritable
216 rhandle
= duplicate(msvcrt
.get_osfhandle(rfd
), inheritable
=True)
220 cmd
= get_command_line() + [rhandle
]
221 cmd
= ' '.join('"%s"' % x
for x
in cmd
)
222 hp
, ht
, pid
, tid
= _subprocess
.CreateProcess(
223 _python_exe
, cmd
, None, None, 1, 0, None, None, None
228 # set attributes of self
230 self
.returncode
= None
233 # send information to child
234 prep_data
= get_preparation_data(process_obj
._name
)
235 to_child
= os
.fdopen(wfd
, 'wb')
236 Popen
._tls
.process_handle
= int(hp
)
238 dump(prep_data
, to_child
, HIGHEST_PROTOCOL
)
239 dump(process_obj
, to_child
, HIGHEST_PROTOCOL
)
241 del Popen
._tls
.process_handle
245 def thread_is_spawning():
246 return getattr(Popen
._tls
, 'process_handle', None) is not None
249 def duplicate_for_child(handle
):
250 return duplicate(handle
, Popen
._tls
.process_handle
)
252 def wait(self
, timeout
=None):
253 if self
.returncode
is None:
255 msecs
= _subprocess
.INFINITE
257 msecs
= max(0, int(timeout
* 1000 + 0.5))
259 res
= _subprocess
.WaitForSingleObject(int(self
._handle
), msecs
)
260 if res
== _subprocess
.WAIT_OBJECT_0
:
261 code
= _subprocess
.GetExitCodeProcess(self
._handle
)
262 if code
== TERMINATE
:
263 code
= -signal
.SIGTERM
264 self
.returncode
= code
266 return self
.returncode
269 return self
.wait(timeout
=0)
272 if self
.returncode
is None:
274 _subprocess
.TerminateProcess(int(self
._handle
), TERMINATE
)
276 if self
.wait(timeout
=0.1) is None:
283 def is_forking(argv
):
285 Return whether commandline indicates we are forking
287 if len(argv
) >= 2 and argv
[1] == '--multiprocessing-fork':
288 assert len(argv
) == 3
294 def freeze_support():
296 Run code for process object if this in not the main process
298 if is_forking(sys
.argv
):
303 def get_command_line():
305 Returns prefix of command line used for spawning a child process
307 if process
.current_process()._identity
==() and is_forking(sys
.argv
):
308 raise RuntimeError('''
309 Attempt to start a new process before the current process
310 has finished its bootstrapping phase.
312 This probably means that you are on Windows and you have
313 forgotten to use the proper idiom in the main module:
315 if __name__ == '__main__':
319 The "freeze_support()" line can be omitted if the program
320 is not going to be frozen to produce a Windows executable.''')
322 if getattr(sys
, 'frozen', False):
323 return [sys
.executable
, '--multiprocessing-fork']
325 prog
= 'from multiprocessing.forking import main; main()'
326 return [_python_exe
, '-c', prog
, '--multiprocessing-fork']
331 Run code specifed by data received over pipe
333 assert is_forking(sys
.argv
)
335 handle
= int(sys
.argv
[-1])
336 fd
= msvcrt
.open_osfhandle(handle
, os
.O_RDONLY
)
337 from_parent
= os
.fdopen(fd
, 'rb')
339 process
.current_process()._inheriting
= True
340 preparation_data
= load(from_parent
)
341 prepare(preparation_data
)
342 self
= load(from_parent
)
343 process
.current_process()._inheriting
= False
347 exitcode
= self
._bootstrap
()
351 def get_preparation_data(name
):
353 Return info about parent needed by child to unpickle process object
355 from .util
import _logger
, _log_to_stderr
361 log_to_stderr
=_log_to_stderr
,
362 orig_dir
=process
.ORIGINAL_DIR
,
363 authkey
=process
.current_process().authkey
,
366 if _logger
is not None:
367 d
['log_level'] = _logger
.getEffectiveLevel()
370 main_path
= getattr(sys
.modules
['__main__'], '__file__', None)
371 if not main_path
and sys
.argv
[0] not in ('', '-c'):
372 main_path
= sys
.argv
[0]
373 if main_path
is not None:
374 if not os
.path
.isabs(main_path
) and \
375 process
.ORIGINAL_DIR
is not None:
376 main_path
= os
.path
.join(process
.ORIGINAL_DIR
, main_path
)
377 d
['main_path'] = os
.path
.normpath(main_path
)
382 # Make (Pipe)Connection picklable
385 def reduce_connection(conn
):
386 if not Popen
.thread_is_spawning():
388 'By default %s objects can only be shared between processes\n'
389 'using inheritance' % type(conn
).__name
__
391 return type(conn
), (Popen
.duplicate_for_child(conn
.fileno()),
392 conn
.readable
, conn
.writable
)
394 ForkingPickler
.register(Connection
, reduce_connection
)
395 ForkingPickler
.register(PipeConnection
, reduce_connection
)
398 # Prepare current process
401 old_main_modules
= []
405 Try to get current process ready to unpickle process object
407 old_main_modules
.append(sys
.modules
['__main__'])
410 process
.current_process().name
= data
['name']
412 if 'authkey' in data
:
413 process
.current_process()._authkey
= data
['authkey']
415 if 'log_to_stderr' in data
and data
['log_to_stderr']:
418 if 'log_level' in data
:
419 util
.get_logger().setLevel(data
['log_level'])
421 if 'sys_path' in data
:
422 sys
.path
= data
['sys_path']
424 if 'sys_argv' in data
:
425 sys
.argv
= data
['sys_argv']
428 os
.chdir(data
['dir'])
430 if 'orig_dir' in data
:
431 process
.ORIGINAL_DIR
= data
['orig_dir']
433 if 'main_path' in data
:
434 main_path
= data
['main_path']
435 main_name
= os
.path
.splitext(os
.path
.basename(main_path
))[0]
436 if main_name
== '__init__':
437 main_name
= os
.path
.basename(os
.path
.dirname(main_path
))
439 if main_name
!= 'ipython':
442 if main_path
is None:
444 elif os
.path
.basename(main_path
).startswith('__init__.py'):
445 dirs
= [os
.path
.dirname(os
.path
.dirname(main_path
))]
447 dirs
= [os
.path
.dirname(main_path
)]
449 assert main_name
not in sys
.modules
, main_name
450 file, path_name
, etc
= imp
.find_module(main_name
, dirs
)
452 # We would like to do "imp.load_module('__main__', ...)"
453 # here. However, that would cause 'if __name__ ==
454 # "__main__"' clauses to be executed.
455 main_module
= imp
.load_module(
456 '__parents_main__', file, path_name
, etc
462 sys
.modules
['__main__'] = main_module
463 main_module
.__name
__ = '__main__'
465 # Try to make the potentially picklable objects in
466 # sys.modules['__main__'] realize they are in the main
467 # module -- somewhat ugly.
468 for obj
in main_module
.__dict
__.values():
470 if obj
.__module
__ == '__parents_main__':
471 obj
.__module
__ = '__main__'