Change a variable type to avoid signed overflow; replace repeated '19999' constant...
[python.git] / Lib / multiprocessing / connection.py
blobb1f5acd05e5938dea7c433f5743d0196baf314c1
#
# A higher level module for using sockets (or Windows named pipes)
#
# multiprocessing/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
#
9 __all__ = [ 'Client', 'Listener', 'Pipe' ]
11 import os
12 import sys
13 import socket
14 import errno
15 import time
16 import tempfile
17 import itertools
19 import _multiprocessing
20 from multiprocessing import current_process, AuthenticationError
21 from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
22 from multiprocessing.forking import duplicate, close
# Buffer size, in bytes, requested for the named pipes created in this module.
BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.

# Per-process counter that keeps generated pipe names distinct.
_mmap_counter = itertools.count()

# 'AF_INET' is always offered; each check below that succeeds adds a family
# and makes it the new default, so the last matching one wins.
default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return the deadline that lies `timeout` seconds from now

    The result is a `time.time()` value intended for `_check_timeout()`.
    '''
    return time.time() + timeout
def _check_timeout(t):
    '''
    Return True once the deadline `t` (a `time.time()` value) has passed
    '''
    return time.time() > t
def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family

    `family` is one of 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; any other value
    raises ValueError.
    '''
    if family == 'AF_INET':
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        # NOTE(review): mktemp() only picks a name and creates nothing, so
        # another process could claim the path before it is bound.
        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
    elif family == 'AF_PIPE':
        # The pid plus a per-process counter keeps pipe names distinct.
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)))
    else:
        raise ValueError('unrecognized family')
def address_type(address):
    '''
    Return the type of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'.  Tuples are treated as
    'AF_INET', strings starting with two backslashes as 'AF_PIPE' and any
    other string as 'AF_UNIX'.  Anything else raises ValueError.
    '''
    # isinstance() rather than an exact type match, so that subclasses
    # (for example a namedtuple holding host and port) are recognised too.
    if isinstance(address, tuple):
        return 'AF_INET'
    if isinstance(address, str):
        if address.startswith('\\\\'):
            return 'AF_PIPE'
        return 'AF_UNIX'
    raise ValueError('address type of %r unrecognized' % (address,))
#
# Public functions
#
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        # Validate first: raising after the listener exists would leave a
        # bound socket or pipe behind with nobody to close it.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        # An explicit family wins, then one derived from the address, then
        # the platform default.
        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.
        '''
        c = self._listener.accept()
        # Authentication runs in both directions.  A falsy authkey
        # (including b'') skips it altogether.
        if self._authkey:
            deliver_challenge(c, self._authkey)
            answer_challenge(c, self._authkey)
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        return self._listener.close()

    # Read-only views of the state kept by the underlying listener.
    address = property(lambda self: self._listener._address)
    last_accepted = property(lambda self: self._listener._last_accepted)
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    When `authkey` is given it must be a byte string (TypeError otherwise)
    and both ends authenticate each other before the connection is returned.
    '''
    # Validate before connecting so that a bad authkey cannot leave an
    # open connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        # Mirror image of Listener.accept(): answer first, then challenge.
        answer_challenge(c, authkey)
        deliver_challenge(c, authkey)

    return c
if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends can send and receive.  Otherwise the
        first connection is read-only and the second write-only.
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            try:
                c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
                c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            finally:
                # The connections hold duplicated descriptors, so the socket
                # objects are closed whether or not the above succeeded.
                s1.close()
                s2.close()
        else:
            # os.pipe() returns (read end, write end).
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)

        return c1, c2

else:

    from ._multiprocessing import win32

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends can send and receive.  Otherwise the
        first connection is read-only and the second write-only.
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = win32.PIPE_ACCESS_DUPLEX
            access = win32.GENERIC_READ | win32.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = win32.PIPE_ACCESS_INBOUND
            access = win32.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        # h1 is the server end of a single-instance pipe, h2 the client end.
        h1 = win32.CreateNamedPipe(
            address, openmode,
            win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
            win32.PIPE_WAIT,
            1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
            )
        h2 = win32.CreateFile(
            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
            )
        win32.SetNamedPipeHandleState(
            h2, win32.PIPE_READMODE_MESSAGE, None, None
            )

        try:
            win32.ConnectNamedPipe(h1, win32.NULL)
        except WindowsError as e:
            # The client end was opened just above, so "already connected"
            # is tolerated rather than treated as a failure.
            if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                raise

        c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
        c2 = _multiprocessing.PipeConnection(h2, readable=duplex)

        return c1, c2
#
# Definitions for connections based on sockets
#
class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        # `family` is the name of a socket-module constant, e.g. 'AF_INET'.
        self._socket = socket.socket(getattr(socket, family))
        try:
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._socket.bind(address)
            self._socket.listen(backlog)
            # Read the address back: the caller may have asked for port 0.
            self._address = self._socket.getsockname()
        except Exception:
            # Do not leave a half-configured socket open on failure.
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX':
            # Callable that unlinks the socket file; close() invokes it.
            self._unlink = Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''
        Wait for a connection and return it as a `Connection` object

        The peer address is remembered in `self._last_accepted`.
        '''
        s, self._last_accepted = self._socket.accept()
        try:
            # The connection owns a duplicate of the descriptor ...
            fd = duplicate(s.fileno())
        finally:
            # ... so the socket object is always closed, even when the
            # duplication fails.
            s.close()
        return _multiprocessing.Connection(fd)

    def close(self):
        '''
        Close the listening socket and remove its socket file, if any
        '''
        self._socket.close()
        if self._unlink is not None:
            self._unlink()
def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`

    A refused connection is retried every 10ms until the deadline from
    `_init_timeout()` has passed.  Any other socket error, or a refusal
    after the deadline, is re-raised.
    '''
    family = address_type(address)
    s = socket.socket(getattr(socket, family))
    try:
        t = _init_timeout()

        while 1:
            try:
                s.connect(address)
            except socket.error as e:
                if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
                    debug('failed to connect to address %s', address)
                    raise
                time.sleep(0.01)
            else:
                break

        fd = duplicate(s.fileno())
    finally:
        # The connection owns a duplicated descriptor, so the socket object
        # is released on success and on failure alike.
        s.close()
    return _multiprocessing.Connection(fd)
#
# Definitions for connections based on named pipes
#
if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            # `backlog` is accepted but not used here.
            self._address = address
            handle = win32.CreateNamedPipe(
                address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            # Pipe instances created but not yet handed out by accept().
            self._handle_queue = [handle]
            self._last_accepted = None

            sub_debug('listener created with address=%r', self._address)

            # close() is the finalizer itself; it closes every queued handle.
            self.close = Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def accept(self):
            '''
            Wait for a client and return a `PipeConnection` for it
            '''
            # Queue a fresh instance before serving the oldest one, so the
            # queue never runs empty.
            newhandle = win32.CreateNamedPipe(
                self._address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            self._handle_queue.append(newhandle)
            handle = self._handle_queue.pop(0)
            try:
                win32.ConnectNamedPipe(handle, win32.NULL)
            except WindowsError as e:
                # "Already connected" is tolerated rather than treated as a
                # failure.
                if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                    raise
            return _multiprocessing.PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            '''
            Close every pipe handle still waiting in `queue`
            '''
            sub_debug('closing listener with address=%r', address)
            for handle in queue:
                close(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`

        ERROR_SEM_TIMEOUT and ERROR_PIPE_BUSY are retried until the deadline
        from `_init_timeout()` has passed.  Any other WindowsError, or one
        of those two after the deadline, is re-raised.
        '''
        t = _init_timeout()
        while 1:
            try:
                win32.WaitNamedPipe(address, 1000)
                h = win32.CreateFile(
                    address, win32.GENERIC_READ | win32.GENERIC_WRITE,
                    0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
                    )
            except WindowsError as e:
                if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
                                     win32.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        win32.SetNamedPipeHandleState(
            h, win32.PIPE_READMODE_MESSAGE, None, None
            )
        return _multiprocessing.PipeConnection(h)
#
# Authentication stuff
#
# Number of random bytes sent with each challenge.
MESSAGE_LENGTH = 20

# Fixed markers exchanged during the authentication handshake.
CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'
def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove that it knows `authkey`

    Sends CHALLENGE followed by MESSAGE_LENGTH random bytes and expects the
    HMAC-MD5 digest of those bytes, keyed with `authkey`, in reply.  Replies
    WELCOME on a match.  Otherwise replies FAILURE and raises
    AuthenticationError.
    '''
    import hashlib
    import hmac
    assert isinstance(authkey, bytes)
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    # MD5 is named explicitly: it is what hmac.new() used by default when
    # this protocol was defined, and newer Pythons demand a digestmod.
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    response = connection.recv_bytes(256)        # reject large message
    # Prefer a constant-time comparison where the hmac module offers one,
    # so response timing does not leak how much of the digest matched.
    compare_digest = getattr(hmac, 'compare_digest', None)
    if compare_digest is not None:
        authenticated = compare_digest(response, digest)
    else:
        authenticated = response == digest
    if authenticated:
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')
def answer_challenge(connection, authkey):
    '''
    Answer a challenge received on `connection` using `authkey`

    Expects CHALLENGE followed by the bytes to sign, replies with their
    HMAC-MD5 digest keyed with `authkey`, then waits for the verdict.
    Raises AuthenticationError when the first message is not a challenge or
    when the verdict is anything other than WELCOME.
    '''
    import hashlib
    import hmac
    assert isinstance(authkey, bytes)
    message = connection.recv_bytes(256)         # reject large message
    # This checks data from the peer, so it must be a real check: an
    # assert would vanish under -O.
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AuthenticationError('message = %r' % (message,))
    message = message[len(CHALLENGE):]
    # MD5 is named explicitly: it is what hmac.new() used by default when
    # this protocol was defined, and newer Pythons demand a digestmod.
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
#
# Support for using xmlrpclib for serialization
#
class ConnectionWrapper(object):
    '''
    Connection whose send() and recv() use caller-supplied serializers

    `dumps` turns an object into the bytes handed to `conn.send_bytes()`;
    `loads` turns the bytes from `conn.recv_bytes()` back into an object.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Expose the connection's byte-level methods directly on the wrapper.
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            obj = getattr(conn, attr)
            setattr(self, attr, obj)

    def send(self, obj):
        '''
        Serialize `obj` with `dumps` and send the result
        '''
        s = self._dumps(obj)
        self._conn.send_bytes(s)

    def recv(self):
        '''
        Receive one message and return it deserialized with `loads`
        '''
        s = self._conn.recv_bytes()
        return self._loads(s)
def _xml_dumps(obj):
    '''
    Return `obj` marshalled by xmlrpclib and encoded as UTF-8

    Relies on the module-global `xmlrpclib`, which XmlListener.accept() and
    XmlClient() import before any wrapper using this function exists.
    '''
    # The trailing 1 is the fifth positional argument of xmlrpclib.dumps().
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
def _xml_loads(s):
    '''
    Return the single object marshalled in the UTF-8 encoded bytes `s`

    Relies on the module-global `xmlrpclib`, which XmlListener.accept() and
    XmlClient() import before any wrapper using this function exists.
    '''
    # loads() returns (params, method name); exactly one param is expected.
    (obj,), method = xmlrpclib.loads(s.decode('utf8'))
    return obj
class XmlListener(Listener):
    '''
    Listener whose accepted connections serialize objects with xmlrpclib
    '''
    def accept(self):
        # Imported on first use and published as a module global, because
        # _xml_dumps() and _xml_loads() look the name up there.
        global xmlrpclib
        import xmlrpclib
        obj = Listener.accept(self)
        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
def XmlClient(*args, **kwds):
    '''
    Return a `Client` connection that serializes objects with xmlrpclib

    All arguments are passed through to `Client()`.
    '''
    # Imported on first use and published as a module global, because
    # _xml_dumps() and _xml_loads() look the name up there.
    global xmlrpclib
    import xmlrpclib
    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)