typo error
[pp3.git] / pptransport.py
blob2098e74b58161554b4a8584ca3b6cdd4c156f875
1 # -*- coding: utf-8 -*-
2 # Copyright (c) 2009 Ondřej Súkup
3 # Parallel Python Software: http://www.parallelpython.com
4 # Copyright (c) 2005-2009, Vitalii Vanovschi
5 # All rights reserved.
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions are met:
8 # * Redistributions of source code must retain the above copyright notice,
9 # this list of conditions and the following disclaimer.
10 # * Redistributions in binary form must reproduce the above copyright
11 # notice, this list of conditions and the following disclaimer in the
12 # documentation and/or other materials provided with the distribution.
13 # * Neither the name of the author nor the names of its contributors
14 # may be used to endorse or promote products derived from this software
15 # without specific prior written permission.
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
27 # THE POSSIBILITY OF SUCH DAMAGE.
29 import os
30 import struct
31 import socket
32 import logging
# Human-readable copyright banner, one copyright holder per line.
# The separator is a real newline escape ("\n"); the earlier "/n" was a typo
# that ended up verbatim in the text.
copyright = (
    "Copyright (c) 2009 Ondřej Súkup\n"
    "Copyright (c) 2005-2009 Vitalii Vanovschi. All rights reserved"
)
# Version string of this PP build; Transport.authenticate compares it with
# the version string received from the remote side.
version = "3.0.0"

# Compatibility shim: use hashlib when it can be imported, otherwise fall
# back to the legacy sha and md5 modules.  Either way the module exposes
# sha_new and md5_new as hash-object constructors.
try:
    import hashlib
    sha_new = hashlib.sha1
    md5_new = hashlib.md5
except ImportError:
    import sha
    import md5
    sha_new = sha.new
    md5_new = md5.new
class Transport(object):
    """Base class for message transports.

    Subclasses must override send() and receive(); authenticate() is built
    on top of those two primitives.  close() and _connect() are no-op hooks
    that subclasses may override.
    """

    def send(self, msg):
        """Send one message.  Must be overridden in a subclass.

        Raises:
            NotImplementedError: always, in this base class.
        """
        # NotImplemented is a comparison sentinel and is not callable, so
        # "raise NotImplemented(...)" produced a TypeError instead of the
        # intended exception.
        raise NotImplementedError("abstract function 'send' must be "
                "implemented in a subclass")

    def receive(self, preprocess=None):
        """Receive one message.  Must be overridden in a subclass.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("abstract function 'receive' must be "
                "implemented in a subclass")

    def authenticate(self, secret):
        """Run this side of the authentication exchange.

        Receives the remote version string and a random challenge, sends
        back the SHA-1 hex digest of challenge + secret, then receives the
        verdict.

        Args:
            secret: shared secret appended to the received challenge
                before hashing.

        Returns:
            True if the remote side answered "OK"; False on a version
            mismatch (which is also logged) or on any other answer.
        """
        remote_version = self.receive()
        if version != remote_version:
            logging.error("PP version mismatch (local: pp-%s, remote: pp-%s)"
                    % (version, remote_version))
            logging.error("Please install the same version of PP on all nodes")
            return False
        srandom = self.receive()
        answer = sha_new(srandom + secret).hexdigest()
        self.send(answer)
        return self.receive() == "OK"

    def close(self):
        """Release transport resources; nothing to do in the base class."""
        pass

    def _connect(self, host, port):
        """Connect to (host, port); nothing to do in the base class."""
        pass
class CTransport(Transport):
    """Cached transport

    Adds csend()/creceive() on top of send()/receive(): a message that was
    already sent is replaced by its MD5 hex digest, and the receiving side
    looks the digest up in its cache.

    csend() reads and writes self.scache, which this class does not create;
    PipeTransport and SocketTransport set it in their constructors.
    """
    # Receive-side cache: message digest -> (optionally preprocessed)
    # message.  It is a class attribute, so all instances share one dict.
    rcache = {}

    def hash(self, msg):
        """Return the MD5 hex digest of msg."""
        return md5_new(msg).hexdigest()

    def csend(self, msg):
        """Send msg, or only its digest if the same message was sent before.

        The payload is prefixed with "H" (digest only) or "N" (new, full
        message) so that creceive() can tell the two apart.
        """
        digest = self.hash(msg)
        if digest in self.scache:
            self.send("H" + digest)
        else:
            self.send("N" + msg)
            # Recorded only after send() returned without raising.
            self.scache[digest] = True

    def creceive(self, preprocess=None):
        """Receive a message sent with csend() and return the cached value.

        Args:
            preprocess: optional callable applied once to a newly received
                full message before it is stored in the cache; None stores
                the message unchanged.

        Returns:
            The cache entry for the message's digest.

        Raises:
            KeyError: if a digest-only message refers to a digest that is
                not in the cache.
        """
        msg = self.receive()
        if msg[0] == 'H':
            digest = msg[1:]
        else:
            msg = msg[1:]
            digest = self.hash(msg)
            # Explicit call instead of map(preprocess, (msg, ))[0]: that
            # form needs map() to return a list and to accept None as the
            # function, neither of which holds on Python 3.
            if preprocess is not None:
                msg = preprocess(msg)
            self.rcache[digest] = msg
        return self.rcache[digest]
class PipeTransport(Transport):
    """Transport over a pair of file objects: one to read, one to write.

    Every message is framed as an 8-byte big-endian unsigned length
    (struct format "!Q") followed by the message itself.
    """

    def __init__(self, r, w):
        """Wrap the given read and write file objects.

        Args:
            r: object with a read() method, used by receive().
            w: object with a write() method, used by send().

        Raises:
            TypeError: if r cannot be read from or w cannot be written to.
        """
        self.scache = {}
        self.exiting = False
        # Duck-typed check instead of isinstance(..., file): the "file"
        # builtin does not exist on Python 3, and every real file object
        # still passes this test.
        if not (hasattr(r, "read") and hasattr(w, "write")):
            raise TypeError("Both arguments of PipeTransport constructor "
                    "must be file objects")
        self.r = r
        self.w = w

    def send(self, msg):
        """Write the length header followed by msg, then flush."""
        self.w.write(struct.pack("!Q", len(msg)))
        self.w.write(msg)
        # One flush after both writes pushes header and payload out together.
        self.w.flush()

    def receive(self, preprocess=None):
        """Read one framed message.

        Args:
            preprocess: optional callable applied to the raw message; None
                returns the message unchanged.

        Returns:
            The message, passed through preprocess when one is given.

        Raises:
            struct.error: if fewer header bytes than expected were read.
        """
        header = self.r.read(struct.calcsize("!Q"))
        msg_len = struct.unpack("!Q", header)[0]
        msg = self.r.read(msg_len)
        # Explicit call instead of map(preprocess, (msg, ))[0], which does
        # not work on Python 3.
        if preprocess is not None:
            return preprocess(msg)
        return msg

    def close(self):
        """Close both file objects, the write side first."""
        try:
            self.w.close()
        finally:
            # Still release the read end if closing the write end failed.
            self.r.close()
class SocketTransport(Transport):
    """Transport over a stream socket.

    Every message is framed as an 8-byte big-endian unsigned length
    (struct format "!Q") followed by the message itself.
    """

    def __init__(self, socket1=None):
        """Use socket1 if given, otherwise create a new AF_INET stream socket.

        Args:
            socket1: an existing socket object, or None.
        """
        if socket1:
            self.socket = socket1
        else:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.scache = {}

    def _send_all(self, payload):
        """Call socket.send() repeatedly until all of payload is sent."""
        sent = 0
        total = len(payload)
        while sent < total:
            count = self.socket.send(payload[sent:])
            if count == 0:
                raise RuntimeError("Socket connection is broken")
            sent += count

    def _recv_exact(self, size):
        """Receive exactly size bytes and return them as one string."""
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.socket.recv(remaining)
            # An empty result means the peer closed the connection.  The
            # truthiness test covers both "" and b"", whereas comparing
            # against "" never matches the bytes returned on Python 3.
            if not chunk:
                raise RuntimeError("Socket connection is broken")
            chunks.append(chunk)
            remaining -= len(chunk)
        # Joining once avoids repeated string concatenation in the loop.
        return b"".join(chunks)

    def send(self, data):
        """Send the length header, then data.

        Raises:
            RuntimeError: if the socket reports that zero bytes were sent.
        """
        self._send_all(struct.pack("!Q", len(data)))
        self._send_all(data)

    def receive(self, preprocess=None):
        """Receive one framed message and return its raw payload.

        Args:
            preprocess: accepted for signature compatibility with the other
                transports; it is not applied here.
                NOTE(review): PipeTransport.receive does apply it - confirm
                whether the difference is intentional.

        Raises:
            RuntimeError: if the connection is closed before the complete
                message has arrived.
        """
        header = self._recv_exact(struct.calcsize("!Q"))
        size = struct.unpack("!Q", header)[0]
        return self._recv_exact(size)

    def close(self):
        """Close the underlying socket."""
        self.socket.close()

    def _connect(self, host, port):
        """Connect the underlying socket to (host, port)."""
        self.socket.connect((host, port))
class CPipeTransport(PipeTransport, CTransport):
    """PipeTransport combined with the csend()/creceive() methods of
    CTransport; defines nothing of its own."""
class CSocketTransport(SocketTransport, CTransport):
    """SocketTransport combined with the csend()/creceive() methods of
    CTransport; defines nothing of its own."""
202 # Parallel Python Software: http://www.parallelpython.com