1 # -*- coding: utf-8 -*-
2 # Copyright (c) 2009 Ondřej Súkup
3 # Parallel Python Software: http://www.parallelpython.com
4 # Copyright (c) 2005-2009, Vitalii Vanovschi
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions are met:
8 # * Redistributions of source code must retain the above copyright notice,
9 # this list of conditions and the following disclaimer.
10 # * Redistributions in binary form must reproduce the above copyright
11 # notice, this list of conditions and the following disclaimer in the
12 # documentation and/or other materials provided with the distribution.
13 # * Neither the name of the author nor the names of its contributors
14 # may be used to endorse or promote products derived from this software
15 # without specific prior written permission.
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
27 # THE POSSIBILITY OF SUCH DAMAGE.
# Module-level copyright notice, one copyright holder per line.
# Adjacent string literals are used instead of a backslash continuation
# inside the string so that no source indentation leaks into the value,
# and the line break is a real "\n" escape (the original had a literal "/n").
copyright = (
    "Copyright (c) 2009 Ondřej Súkup\n"
    "Copyright (c) 2005-2009 Vitalii Vanovschi. All rights reserved"
)
38 # compatibility with Python 2.6
41 sha_new
= hashlib
.sha1
50 class Transport(object):
53 raise NotImplemented("abstact function 'send' must be implemented "\
56 def receive(self
, preprocess
=None):
57 raise NotImplemented("abstact function 'receive' must be implemented "\
60 def authenticate(self
, secret
):
61 remote_version
= self
.receive()
62 if version
!= remote_version
:
63 logging
.error("PP version mismatch (local: pp-%s, remote: pp-%s)"
64 % (version
, remote_version
))
65 logging
.error("Please install the same version of PP on all nodes")
67 srandom
= self
.receive()
68 answer
= sha_new(srandom
+secret
).hexdigest()
70 response
= self
.receive()
79 def _connect(self
, host
, port
):
83 class CTransport(Transport
):
89 return md5_new(msg
).hexdigest()
92 hash1
= self
.hash(msg
)
93 if hash1
in self
.scache
:
94 self
.send("H" + hash1
)
97 self
.scache
[hash1
] = True
99 def creceive(self
, preprocess
=None):
105 hash1
= self
.hash(msg
)
106 self
.rcache
[hash1
] = map(preprocess
, (msg
, ))[0]
107 return self
.rcache
[hash1
]
110 class PipeTransport(Transport
):
112 def __init__(self
, r
, w
):
115 if isinstance(r
, file) and isinstance(w
, file):
119 raise TypeError("Both arguments of PipeTransport constructor " \
120 "must be file objects")
123 self
.w
.write(struct
.pack("!Q", len(msg
)))
128 def receive(self
, preprocess
=None):
129 size_packed
= self
.r
.read(struct
.calcsize("!Q"))
130 msg_len
= struct
.unpack("!Q", size_packed
)[0]
131 msg
= self
.r
.read(msg_len
)
132 return map(preprocess
, (msg
, ))[0]
139 class SocketTransport(Transport
):
141 def __init__(self
, socket1
=None):
143 self
.socket
= socket1
145 self
.socket
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
148 def send(self
, data
):
149 size
= struct
.pack("!Q", len(data
))
150 t_size
= struct
.calcsize("!Q")
152 while s_size
< t_size
:
153 p_size
= self
.socket
.send(size
[s_size
:])
155 raise RuntimeError("Socket connection is broken")
160 while s_size
< t_size
:
161 p_size
= self
.socket
.send(data
[s_size
:])
163 raise RuntimeError("Socket connection is broken")
166 def receive(self
, preprocess
=None):
167 e_size
= struct
.calcsize("!Q")
170 while r_size
< e_size
:
171 msg
= self
.socket
.recv(e_size
-r_size
)
173 raise RuntimeError("Socket connection is broken")
176 e_size
= struct
.unpack("!Q", data
)[0]
180 while r_size
< e_size
:
181 msg
= self
.socket
.recv(e_size
-r_size
)
183 raise RuntimeError("Socket connection is broken")
191 def _connect(self
, host
, port
):
192 self
.socket
.connect((host
, port
))
195 class CPipeTransport(PipeTransport
, CTransport
):
199 class CSocketTransport(SocketTransport
, CTransport
):
202 # Parallel Python Software: http://www.parallelpython.com