2 # Module to allow connection and socket objects to be transferred
5 # multiprocessing/reduction.py
7 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
17 import _multiprocessing
18 from multiprocessing
import current_process
19 from multiprocessing
.forking
import Popen
, duplicate
, close
, ForkingPickler
20 from multiprocessing
.util
import register_after_fork
, debug
, sub_debug
21 from multiprocessing
.connection
import Client
, Listener
28 if not(sys
.platform
== 'win32' or hasattr(_multiprocessing
, 'recvfd')):
29 raise ImportError('pickling of connections not supported')
32 # Platform specific definitions
35 if sys
.platform
== 'win32':
37 from ._multiprocessing
import win32
39 def send_handle(conn
, handle
, destination_pid
):
40 process_handle
= win32
.OpenProcess(
41 win32
.PROCESS_ALL_ACCESS
, False, destination_pid
44 new_handle
= duplicate(handle
, process_handle
)
49 def recv_handle(conn
):
53 def send_handle(conn
, handle
, destination_pid
):
54 _multiprocessing
.sendfd(conn
.fileno(), handle
)
56 def recv_handle(conn
):
57 return _multiprocessing
.recvfd(conn
.fileno())
60 # Support for a per-process server thread which caches pickled handles
66 global _lock
, _listener
, _cache
70 _lock
= threading
.Lock()
74 register_after_fork(_reset
, _reset
)
83 debug('starting listener and thread for sending handles')
84 _listener
= Listener(authkey
=current_process().authkey
)
85 t
= threading
.Thread(target
=_serve
)
94 from .util
import is_exiting
, sub_warning
98 conn
= _listener
.accept()
99 handle_wanted
, destination_pid
= conn
.recv()
100 _cache
.remove(handle_wanted
)
101 send_handle(conn
, handle_wanted
, destination_pid
)
108 'thread for sharing handles raised exception :\n' +
109 '-'*79 + '\n' + traceback
.format_exc() + '-'*79
113 # Functions to be used for pickling/unpickling objects with handles
116 def reduce_handle(handle
):
117 if Popen
.thread_is_spawning():
118 return (None, Popen
.duplicate_for_child(handle
), True)
119 dup_handle
= duplicate(handle
)
120 _cache
.add(dup_handle
)
121 sub_debug('reducing handle %d', handle
)
122 return (_get_listener().address
, dup_handle
, False)
124 def rebuild_handle(pickled_data
):
125 address
, handle
, inherited
= pickled_data
128 sub_debug('rebuilding handle %d', handle
)
129 conn
= Client(address
, authkey
=current_process().authkey
)
130 conn
.send((handle
, os
.getpid()))
131 new_handle
= recv_handle(conn
)
136 # Register `_multiprocessing.Connection` with `ForkingPickler`
139 def reduce_connection(conn
):
140 rh
= reduce_handle(conn
.fileno())
141 return rebuild_connection
, (rh
, conn
.readable
, conn
.writable
)
143 def rebuild_connection(reduced_handle
, readable
, writable
):
144 handle
= rebuild_handle(reduced_handle
)
145 return _multiprocessing
.Connection(
146 handle
, readable
=readable
, writable
=writable
149 ForkingPickler
.register(_multiprocessing
.Connection
, reduce_connection
)
152 # Register `socket.socket` with `ForkingPickler`
155 def fromfd(fd
, family
, type_
, proto
=0):
156 s
= socket
.fromfd(fd
, family
, type_
, proto
)
157 if s
.__class
__ is not socket
.socket
:
158 s
= socket
.socket(_sock
=s
)
161 def reduce_socket(s
):
162 reduced_handle
= reduce_handle(s
.fileno())
163 return rebuild_socket
, (reduced_handle
, s
.family
, s
.type, s
.proto
)
165 def rebuild_socket(reduced_handle
, family
, type_
, proto
):
166 fd
= rebuild_handle(reduced_handle
)
167 _sock
= fromfd(fd
, family
, type_
, proto
)
171 ForkingPickler
.register(socket
.socket
, reduce_socket
)
174 # Register `_multiprocessing.PipeConnection` with `ForkingPickler`
177 if sys
.platform
== 'win32':
179 def reduce_pipe_connection(conn
):
180 rh
= reduce_handle(conn
.fileno())
181 return rebuild_pipe_connection
, (rh
, conn
.readable
, conn
.writable
)
183 def rebuild_pipe_connection(reduced_handle
, readable
, writable
):
184 handle
= rebuild_handle(reduced_handle
)
185 return _multiprocessing
.PipeConnection(
186 handle
, readable
=readable
, writable
=writable
189 ForkingPickler
.register(_multiprocessing
.PipeConnection
, reduce_pipe_connection
)