App Engine Python SDK version 1.8.1
[gae.git] / python / google / appengine / api / channel / channel_service_stub.py
blob2e774035936532541d4a9909dc8edcda4765d071
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 """Stub version of the Channel API, queues messages and writes them to a log."""
30 import logging
31 import random
32 import time
34 from google.appengine.api import apiproxy_stub
35 from google.appengine.api.channel import channel_service_pb
36 from google.appengine.runtime import apiproxy_errors
class Error(Exception):
    """Base class for every exception defined by the channel service stub."""
class InvalidTokenError(Error):
    """Raised when a token handed to a stub method is syntactically invalid."""
class TokenTimedOutError(Error):
    """Raised when a token handed to a stub method expired or never existed."""
class ChannelServiceStub(apiproxy_stub.APIProxyStub):
    """Python only channel service stub.

    This stub does not use a browser channel to push messages to a client.
    Instead it queues messages internally.

    Tokens produced by this stub have the form
    ``<CHANNEL_TOKEN_IDENTIFIER>-<random int>-<expiration sec>-<client id>``.
    The client id is always the last component and may itself contain
    hyphens, which is why tokens are parsed with ``split('-', 3)``.
    """

    # Not read inside this class; presumably consumed by the API proxy
    # framework to decide whether calls may run concurrently -- confirm.
    THREADSAFE = True

    # Seconds after the most recent connect/refresh at which the
    # 'channel-disconnect' event for a client is scheduled.
    CHANNEL_TIMEOUT_SECONDS = 2

    # Passed as the final argument of every add_async_request() call made by
    # this stub (presumably the source address reported for the simulated
    # request -- confirm against the dispatcher).
    XMPP_PUBLIC_IP = '0.1.0.10'

    # Token lifetime, in minutes, used when a CreateChannel request does not
    # specify duration_minutes.
    CHANNEL_TOKEN_DEFAULT_DURATION = 120

    # Constant first component of every token generated by this stub.
    CHANNEL_TOKEN_IDENTIFIER = 'channel'

    def __init__(self, log=logging.debug, service_name='channel',
                 time_func=time.time, request_data=None):
        """Initializer.

        Args:
          log: A logger, used for dependency injection.
          service_name: Service name expected for all calls.
          time_func: function to get the current time in seconds.
          request_data: A request_info.RequestInfo instance. If None, a
            request_info._LocalRequestInfo instance will be used.
        """
        apiproxy_stub.APIProxyStub.__init__(self, service_name,
                                            request_data=request_data)
        self._log = log
        self._time_func = time_func

        # Maps a client id to the list of messages queued for it. A client id
        # is present as a key only while its channel is considered connected.
        self._connected_channel_messages = {}

    def _Dynamic_CreateChannel(self, request, response):
        """Implementation of channel.create_channel.

        Args:
          request: A ChannelServiceRequest.
          response: A ChannelServiceResponse

        Raises:
          apiproxy_errors.ApplicationError: with INVALID_CHANNEL_KEY if the
            request carries an empty application key.
        """
        client_id = request.application_key()
        if not client_id:
            raise apiproxy_errors.ApplicationError(
                channel_service_pb.ChannelServiceError.INVALID_CHANNEL_KEY)

        if request.has_duration_minutes():
            duration = request.duration_minutes()
        else:
            duration = ChannelServiceStub.CHANNEL_TOKEN_DEFAULT_DURATION

        # duration is in minutes; the extra second compensates for the
        # truncation performed by int(). int() is used instead of the
        # Python-2-only long(): it yields the same value and string form.
        expiration_sec = int(self._time_func() + duration * 60) + 1

        token = '-'.join([ChannelServiceStub.CHANNEL_TOKEN_IDENTIFIER,
                          str(random.randint(0, 2 ** 32)),
                          str(expiration_sec),
                          client_id])

        self._log('Creating channel token %s with client id %s and duration %s',
                  token, client_id, duration)

        response.set_token(token)

    @apiproxy_stub.Synchronized
    def _Dynamic_SendChannelMessage(self, request, response):
        """Implementation of channel.send_message.

        Queues a message to be retrieved by the client when it polls.
        Messages addressed to a client that is not currently connected are
        logged and dropped.

        Args:
          request: A SendMessageRequest.
          response: A VoidProto.

        Raises:
          apiproxy_errors.ApplicationError: with BAD_MESSAGE if the request
            carries an empty message.
        """
        client_id = request.application_key()

        if not request.message():
            raise apiproxy_errors.ApplicationError(
                channel_service_pb.ChannelServiceError.BAD_MESSAGE)

        if client_id in self._connected_channel_messages:
            self._log('Sending a message (%s) to channel with key (%s)',
                      request.message(), client_id)
            self._connected_channel_messages[client_id].append(
                request.message())
        else:
            self._log('SKIPPING message (%s) to channel with key (%s): '
                      'no clients connected',
                      request.message(), client_id)

    def client_id_from_token(self, token):
        """Returns the client id from a given token.

        Args:
          token: A string representing an instance of a client connection to
            a client id, returned by CreateChannel.

        Returns:
          A string representing the client id used to create this token,
          or None if this token is incorrectly formed and doesn't map to a
          client id.
        """
        # maxsplit=3 keeps any hyphens inside the client id intact.
        pieces = token.split('-', 3)
        if len(pieces) == 4:
            return pieces[3]
        return None

    def check_token_validity(self, token):
        """Checks if a token is well-formed and its expiration status.

        Args:
          token: a token returned by CreateChannel.

        Returns:
          A tuple (syntax_valid, time_valid) where syntax_valid is true if the
          token is well-formed and time_valid is true if the token is not
          expired. In other words, a usable token will return (true, true).
        """
        pieces = token.split('-', 3)
        if len(pieces) != 4:
            return False, False

        # The random id and the client id play no part in validation.
        constant_identifier, _, expiration_sec, _ = pieces

        expiration = None
        if (constant_identifier == ChannelServiceStub.CHANNEL_TOKEN_IDENTIFIER
                and expiration_sec.isdigit()):
            try:
                expiration = int(expiration_sec)
            except ValueError:
                # isdigit() accepts some Unicode digit characters (such as
                # superscripts) that int() rejects; treat those tokens as
                # malformed rather than letting the error escape.
                expiration = None

        syntax_valid = expiration is not None
        time_valid = syntax_valid and expiration > self._time_func()
        return (syntax_valid, time_valid)

    @apiproxy_stub.Synchronized
    def get_channel_messages(self, token):
        """Returns the pending messages for a given channel.

        Args:
          token: A string representing the channel. Note that this is the
            token returned by CreateChannel, not the client id.

        Returns:
          List of messages, or None if the channel doesn't exist. The messages
          are strings. The list returned is the stub's own queue object, not
          a copy.
        """
        self._log('Received request for messages for channel: ' + token)
        client_id = self.client_id_from_token(token)
        if client_id in self._connected_channel_messages:
            return self._connected_channel_messages[client_id]

        return None

    @apiproxy_stub.Synchronized
    def has_channel_messages(self, token):
        """Checks to see if the given channel has any pending messages.

        Args:
          token: A string representing the channel. Note that this is the
            token returned by CreateChannel, not the client id.

        Returns:
          True if the channel exists and has pending messages.
        """
        client_id = self.client_id_from_token(token)
        has_messages = (client_id in self._connected_channel_messages and
                        bool(self._connected_channel_messages[client_id]))
        self._log('Checking for messages on channel (%s) (%s)',
                  token, has_messages)
        return has_messages

    @apiproxy_stub.Synchronized
    def pop_first_message(self, token):
        """Returns and clears the first message from the message queue.

        Args:
          token: A string representing the channel. Note that this is the
            token returned by CreateChannel, not the client id.

        Returns:
          The first message in the queue (a string), or None if no messages.
        """
        if self.has_channel_messages(token):
            client_id = self.client_id_from_token(token)
            self._log('Popping first message of queue for channel (%s)', token)
            return self._connected_channel_messages[client_id].pop(0)

        return None

    @apiproxy_stub.Synchronized
    def clear_channel_messages(self, token):
        """Clears all messages from the channel.

        Args:
          token: A string representing the channel. Note that this is the
            token returned by CreateChannel, not the client id.
        """
        client_id = self.client_id_from_token(token)
        if client_id:
            self._log('Clearing messages on channel (' + client_id + ')')
            # Only reset queues of connected clients; never create an entry,
            # since a key's presence is what marks a client as connected.
            if client_id in self._connected_channel_messages:
                self._connected_channel_messages[client_id] = []
        else:
            self._log('Ignoring clear messages for nonexistent token (' +
                      token + ')')

    def add_connect_event(self, client_id):
        """Tell the application that the client has connected.

        Args:
          client_id: A client ID used for a particular channel.
        """
        self.request_data.get_dispatcher().add_async_request(
            'POST', '/_ah/channel/connected/',
            [('Content-Type', 'application/x-www-form-urlencoded')],
            'from=%s' % client_id,
            ChannelServiceStub.XMPP_PUBLIC_IP)

    @apiproxy_stub.Synchronized
    def disconnect_channel_event(self, client_id):
        """Removes the channel from the list of connected channels.

        Also posts a request to /_ah/channel/disconnected/ so the application
        is told about the disconnection.

        Args:
          client_id: A client ID used for a particular channel.
        """
        self._log('Removing channel %s', client_id)
        if client_id in self._connected_channel_messages:
            del self._connected_channel_messages[client_id]
        # NOTE(review): the notification is sent whether or not the client
        # was still registered -- confirm this matches the intended nesting.
        self.request_data.get_dispatcher().add_async_request(
            'POST', '/_ah/channel/disconnected/',
            [('Content-Type', 'application/x-www-form-urlencoded')],
            'from=%s' % client_id,
            ChannelServiceStub.XMPP_PUBLIC_IP)

    def add_disconnect_event(self, client_id):
        """Add an event to notify the app if a client has disconnected.

        Args:
          client_id: A client ID used for a particular channel.
        """
        timeout = self._time_func() + ChannelServiceStub.CHANNEL_TIMEOUT_SECONDS

        # client_id is never rebound in this scope, so the lambda can close
        # over it directly.
        self.request_data.get_dispatcher().add_event(
            lambda: self.disconnect_channel_event(client_id),
            timeout, 'channel-disconnect', client_id)

    @apiproxy_stub.Synchronized
    def connect_channel(self, token):
        """Marks the channel identified by the token (token) as connected.

        If the channel has not yet been connected, this triggers a connection
        event to let the application know that the channel has been connected
        to.

        If the channel has already been connected, this refreshes the
        channel's timeout so that it will not disconnect. This should be done
        at regular intervals to avoid automatic disconnection.

        Args:
          token: A string representing the channel. Note that this is the
            token returned by CreateChannel, not the client id.

        Raises:
          InvalidTokenError: The token is syntactically invalid.
          TokenTimedOutError: The token expired or does not exist.
        """
        syntax_valid, time_valid = self.check_token_validity(token)
        if not syntax_valid:
            raise InvalidTokenError()
        elif not time_valid:
            raise TokenTimedOutError()

        client_id = self.client_id_from_token(token)

        # Already connected: just push back the pending disconnect event.
        if client_id in self._connected_channel_messages:
            timeout = (self._time_func() +
                       ChannelServiceStub.CHANNEL_TIMEOUT_SECONDS)
            self.request_data.get_dispatcher().update_event(
                timeout, 'channel-disconnect', client_id)
            return

        # First connection: create the queue, notify the application and
        # schedule the automatic disconnect.
        self._connected_channel_messages[client_id] = []
        self.add_connect_event(client_id)
        self.add_disconnect_event(client_id)

    @apiproxy_stub.Synchronized
    def connect_and_pop_first_message(self, token):
        """Atomically performs a connect_channel and a pop_first_message.

        This is designed to be called after the channel has already been
        connected, so that it refreshes the channel's timeout, and retrieves a
        message, in a single atomic operation.

        Args:
          token: A string representing the channel. Note that this is the
            token returned by CreateChannel, not the client id.

        Returns:
          The first message in the queue (a string), or None if no messages.

        Raises:
          InvalidTokenError: The token is syntactically invalid.
          TokenTimedOutError: The token expired or does not exist.
        """
        self.connect_channel(token)
        return self.pop_first_message(token)