# A higher level module for using sockets (or Windows named pipes)
# multiprocessing/connection.py
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
# Public names exported by a star-import of this module.
__all__ = ['Client', 'Listener', 'Pipe']
19 import _multiprocessing
20 from multiprocessing
import current_process
21 from multiprocessing
.util
import get_temp_dir
, Finalize
, sub_debug
, debug
22 from multiprocessing
.forking
import duplicate
, close
# Monotonic counter; combined with the pid it keeps generated named-pipe
# addresses distinct within one process.
_mmap_counter = itertools.count()

# Address families usable on this platform, from most generic to most
# specific.  'AF_INET' is always present.
families = ['AF_INET']
if hasattr(socket, 'AF_UNIX'):
    families.append('AF_UNIX')
if sys.platform == 'win32':
    families.append('AF_PIPE')

# The last family that applies to this platform is the default one.
default_family = families[-1]
def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family

    `family` is one of the strings 'AF_INET', 'AF_UNIX' or 'AF_PIPE'.
    The result is a ``(host, port)`` tuple for 'AF_INET' and a generated
    path/pipe name (a string) for the other two families.

    Raises ValueError for any other `family`.
    '''
    if family == 'AF_INET':
        return ('localhost', 0)
    if family == 'AF_UNIX':
        # mktemp() only generates a name; nothing is created at that path
        # here.
        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
    if family == 'AF_PIPE':
        # pid + per-process counter keep the pipe name prefix unique.  The
        # next() builtin is used instead of the Python-2-only .next()
        # method so the call works on both Python 2.6+ and Python 3.
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)))
    raise ValueError('unrecognized family')
def address_type(address):
    '''
    Return the types of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'

    A tuple is treated as 'AF_INET', a string starting with two
    backslashes as 'AF_PIPE' and any other string as 'AF_UNIX'.  Only the
    exact types `tuple` and `str` are recognized.

    Raises ValueError for every other kind of address.
    '''
    kind = type(address)
    if kind is tuple:
        return 'AF_INET'
    if kind is str:
        if address.startswith('\\\\'):
            return 'AF_PIPE'
        return 'AF_UNIX'
    # Wrap in a 1-tuple: a tuple *subclass* reaches this point, and using
    # it directly as the right-hand side of % would raise TypeError
    # instead of the intended ValueError.
    raise ValueError('address type of %r unrecognized' % (address,))
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.

    `address` defaults to an arbitrary address of the chosen family;
    `family` defaults to the type of `address`, or to `default_family` when
    neither is given.  `backlog` is handed to the underlying listener.
    `authkey`, when given, must be a byte string (TypeError otherwise).
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        # Validate first, so that a bad authkey cannot leave an already
        # bound socket or pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.
        '''
        c = self._listener.accept()
        # The handshake runs in both directions, and only when a non-empty
        # key was configured.
        if self._authkey:
            deliver_challenge(c, self._authkey)
            answer_challenge(c, self._authkey)
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        return self._listener.close()

    # Read-only views onto the state kept by the wrapped listener.
    address = property(lambda self: self._listener._address)
    last_accepted = property(lambda self: self._listener._last_accepted)
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` defaults to the type of `address`.  When `authkey` is given it
    must be a byte string (TypeError otherwise) and the authentication
    handshake is performed in both directions before the connection is
    returned.
    '''
    # Validate before connecting so that a bad authkey does not leave an
    # open connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        # Mirror image of Listener.accept(): answer first, then challenge.
        answer_challenge(c, authkey)
        deliver_challenge(c, authkey)

    return c
if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both connections are bidirectional.  Otherwise
        the first connection is created with ``writable=False`` and the
        second with ``readable=False``.
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            try:
                # The connections own duplicates of the descriptors, so the
                # socket objects themselves are no longer needed.
                c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
                c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            finally:
                s1.close()
                s2.close()
        else:
            # NOTE(review): the statement producing fd1/fd2 was lost from
            # the damaged source; os.pipe() is assumed -- confirm.
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)

        return c1, c2

else:

    from ._multiprocessing import win32

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both connections are bidirectional.  Otherwise
        the first connection is created with ``writable=False`` and the
        second with ``readable=False``.
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = win32.PIPE_ACCESS_DUPLEX
            access = win32.GENERIC_READ | win32.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = win32.PIPE_ACCESS_INBOUND
            access = win32.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        # NOTE(review): the first argument line and the last mode flag were
        # lost from the damaged source; `address, openmode` and PIPE_WAIT
        # are assumed -- confirm.
        h1 = win32.CreateNamedPipe(
            address, openmode,
            win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
            win32.PIPE_WAIT,
            1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
            )
        h2 = win32.CreateFile(
            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
            )
        win32.SetNamedPipeHandleState(
            h2, win32.PIPE_READMODE_MESSAGE, None, None
            )

        try:
            win32.ConnectNamedPipe(h1, win32.NULL)
        except WindowsError as e:
            # An already connected client is not an error here.
            if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                raise

        c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
        c2 = _multiprocessing.PipeConnection(h2, readable=duplex)

        return c1, c2
# Definitions for connections based on sockets
class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening

    `family` is the name of a socket-module address family constant (it is
    looked up with ``getattr(socket, family)``).
    '''
    def __init__(self, address, family, backlog=1):
        self._socket = socket.socket(getattr(socket, family))
        try:
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except Exception:
            # Do not leak the descriptor when binding/listening fails.
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX':
            # Remove the socket file when the listener is finalized.
            self._unlink = Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''
        Wait for a connection and return it as a `Connection` object; the
        peer address is remembered in `_last_accepted`.
        '''
        s, self._last_accepted = self._socket.accept()
        try:
            # The connection works on a duplicate of the descriptor, so
            # the accepted socket object is always closed afterwards.
            fd = duplicate(s.fileno())
            conn = _multiprocessing.Connection(fd)
        finally:
            s.close()
        return conn

    def close(self):
        '''
        Close the listening socket and run the unlink finalizer, if any.
        '''
        self._socket.close()
        if self._unlink is not None:
            self._unlink()
def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`

    A refused connection is retried; any other socket error is logged with
    debug() and re-raised.
    '''
    family = address_type(address)
    s = socket.socket(getattr(socket, family))
    try:
        while 1:
            try:
                s.connect(address)
            except socket.error as e:
                if e.args[0] != errno.ECONNREFUSED:  # connection refused
                    debug('failed to connect to address %s', address)
                    raise
                # NOTE(review): the retry loop and its delay were lost from
                # the damaged source; a 0.01s pause is assumed -- confirm.
                time.sleep(0.01)
            else:
                break

        fd = duplicate(s.fileno())
        conn = _multiprocessing.Connection(fd)
    finally:
        # The connection owns a duplicate descriptor; the socket object is
        # closed on success and on failure alike.
        s.close()
    return conn
# Definitions for connections based on named pipes
if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe

        One pipe instance is always kept ready in `_handle_queue`; each
        accept() creates the next instance before serving the oldest one.
        '''
        def __init__(self, address, backlog=None):
            # `backlog` is accepted for signature parity with
            # SocketListener and is not used.
            self._address = address
            # NOTE(review): the last mode flag was lost from the damaged
            # source; PIPE_WAIT is assumed -- confirm.
            handle = win32.CreateNamedPipe(
                address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            self._handle_queue = [handle]
            self._last_accepted = None

            sub_debug('listener created with address=%r', self._address)

            # close() is the finalizer itself: calling it releases every
            # handle still queued.
            self.close = Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def accept(self):
            '''
            Wait for a client on the oldest pipe instance and return it as
            a `PipeConnection`.
            '''
            # NOTE(review): PIPE_WAIT assumed here as well -- confirm.
            newhandle = win32.CreateNamedPipe(
                self._address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            self._handle_queue.append(newhandle)
            handle = self._handle_queue.pop(0)
            try:
                win32.ConnectNamedPipe(handle, win32.NULL)
            except WindowsError as e:
                # An already connected client is not an error here.
                if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                    raise
            return _multiprocessing.PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            sub_debug('closing listener with address=%r', address)
            # NOTE(review): the loop body was lost from the damaged source;
            # closing every queued handle with close() is assumed.
            for handle in queue:
                close(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`

        The attempt is repeated while the pipe reports a wait timeout or
        busy state; any other WindowsError propagates.
        '''
        while 1:
            try:
                win32.WaitNamedPipe(address, 1000)
                h = win32.CreateFile(
                    address, win32.GENERIC_READ | win32.GENERIC_WRITE,
                    0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
                    )
            except WindowsError as e:
                if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
                                     win32.ERROR_PIPE_BUSY):
                    raise
            else:
                break

        win32.SetNamedPipeHandleState(
            h, win32.PIPE_READMODE_MESSAGE, None, None
            )
        return _multiprocessing.PipeConnection(h)
# Authentication stuff
# Fixed byte strings exchanged during the authentication handshake:
# CHALLENGE prefixes the random message, WELCOME / FAILURE report the
# verdict on the returned digest.
CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'
def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove that it knows `authkey`.

    Sends CHALLENGE followed by MESSAGE_LENGTH random bytes, then compares
    the peer's reply with the HMAC digest of those bytes keyed by
    `authkey`.  Sends WELCOME on a match; otherwise sends FAILURE and
    raises AuthenticationError.  `authkey` must be a byte string.
    '''
    import hashlib
    import hmac
    # Imported here because the module-level import from multiprocessing
    # only brings in current_process.
    from multiprocessing import AuthenticationError

    assert isinstance(authkey, bytes)
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    # MD5 is the digest hmac.new() used implicitly on Python 2; naming it
    # keeps the wire format and also works where the argument is required.
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    response = connection.recv_bytes(256)        # reject large message

    # Prefer a constant-time comparison when the running Python has one.
    compare = getattr(hmac, 'compare_digest', None)
    if compare is not None:
        matched = compare(response, digest)
    else:
        matched = response == digest

    if matched:
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')
def answer_challenge(connection, authkey):
    '''
    Answer a challenge received on `connection` using `authkey`.

    Expects a message starting with CHALLENGE, replies with the HMAC digest
    of the remainder keyed by `authkey`, and then expects WELCOME.

    Raises AssertionError when the first message is not a challenge and
    AuthenticationError when the digest is rejected.  `authkey` must be a
    byte string.
    '''
    import hashlib
    import hmac
    # Imported here because the module-level import from multiprocessing
    # only brings in current_process.
    from multiprocessing import AuthenticationError

    assert isinstance(authkey, bytes)
    message = connection.recv_bytes(256)         # reject large message
    if message[:len(CHALLENGE)] != CHALLENGE:
        # Raised explicitly so the protocol check is not stripped by
        # ``python -O``; the exception type is unchanged for callers.
        raise AssertionError('message = %r' % message)
    message = message[len(CHALLENGE):]
    # MD5 is the digest hmac.new() used implicitly on Python 2; naming it
    # keeps the wire format and also works where the argument is required.
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
# Support for using xmlrpclib for serialization
class ConnectionWrapper(object):
    '''
    Wrap a connection so that send()/recv() use the given serializers.

    `dumps` turns an object into the byte string that is sent and `loads`
    turns a received byte string back into an object.  The byte-level
    methods of `conn` are exposed unchanged on the wrapper.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Re-export the bound methods of the wrapped connection as-is.
        for name in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            setattr(self, name, getattr(conn, name))

    def send(self, obj):
        '''Serialize `obj` with `dumps` and send the resulting bytes.'''
        self._conn.send_bytes(self._dumps(obj))

    def recv(self):
        '''Receive one byte message and return it decoded by `loads`.'''
        return self._loads(self._conn.recv_bytes())
400 return xmlrpclib
.dumps((obj
,), None, None, None, 1).encode('utf8')
403 (obj
,), method
= xmlrpclib
.loads(s
.decode('utf8'))
class XmlListener(Listener):
    '''
    Listener whose accepted connections serialize objects with xmlrpclib.
    '''
    def accept(self):
        # NOTE(review): the two statements before the accept call were lost
        # from the damaged source; a lazy module-global import of xmlrpclib
        # (needed by _xml_dumps/_xml_loads) is assumed -- confirm.
        global xmlrpclib
        import xmlrpclib
        conn = Listener.accept(self)
        return ConnectionWrapper(conn, _xml_dumps, _xml_loads)
def XmlClient(*args, **kwds):
    '''
    Like Client(), but the returned connection serializes objects with
    xmlrpclib.  All arguments are passed through to Client().
    '''
    # NOTE(review): the two statements before the return were lost from the
    # damaged source; a lazy module-global import of xmlrpclib (needed by
    # _xml_dumps/_xml_loads) is assumed -- confirm.
    global xmlrpclib
    import xmlrpclib
    conn = Client(*args, **kwds)
    return ConnectionWrapper(conn, _xml_dumps, _xml_loads)