App Engine Python SDK version 1.7.4 (2)
[gae.git] / python / google / appengine / api / remote_socket / _remote_socket_stub.py
blob9c070fb7a9884a157b3bab66e4e95285798a5c26
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Stub version of the Remote Socket API.
19 A stub version of the Remote Socket API for the dev_appserver.
20 """
22 from __future__ import with_statement
25 import errno
26 import os
27 import select
28 import socket
29 import threading
30 import time
31 import uuid
33 from google.appengine.api import apiproxy_stub
34 from google.appengine.api.remote_socket import _remote_socket_addr
35 from google.appengine.api.remote_socket import remote_socket_service_pb
36 from google.appengine.runtime import apiproxy_errors
39 def TranslateSystemErrors(method):
40 """Decorator to catch and translate socket.error to ApplicationError.
42 Args:
43 method: An unbound method of APIProxyStub or a subclass.
45 Returns:
46 The method, altered such it catches socket.error, socket.timeout and
47 socket.gaierror and re-raises the required apiproxy_errors.ApplicationError.
48 """
50 def WrappedMethod(self, *args, **kwargs):
51 try:
52 return method(self, *args, **kwargs)
53 except socket.gaierror, e:
54 raise apiproxy_errors.ApplicationError(
55 remote_socket_service_pb.RemoteSocketServiceError.GAI_ERROR,
56 'system_error:%u error_detail:"%s"' % (e.errno, e.strerror))
57 except socket.timeout, e:
58 raise apiproxy_errors.ApplicationError(
59 remote_socket_service_pb.RemoteSocketServiceError.SYSTEM_ERROR,
60 'system_error:%u error_detail:"%s"' % (errno.EAGAIN,
61 os.strerror(errno.EAGAIN)))
62 except socket.error, e:
63 raise apiproxy_errors.ApplicationError(
64 remote_socket_service_pb.RemoteSocketServiceError.SYSTEM_ERROR,
65 'system_error:%u error_detail:"%s"' % (e.errno, e.strerror))
67 return WrappedMethod
70 class SocketState(object):
71 def __init__(self, family, protocol, sock, last_accessed_time):
72 self.family = family
73 self.protocol = protocol
74 self.sock = sock
77 self.last_accessed_time = last_accessed_time
80 self.mutex = threading.RLock()
81 self.timeout = None
82 self.stream_offset = 0
84 def SetTimeout(self, timeout):
85 if timeout < 0:
86 timeout = None
87 if self.timeout != timeout:
88 self.sock.settimeout(timeout)
89 self.timeout = timeout
92 class RemoteSocketServiceStub(apiproxy_stub.APIProxyStub):
93 """Stub implementation of the Remote Socket API."""
95 _AF_MAP = {
96 socket.AF_INET: remote_socket_service_pb.CreateSocketRequest.IPv4,
97 socket.AF_INET6: remote_socket_service_pb.CreateSocketRequest.IPv6,
102 _TRANSLATED_AF_MAP = {
103 socket.AF_INET: _remote_socket_addr.AF_INET,
104 socket.AF_INET6: _remote_socket_addr.AF_INET6,
107 _HOW_MAP = {
108 remote_socket_service_pb.ShutDownRequest.SOCKET_SHUT_RD: socket.SHUT_RD,
109 remote_socket_service_pb.ShutDownRequest.SOCKET_SHUT_WR: socket.SHUT_WR,
110 remote_socket_service_pb.ShutDownRequest.SOCKET_SHUT_RDWR: (
111 socket.SHUT_RDWR),
114 def __init__(self, service_name='remote_socket', get_time=time.time):
115 """Initializer.
117 Args:
118 log: where to log messages
119 service_name: service name expected for all calls
120 get_time: Used for testing. Function that works like time.time().
122 super(RemoteSocketServiceStub, self).__init__(service_name)
123 self._descriptor_to_socket_state = {}
124 self._time = get_time
126 def _LookupSocket(self, descriptor):
127 with self._mutex:
128 val = self._descriptor_to_socket_state.get(descriptor)
129 if not val:
130 raise apiproxy_errors.ApplicationError(
131 remote_socket_service_pb.RemoteSocketServiceError.SOCKET_CLOSED)
133 now = self._time()
134 if val.last_accessed_time < now - 120:
135 del self._descriptor_to_socket_state[descriptor]
136 raise apiproxy_errors.ApplicationError(
137 remote_socket_service_pb.RemoteSocketServiceError.SOCKET_CLOSED)
139 val.last_accessed_time = now
140 return val
142 def _AddressPortTupleFromProto(self, family, ap_proto):
143 """Converts an AddressPort proto into a python (addrstr, port) tuple."""
144 try:
145 addr = _remote_socket_addr.inet_ntop(
146 self._TRANSLATED_AF_MAP[family], ap_proto.packed_address())
147 except ValueError:
148 raise apiproxy_errors.ApplicationError(
149 remote_socket_service_pb.RemoteSocketServiceError.INVALID_REQUEST,
150 'Invalid Address.')
151 return (addr, ap_proto.port())
153 def _AddressPortTupleToProto(self, family, ap_tuple, ap_proto):
154 """Converts a python (addrstr, port) tuple into an AddressPort proto."""
155 ap_proto.set_packed_address(
156 _remote_socket_addr.inet_pton(
157 self._TRANSLATED_AF_MAP[family], ap_tuple[0]))
158 ap_proto.set_port(ap_tuple[1])
160 def _BindAllowed(self, addr, port):
161 if addr in ('0.0.0.0', '::') and port == 0:
162 return True
163 return False
165 @TranslateSystemErrors
166 def _Dynamic_CreateSocket(self, request, response):
167 family = socket.AF_INET
168 if request.family() == remote_socket_service_pb.CreateSocketRequest.IPv6:
169 family = socket.AF_INET6
170 protocol = socket.SOCK_STREAM
171 if request.protocol() == remote_socket_service_pb.CreateSocketRequest.UDP:
172 protocol = socket.SOCK_DGRAM
173 sock = socket.socket(family, protocol)
174 if request.has_proxy_external_ip():
175 addr, port = self._AddressPortTupleFromProto(
176 family, request.proxy_external_ip())
177 if not self._BindAllowed(addr, port):
178 raise apiproxy_errors.ApplicationError(
179 remote_socket_service_pb.RemoteSocketServiceError.PERMISSION_DENIED,
180 'Attempt to bind port without permission.')
181 sock.bind((addr, port))
182 if request.has_remote_ip():
183 sock.connect(self._AddressPortTupleFromProto(family, request.remote_ip()))
185 descriptor = str(uuid.uuid1())
186 state = SocketState(family, protocol, sock, self._time())
187 with self._mutex:
188 self._descriptor_to_socket_state[descriptor] = state
190 response.set_socket_descriptor(descriptor)
191 if request.has_proxy_external_ip() or request.has_remote_ip():
192 self._AddressPortTupleToProto(family, sock.getsockname(),
193 response.mutable_proxy_external_ip())
195 @TranslateSystemErrors
196 def _Dynamic_Bind(self, request, response):
197 state = self._LookupSocket(request.socket_descriptor())
198 addr, port = self._AddressPortTupleFromProto(
199 state.family, request.proxy_external_ip())
200 if not self._BindAllowed(addr, port):
201 raise apiproxy_errors.ApplicationError(
202 remote_socket_service_pb.RemoteSocketServiceError.PERMISSION_DENIED,
203 'Attempt to bind port without permission.')
204 state.sock.bind((addr, port))
205 self._AddressPortTupleToProto(state.family, state.sock.getsockname(),
206 response.mutable_proxy_external_ip())
208 def _Dynamic_Listen(self, request, response):
209 raise NotImplementedError()
211 def _Dynamic_Accept(self, request, response):
212 raise NotImplementedError()
214 @TranslateSystemErrors
215 def _Dynamic_Connect(self, request, response):
216 state = self._LookupSocket(request.socket_descriptor())
217 with state.mutex:
218 state.SetTimeout(request.timeout_seconds())
219 state.sock.connect(
220 self._AddressPortTupleFromProto(state.family, request.remote_ip()))
222 @TranslateSystemErrors
223 def _Dynamic_GetSocketOptions(self, request, response):
224 state = self._LookupSocket(request.socket_descriptor())
225 for opt in request.options_list():
226 if (opt.level() ==
227 remote_socket_service_pb.SocketOption.SOCKET_SOL_SOCKET and
228 opt.option() ==
229 remote_socket_service_pb.SocketOption.SOCKET_SO_ERROR):
230 ret = response.add_options()
231 ret.set_level(opt.level())
232 ret.set_option(opt.option())
233 ret.set_value(
234 state.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR, 1024))
235 else:
236 raise apiproxy_errors.ApplicationError(
237 remote_socket_service_pb.RemoteSocketServiceError.INVALID_REQUEST,
238 'Invalid GetSocketOption level/option.')
240 def _Dynamic_SetSocketOptions(self, request, response):
241 raise NotImplementedError()
243 @TranslateSystemErrors
244 def _Dynamic_GetSocketName(self, request, response):
245 state = self._LookupSocket(request.socket_descriptor())
246 self._AddressPortTupleToProto(state.family,
247 state.sock.getsockname(),
248 response.mutable_proxy_external_ip())
250 @TranslateSystemErrors
251 def _Dynamic_GetPeerName(self, request, response):
252 state = self._LookupSocket(request.socket_descriptor())
253 self._AddressPortTupleToProto(state.family,
254 state.sock.getpeername(),
255 response.mutable_peer_ip())
257 @TranslateSystemErrors
258 def _Dynamic_Send(self, request, response):
259 state = self._LookupSocket(request.socket_descriptor())
260 with state.mutex:
261 state.SetTimeout(request.timeout_seconds())
262 if state.protocol == socket.SOCK_STREAM:
263 if request.stream_offset() != state.stream_offset:
264 raise apiproxy_errors.ApplicationError(
265 remote_socket_service_pb.RemoteSocketServiceError.INVALID_REQUEST,
266 'Invalid stream_offset.')
267 flags = request.flags()
268 if flags != 0:
269 raise apiproxy_errors.ApplicationError(
270 remote_socket_service_pb.RemoteSocketServiceError.INVALID_REQUEST,
271 'Invalid flags.')
272 if request.has_send_to():
273 data_sent = state.sock.sendto(
274 request.data(),
275 flags,
276 self._AddressPortTupleFromProto(state.family, request.send_to()))
277 else:
278 data_sent = state.sock.send(request.data(), flags)
279 response.set_data_sent(data_sent)
280 state.stream_offset += data_sent
282 @TranslateSystemErrors
283 def _Dynamic_Receive(self, request, response):
284 state = self._LookupSocket(request.socket_descriptor())
285 with state.mutex:
286 state.SetTimeout(request.timeout_seconds())
287 flags = 0
288 if request.flags() & remote_socket_service_pb.ReceiveRequest.MSG_PEEK:
289 flags |= socket.MSG_PEEK
290 received_from = None
291 if state.protocol == socket.SOCK_DGRAM:
292 data, received_from = state.sock.recvfrom(request.data_size(), flags)
293 else:
294 data = state.sock.recv(request.data_size(), flags)
295 response.set_data(data)
296 if received_from:
297 self._AddressPortTupleToProto(state.family, received_from,
298 response.mutable_received_from())
300 @TranslateSystemErrors
301 def _Dynamic_ShutDown(self, request, response):
302 state = self._LookupSocket(request.socket_descriptor())
303 state.sock.shutdown(self._HOW_MAP[request.how()])
305 @TranslateSystemErrors
306 def _Dynamic_Close(self, request, response):
307 with self._mutex:
308 try:
309 state = self._LookupSocket(request.socket_descriptor())
310 except apiproxy_errors.ApplicationError:
311 return
312 state.sock.close()
313 del self._descriptor_to_socket_state[request.socket_descriptor()]
315 @TranslateSystemErrors
316 def _Dynamic_Resolve(self, request, response):
317 addrs = {}
318 for family, _, _, canonname, sa in socket.getaddrinfo(
319 request.name(), 0, 0, socket.SOCK_STREAM, 0, socket.AI_CANONNAME):
320 addrs.setdefault(self._AF_MAP.get(family), set()).add(
321 _remote_socket_addr.inet_pton(self._TRANSLATED_AF_MAP[family], sa[0]))
322 response.set_canonical_name(canonname)
328 if canonname and canonname.lower() != request.name().lower():
329 if not response.aliases_size():
330 response.add_aliases(request.name())
331 for af in request.address_families_list():
332 for packed_addr in addrs.get(af, set()):
333 response.add_packed_address(packed_addr)
335 @TranslateSystemErrors
336 def _Dynamic_Poll(self, request, response):
337 timeout = request.timeout_seconds()
338 if timeout < 0:
339 timeout = None
340 rfds, wfds, efds = [], [], []
341 sock_map = {}
342 for e in request.events_list():
343 state = self._LookupSocket(e.socket_descriptor())
344 events = e.requested_events()
345 if events & ~(remote_socket_service_pb.PollEvent.SOCKET_POLLIN|
346 remote_socket_service_pb.PollEvent.SOCKET_POLLOUT):
347 raise apiproxy_errors.ApplicationError(
348 remote_socket_service_pb.RemoteSocketServiceError.INVALID_REQUEST,
349 'Invalid requested_events.')
350 if events & remote_socket_service_pb.PollEvent.SOCKET_POLLIN:
351 rfds.append(state.sock)
352 if events & remote_socket_service_pb.PollEvent.SOCKET_POLLOUT:
353 wfds.append(state.sock)
354 o = response.add_events()
355 o.set_socket_descriptor(e.socket_descriptor())
356 o.set_requested_events(e.requested_events())
357 o.set_observed_events(0)
358 sock_map.setdefault(state.sock, []).append(o)
359 rfds, wfds, _ = select.select(rfds, wfds, efds, timeout)
360 for sock in rfds:
361 for o in sock_map[sock]:
362 o.set_observed_events(
363 o.observed_events()|
364 remote_socket_service_pb.PollEvent.SOCKET_POLLIN)
365 for sock in wfds:
366 for o in sock_map[sock]:
367 o.set_observed_events(
368 o.observed_events()|
369 remote_socket_service_pb.PollEvent.SOCKET_POLLOUT)