2 # A higher level module for using sockets (or Windows named pipes)
4 # multiprocessing/connection.py
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
9 __all__
= [ 'Client', 'Listener', 'Pipe' ]
19 import _multiprocessing
20 from multiprocessing
import current_process
, AuthenticationError
21 from multiprocessing
.util
import get_temp_dir
, Finalize
, sub_debug
, debug
22 from multiprocessing
.forking
import duplicate
, close
31 _mmap_counter
= itertools
.count()
33 default_family
= 'AF_INET'
34 families
= ['AF_INET']
36 if hasattr(socket
, 'AF_UNIX'):
37 default_family
= 'AF_UNIX'
38 families
+= ['AF_UNIX']
40 if sys
.platform
== 'win32':
41 default_family
= 'AF_PIPE'
42 families
+= ['AF_PIPE']
48 def arbitrary_address(family
):
50 Return an arbitrary free address for the given family
52 if family
== 'AF_INET':
53 return ('localhost', 0)
54 elif family
== 'AF_UNIX':
55 return tempfile
.mktemp(prefix
='listener-', dir=get_temp_dir())
56 elif family
== 'AF_PIPE':
57 return tempfile
.mktemp(prefix
=r
'\\.\pipe\pyc-%d-%d-' %
58 (os
.getpid(), _mmap_counter
.next()))
60 raise ValueError('unrecognized family')
63 def address_type(address
):
65 Return the types of the address
67 This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
69 if type(address
) == tuple:
71 elif type(address
) is str and address
.startswith('\\\\'):
73 elif type(address
) is str:
76 raise ValueError('address type of %r unrecognized' % address
)
82 class Listener(object):
84 Returns a listener object.
86 This is a wrapper for a bound socket which is 'listening' for
87 connections, or for a Windows named pipe.
89 def __init__(self
, address
=None, family
=None, backlog
=1, authkey
=None):
90 family
= family
or (address
and address_type(address
)) \
92 address
= address
or arbitrary_address(family
)
94 if family
== 'AF_PIPE':
95 self
._listener
= PipeListener(address
, backlog
)
97 self
._listener
= SocketListener(address
, family
, backlog
)
99 if authkey
is not None and not isinstance(authkey
, bytes
):
100 raise TypeError, 'authkey should be a byte string'
102 self
._authkey
= authkey
106 Accept a connection on the bound socket or named pipe of `self`.
108 Returns a `Connection` object.
110 c
= self
._listener
.accept()
112 deliver_challenge(c
, self
._authkey
)
113 answer_challenge(c
, self
._authkey
)
118 Close the bound socket or named pipe of `self`.
120 return self
._listener
.close()
122 address
= property(lambda self
: self
._listener
._address
)
123 last_accepted
= property(lambda self
: self
._listener
._last
_accepted
)
126 def Client(address
, family
=None, authkey
=None):
128 Returns a connection to the address of a `Listener`
130 family
= family
or address_type(address
)
131 if family
== 'AF_PIPE':
132 c
= PipeClient(address
)
134 c
= SocketClient(address
)
136 if authkey
is not None and not isinstance(authkey
, bytes
):
137 raise TypeError, 'authkey should be a byte string'
139 if authkey
is not None:
140 answer_challenge(c
, authkey
)
141 deliver_challenge(c
, authkey
)
146 if sys
.platform
!= 'win32':
148 def Pipe(duplex
=True):
150 Returns pair of connection objects at either end of a pipe
153 s1
, s2
= socket
.socketpair()
154 c1
= _multiprocessing
.Connection(os
.dup(s1
.fileno()))
155 c2
= _multiprocessing
.Connection(os
.dup(s2
.fileno()))
160 c1
= _multiprocessing
.Connection(fd1
, writable
=False)
161 c2
= _multiprocessing
.Connection(fd2
, readable
=False)
167 from ._multiprocessing
import win32
169 def Pipe(duplex
=True):
171 Returns pair of connection objects at either end of a pipe
173 address
= arbitrary_address('AF_PIPE')
175 openmode
= win32
.PIPE_ACCESS_DUPLEX
176 access
= win32
.GENERIC_READ | win32
.GENERIC_WRITE
177 obsize
, ibsize
= BUFSIZE
, BUFSIZE
179 openmode
= win32
.PIPE_ACCESS_INBOUND
180 access
= win32
.GENERIC_WRITE
181 obsize
, ibsize
= 0, BUFSIZE
183 h1
= win32
.CreateNamedPipe(
185 win32
.PIPE_TYPE_MESSAGE | win32
.PIPE_READMODE_MESSAGE |
187 1, obsize
, ibsize
, win32
.NMPWAIT_WAIT_FOREVER
, win32
.NULL
189 h2
= win32
.CreateFile(
190 address
, access
, 0, win32
.NULL
, win32
.OPEN_EXISTING
, 0, win32
.NULL
192 win32
.SetNamedPipeHandleState(
193 h2
, win32
.PIPE_READMODE_MESSAGE
, None, None
197 win32
.ConnectNamedPipe(h1
, win32
.NULL
)
198 except WindowsError, e
:
199 if e
.args
[0] != win32
.ERROR_PIPE_CONNECTED
:
202 c1
= _multiprocessing
.PipeConnection(h1
, writable
=duplex
)
203 c2
= _multiprocessing
.PipeConnection(h2
, readable
=duplex
)
208 # Definitions for connections based on sockets
211 class SocketListener(object):
213 Representation of a socket which is bound to an address and listening
215 def __init__(self
, address
, family
, backlog
=1):
216 self
._socket
= socket
.socket(getattr(socket
, family
))
217 self
._socket
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
218 self
._socket
.bind(address
)
219 self
._socket
.listen(backlog
)
220 self
._address
= self
._socket
.getsockname()
221 self
._family
= family
222 self
._last
_accepted
= None
224 if family
== 'AF_UNIX':
225 self
._unlink
= Finalize(
226 self
, os
.unlink
, args
=(address
,), exitpriority
=0
232 s
, self
._last
_accepted
= self
._socket
.accept()
233 fd
= duplicate(s
.fileno())
234 conn
= _multiprocessing
.Connection(fd
)
240 if self
._unlink
is not None:
244 def SocketClient(address
):
246 Return a connection object connected to the socket given by `address`
248 family
= address_type(address
)
249 s
= socket
.socket( getattr(socket
, family
) )
254 except socket
.error
, e
:
255 if e
.args
[0] != errno
.ECONNREFUSED
: # connection refused
256 debug('failed to connect to address %s', address
)
264 fd
= duplicate(s
.fileno())
265 conn
= _multiprocessing
.Connection(fd
)
270 # Definitions for connections based on named pipes
273 if sys
.platform
== 'win32':
275 class PipeListener(object):
277 Representation of a named pipe
279 def __init__(self
, address
, backlog
=None):
280 self
._address
= address
281 handle
= win32
.CreateNamedPipe(
282 address
, win32
.PIPE_ACCESS_DUPLEX
,
283 win32
.PIPE_TYPE_MESSAGE | win32
.PIPE_READMODE_MESSAGE |
285 win32
.PIPE_UNLIMITED_INSTANCES
, BUFSIZE
, BUFSIZE
,
286 win32
.NMPWAIT_WAIT_FOREVER
, win32
.NULL
288 self
._handle
_queue
= [handle
]
289 self
._last
_accepted
= None
291 sub_debug('listener created with address=%r', self
._address
)
293 self
.close
= Finalize(
294 self
, PipeListener
._finalize
_pipe
_listener
,
295 args
=(self
._handle
_queue
, self
._address
), exitpriority
=0
299 newhandle
= win32
.CreateNamedPipe(
300 self
._address
, win32
.PIPE_ACCESS_DUPLEX
,
301 win32
.PIPE_TYPE_MESSAGE | win32
.PIPE_READMODE_MESSAGE |
303 win32
.PIPE_UNLIMITED_INSTANCES
, BUFSIZE
, BUFSIZE
,
304 win32
.NMPWAIT_WAIT_FOREVER
, win32
.NULL
306 self
._handle
_queue
.append(newhandle
)
307 handle
= self
._handle
_queue
.pop(0)
309 win32
.ConnectNamedPipe(handle
, win32
.NULL
)
310 except WindowsError, e
:
311 if e
.args
[0] != win32
.ERROR_PIPE_CONNECTED
:
313 return _multiprocessing
.PipeConnection(handle
)
316 def _finalize_pipe_listener(queue
, address
):
317 sub_debug('closing listener with address=%r', address
)
321 def PipeClient(address
):
323 Return a connection object connected to the pipe given by `address`
327 win32
.WaitNamedPipe(address
, 1000)
328 h
= win32
.CreateFile(
329 address
, win32
.GENERIC_READ | win32
.GENERIC_WRITE
,
330 0, win32
.NULL
, win32
.OPEN_EXISTING
, 0, win32
.NULL
332 except WindowsError, e
:
333 if e
.args
[0] not in (win32
.ERROR_SEM_TIMEOUT
,
334 win32
.ERROR_PIPE_BUSY
):
341 win32
.SetNamedPipeHandleState(
342 h
, win32
.PIPE_READMODE_MESSAGE
, None, None
344 return _multiprocessing
.PipeConnection(h
)
347 # Authentication stuff
352 CHALLENGE
= b
'#CHALLENGE#'
353 WELCOME
= b
'#WELCOME#'
354 FAILURE
= b
'#FAILURE#'
356 def deliver_challenge(connection
, authkey
):
358 assert isinstance(authkey
, bytes
)
359 message
= os
.urandom(MESSAGE_LENGTH
)
360 connection
.send_bytes(CHALLENGE
+ message
)
361 digest
= hmac
.new(authkey
, message
).digest()
362 response
= connection
.recv_bytes(256) # reject large message
363 if response
== digest
:
364 connection
.send_bytes(WELCOME
)
366 connection
.send_bytes(FAILURE
)
367 raise AuthenticationError('digest received was wrong')
369 def answer_challenge(connection
, authkey
):
371 assert isinstance(authkey
, bytes
)
372 message
= connection
.recv_bytes(256) # reject large message
373 assert message
[:len(CHALLENGE
)] == CHALLENGE
, 'message = %r' % message
374 message
= message
[len(CHALLENGE
):]
375 digest
= hmac
.new(authkey
, message
).digest()
376 connection
.send_bytes(digest
)
377 response
= connection
.recv_bytes(256) # reject large message
378 if response
!= WELCOME
:
379 raise AuthenticationError('digest sent was rejected')
382 # Support for using xmlrpclib for serialization
385 class ConnectionWrapper(object):
386 def __init__(self
, conn
, dumps
, loads
):
390 for attr
in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
391 obj
= getattr(conn
, attr
)
392 setattr(self
, attr
, obj
)
395 self
._conn
.send_bytes(s
)
397 s
= self
._conn
.recv_bytes()
398 return self
._loads
(s
)
401 return xmlrpclib
.dumps((obj
,), None, None, None, 1).encode('utf8')
404 (obj
,), method
= xmlrpclib
.loads(s
.decode('utf8'))
407 class XmlListener(Listener
):
411 obj
= Listener
.accept(self
)
412 return ConnectionWrapper(obj
, _xml_dumps
, _xml_loads
)
414 def XmlClient(*args
, **kwds
):
417 return ConnectionWrapper(Client(*args
, **kwds
), _xml_dumps
, _xml_loads
)