Credit Nir Aides for r77288
[python.git] / Lib / multiprocessing / forking.py
blob7eda99180ac6ee93bd9b8ebaf9ac1149e32c8820
2 # Module for starting a process object using os.fork() or CreateProcess()
4 # multiprocessing/forking.py
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
9 import os
10 import sys
11 import signal
13 from multiprocessing import util, process
15 __all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
18 # Check that the current thread is spawning a child process
def assert_spawning(self):
    '''
    Raise RuntimeError unless the current thread is spawning a child process

    `self` is the object being shared; only its type name is used, to
    build the error message.
    '''
    if Popen.thread_is_spawning():
        return
    type_name = type(self).__name__
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % type_name
        )
29 # Try making some callable types picklable
32 from pickle import Pickler
class ForkingPickler(Pickler):
    '''
    Pickler subclass whose dispatch table can be extended with extra types
    '''
    # Private copy, so registrations never modify Pickler.dispatch itself
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        '''
        Make objects of `type` picklable by way of `reduce(obj)`

        `reduce` must return a tuple of the positional arguments to hand
        to save_reduce().
        '''
        def dispatcher(self, obj):
            self.save_reduce(*reduce(obj), obj=obj)
        cls.dispatch[type] = dispatcher
def _reduce_method(m):
    '''
    Reduce a method to a getattr() call that looks it up again by name

    An unbound method (im_self is None) is looked up on its class, a
    bound one on the instance it is bound to.
    '''
    owner = m.im_class if m.im_self is None else m.im_self
    return getattr, (owner, m.im_func.func_name)
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
def _reduce_method_descriptor(m):
    '''
    Reduce a method descriptor to a getattr() call on its defining class
    '''
    owner, attr_name = m.__objclass__, m.__name__
    return getattr, (owner, attr_name)
# The types of list.append and int.__add__ are both registered
ForkingPickler.register(type(list.append), _reduce_method_descriptor)
ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
55 #def _reduce_builtin_function_or_method(m):
56 # return getattr, (m.__self__, m.__name__)
57 #ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method)
58 #ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method)
try:
    from functools import partial
except ImportError:
    # No functools.partial available: nothing to register
    pass
else:
    def _rebuild_partial(func, args, keywords):
        # Inverse of _reduce_partial(): build the partial object again
        return partial(func, *args, **keywords)
    def _reduce_partial(p):
        # `p.keywords or {}` guarantees a mapping for the ** expansion
        # in _rebuild_partial()
        return _rebuild_partial, (p.func, p.args, p.keywords or {})
    ForkingPickler.register(partial, _reduce_partial)
72 # Unix
75 if sys.platform != 'win32':
76 import time
78 exit = os._exit
79 duplicate = os.dup
80 close = os.close
83 # We define a Popen class similar to the one from subprocess, but
84 # whose constructor takes a process object as its argument.
class Popen(object):
    '''
    Start a child process with os.fork() to run the code of a process object

    Attributes:
        pid         process id returned by os.fork()
        returncode  None until the child has been reaped; afterwards the
                    exit status, or minus the signal number if the child
                    was killed by a signal
    '''

    def __init__(self, process_obj):
        # Flush before forking so the child does not inherit (and later
        # write out a second time) output that is still buffered.
        sys.stdout.flush()
        sys.stderr.flush()
        self.returncode = None

        self.pid = os.fork()
        if self.pid == 0:
            # In the child: reseed the random module if it is loaded,
            # then run the process object and exit with its result
            # without returning into the parent's code.
            if 'random' in sys.modules:
                import random
                random.seed()
            code = process_obj._bootstrap()
            sys.stdout.flush()
            sys.stderr.flush()
            os._exit(code)

    def poll(self, flag=os.WNOHANG):
        '''
        Reap the child if possible and return its returncode (else None)

        `flag` is passed to os.waitpid(); the default os.WNOHANG makes
        the call non-blocking, 0 makes it wait for the child.
        '''
        if self.returncode is None:
            import errno
            while True:
                try:
                    pid, sts = os.waitpid(self.pid, flag)
                except OSError as e:
                    # A signal handler interrupting waitpid() is not an
                    # error: just try again.  Anything else propagates.
                    if e.errno != errno.EINTR:
                        raise
                else:
                    break
            if pid == self.pid:
                if os.WIFSIGNALED(sts):
                    self.returncode = -os.WTERMSIG(sts)
                else:
                    assert os.WIFEXITED(sts)
                    self.returncode = os.WEXITSTATUS(sts)
        return self.returncode

    def wait(self, timeout=None):
        '''
        Wait for the child to exit and return its returncode

        With timeout=None this blocks until the child exits.  Otherwise
        the child is polled, sleeping between attempts with a delay that
        doubles each round (capped at 0.05 seconds and at the time left),
        and None is returned if it has not exited within `timeout`
        seconds.
        '''
        if timeout is None:
            return self.poll(0)
        deadline = time.time() + timeout
        delay = 0.0005
        while True:
            res = self.poll()
            if res is not None:
                break
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            delay = min(delay * 2, remaining, 0.05)
            time.sleep(delay)
        return res

    def terminate(self):
        '''
        Send SIGTERM to the child unless it has already been reaped
        '''
        if self.returncode is None:
            try:
                os.kill(self.pid, signal.SIGTERM)
            except OSError:
                # Only re-raise if the child has not exited shortly
                # afterwards; otherwise the failed kill is ignored.
                if self.wait(timeout=0.1) is None:
                    raise

    @staticmethod
    def thread_is_spawning():
        '''
        Always False in this fork-based implementation
        '''
        return False
144 # Windows
147 else:
148 import thread
149 import msvcrt
150 import _subprocess
151 import time
153 from ._multiprocessing import win32, Connection, PipeConnection
154 from .util import Finalize
156 #try:
157 # from cPickle import dump, load, HIGHEST_PROTOCOL
158 #except ImportError:
159 from pickle import load, HIGHEST_PROTOCOL
def dump(obj, file, protocol=None):
    '''
    Pickle `obj` to `file` using a ForkingPickler
    '''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
168 TERMINATE = 0x10000
169 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
171 exit = win32.ExitProcess
172 close = win32.CloseHandle
175 # _python_exe is the assumed path to the python executable.
176 # People embedding Python want to modify it.
# When running as pythonservice.exe, use the python.exe found under
# sys.exec_prefix instead of sys.executable.
_running_as_service = sys.executable.lower().endswith('pythonservice.exe')
_python_exe = (os.path.join(sys.exec_prefix, 'python.exe')
               if _running_as_service else sys.executable)

def set_executable(exe):
    '''
    Set the path of the python executable used to start child processes
    '''
    global _python_exe
    _python_exe = exe
def duplicate(handle, target_process=None, inheritable=False):
    '''
    Duplicate `handle` from the current process into `target_process`

    `target_process` defaults to the current process.  The duplicate
    keeps the same access rights and is returned detached from its
    wrapper object.
    '''
    if target_process is None:
        target_process = _subprocess.GetCurrentProcess()
    duplicated = _subprocess.DuplicateHandle(
        _subprocess.GetCurrentProcess(), handle, target_process,
        0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
        )
    return duplicated.Detach()
201 # We define a Popen class similar to the one from subprocess, but
202 # whose constructor takes a process object as its argument.
class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    # Thread-local storage: `process_handle` is set only while __init__
    # is pickling data for the child it has just started.
    _tls = thread._local()

    def __init__(self, process_obj):
        # create pipe for communication with child
        rfd, wfd = os.pipe()

        try:
            # get handle for read end of the pipe and make it inheritable
            try:
                rhandle = duplicate(msvcrt.get_osfhandle(rfd),
                                    inheritable=True)
            finally:
                os.close(rfd)

            # start process
            try:
                cmd = get_command_line() + [rhandle]
                cmd = ' '.join('"%s"' % x for x in cmd)
                hp, ht, pid, tid = _subprocess.CreateProcess(
                    _python_exe, cmd, None, None, 1, 0, None, None, None
                    )
            finally:
                # the parent's copy of the read handle is closed whether
                # or not the child could be started
                close(rhandle)
            ht.Close()
        except Exception:
            # no child will read from the pipe, so do not leak the
            # write end either
            os.close(wfd)
            raise

        # set attributes of self
        self.pid = pid
        self.returncode = None
        self._handle = hp

        # send information to child
        prep_data = get_preparation_data(process_obj._name)
        to_child = os.fdopen(wfd, 'wb')
        Popen._tls.process_handle = int(hp)
        try:
            dump(prep_data, to_child, HIGHEST_PROTOCOL)
            dump(process_obj, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()

    @staticmethod
    def thread_is_spawning():
        '''
        Return whether this thread is currently sending data to a new child
        '''
        return getattr(Popen._tls, 'process_handle', None) is not None

    @staticmethod
    def duplicate_for_child(handle):
        '''
        Duplicate `handle` into the child process being spawned by this thread
        '''
        return duplicate(handle, Popen._tls.process_handle)

    def wait(self, timeout=None):
        '''
        Wait up to `timeout` seconds (forever if None) for the child to exit

        Returns the returncode, or None if the child is still running.
        An exit code equal to TERMINATE is reported as -signal.SIGTERM.
        '''
        if self.returncode is None:
            if timeout is None:
                msecs = _subprocess.INFINITE
            else:
                # round to the nearest millisecond, never negative
                msecs = max(0, int(timeout * 1000 + 0.5))

            res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
            if res == _subprocess.WAIT_OBJECT_0:
                code = _subprocess.GetExitCodeProcess(self._handle)
                if code == TERMINATE:
                    code = -signal.SIGTERM
                self.returncode = code

        return self.returncode

    def poll(self):
        '''
        Return the returncode without blocking (None if still running)
        '''
        return self.wait(timeout=0)

    def terminate(self):
        '''
        Terminate the child with exit code TERMINATE unless already finished
        '''
        if self.returncode is None:
            try:
                _subprocess.TerminateProcess(int(self._handle), TERMINATE)
            except WindowsError:
                # Only re-raise if the child has not exited shortly
                # afterwards; otherwise the failure is ignored.
                if self.wait(timeout=0.1) is None:
                    raise
def is_forking(argv):
    '''
    Return whether commandline indicates we are forking

    True when the second element of `argv` is '--multiprocessing-fork';
    in that case `argv` is asserted to have exactly three elements.
    '''
    if len(argv) < 2 or argv[1] != '--multiprocessing-fork':
        return False
    assert len(argv) == 3
    return True
def freeze_support():
    '''
    Run code for process object if this is not the main process

    Does nothing unless sys.argv marks this process as a forked child;
    in that case main() is run and then sys.exit() is called.
    '''
    if not is_forking(sys.argv):
        return
    main()
    sys.exit()
def get_command_line():
    '''
    Returns prefix of command line used for spawning a child process

    Raises RuntimeError if this process is itself a forked child whose
    current process object still has an empty _identity.
    '''
    still_bootstrapping = (process.current_process()._identity == ()
                           and is_forking(sys.argv))
    if still_bootstrapping:
        raise RuntimeError('''
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.

            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.''')

    if getattr(sys, 'frozen', False):
        # a frozen executable is started again directly
        return [sys.executable, '--multiprocessing-fork']

    prog = 'from multiprocessing.forking import main; main()'
    return [_python_exe, '-c', prog, '--multiprocessing-fork']
def main():
    '''
    Run code specified by data received over pipe

    The last command line argument is the handle of the pipe's read end.
    Two pickles are read from it: the preparation data, which is passed
    to prepare(), and then the process object, whose _bootstrap() result
    becomes the exit code.
    '''
    assert is_forking(sys.argv)

    pipe_handle = int(sys.argv[-1])
    from_parent = os.fdopen(
        msvcrt.open_osfhandle(pipe_handle, os.O_RDONLY), 'rb'
        )

    # _inheriting is switched on for the duration of the unpickling only
    process.current_process()._inheriting = True
    prepare(load(from_parent))
    process_obj = load(from_parent)
    process.current_process()._inheriting = False

    from_parent.close()

    exit(process_obj._bootstrap())
def get_preparation_data(name):
    '''
    Return info about parent needed by child to unpickle process object

    The result is a dict with the keys 'name', 'sys_path', 'sys_argv',
    'log_to_stderr', 'orig_dir' and 'authkey', plus 'log_level' when a
    logger exists and 'main_path' when a main module path is known and
    WINEXE is false.
    '''
    from .util import _logger, _log_to_stderr

    d = {
        'name': name,
        'sys_path': sys.path,
        'sys_argv': sys.argv,
        'log_to_stderr': _log_to_stderr,
        'orig_dir': process.ORIGINAL_DIR,
        'authkey': process.current_process().authkey,
        }

    if _logger is not None:
        d['log_level'] = _logger.getEffectiveLevel()

    if WINEXE:
        return d

    main_path = getattr(sys.modules['__main__'], '__file__', None)
    # fall back on the script name given on the command line
    if not main_path and sys.argv[0] not in ('', '-c'):
        main_path = sys.argv[0]
    if main_path is not None:
        # relative paths are resolved against process.ORIGINAL_DIR
        if process.ORIGINAL_DIR is not None and \
               not os.path.isabs(main_path):
            main_path = os.path.join(process.ORIGINAL_DIR, main_path)
        d['main_path'] = os.path.normpath(main_path)

    return d
382 # Make (Pipe)Connection picklable
def reduce_connection(conn):
    '''
    Reduce a connection to its type and a handle duplicated for the child

    Raises RuntimeError unless the current thread is spawning a child
    process.
    '''
    if Popen.thread_is_spawning():
        child_handle = Popen.duplicate_for_child(conn.fileno())
        return type(conn), (child_handle, conn.readable, conn.writable)
    raise RuntimeError(
        'By default %s objects can only be shared between processes\n'
        'using inheritance' % type(conn).__name__
        )

ForkingPickler.register(Connection, reduce_connection)
ForkingPickler.register(PipeConnection, reduce_connection)
398 # Prepare current process
# Every call to prepare() appends the current sys.modules['__main__'] here
old_main_modules = []

def prepare(data):
    '''
    Try to get current process ready to unpickle process object

    `data` is a dict; each of the keys 'name', 'authkey',
    'log_to_stderr', 'log_level', 'sys_path', 'sys_argv', 'dir',
    'orig_dir' and 'main_path' is optional and is applied only when
    present.
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().name = data['name']

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if 'log_to_stderr' in data and data['log_to_stderr']:
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    if 'main_path' in data:
        main_path = data['main_path']
        main_name = os.path.splitext(os.path.basename(main_path))[0]
        if main_name == '__init__':
            # the main module is a package: it is named after its directory
            main_name = os.path.basename(os.path.dirname(main_path))

        if main_name != 'ipython':
            import imp

            # main_path has already been handed to os.path.basename()
            # above, so it needs no separate check for None here.
            if os.path.basename(main_path).startswith('__init__.py'):
                dirs = [os.path.dirname(os.path.dirname(main_path))]
            else:
                dirs = [os.path.dirname(main_path)]

            assert main_name not in sys.modules, main_name
            file, path_name, etc = imp.find_module(main_name, dirs)
            try:
                # We would like to do "imp.load_module('__main__', ...)"
                # here.  However, that would cause 'if __name__ ==
                # "__main__"' clauses to be executed.
                main_module = imp.load_module(
                    '__parents_main__', file, path_name, etc
                    )
            finally:
                if file:
                    file.close()

            sys.modules['__main__'] = main_module
            main_module.__name__ = '__main__'

            # Try to make the potentially picklable objects in
            # sys.modules['__main__'] realize they are in the main
            # module -- somewhat ugly.
            for obj in main_module.__dict__.values():
                try:
                    if obj.__module__ == '__parents_main__':
                        obj.__module__ = '__main__'
                except Exception:
                    # deliberate best effort: objects without a (writable)
                    # __module__ attribute are left as they are
                    pass