App Engine Python SDK version 1.8.1
[gae.git] / python / google / appengine / api / memcache / memcache_stub.py
blobe07cf6dfae1903222767d6a8ed501491b4b5ada0
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 """Stub version of the memcache API, keeping all data in process memory."""
29 import logging
30 import time
32 from google.appengine.api import apiproxy_stub
33 from google.appengine.api import memcache
34 from google.appengine.api.memcache import memcache_service_pb
35 from google.appengine.runtime import apiproxy_errors
# Module-level shorthands for the protocol buffer classes used by the stub,
# grouped as request/response pairs.
MemcacheSetRequest = memcache_service_pb.MemcacheSetRequest
MemcacheSetResponse = memcache_service_pb.MemcacheSetResponse

MemcacheIncrementRequest = memcache_service_pb.MemcacheIncrementRequest
MemcacheIncrementResponse = memcache_service_pb.MemcacheIncrementResponse

MemcacheDeleteResponse = memcache_service_pb.MemcacheDeleteResponse
# Largest request, in bytes, that the stub is configured to accept (32 MiB).
# Passed to the stub base class as max_request_size.
MAX_REQUEST_SIZE = 32 * 1024 * 1024
class CacheEntry(object):
  """A single value held in the in-process cache, plus its bookkeeping."""

  def __init__(self, value, expiration, flags, cas_id, gettime):
    """Creates the entry and computes when (if ever) it expires.

    Args:
      value: String containing the data for this entry. Must not be longer
        than memcache.MAX_VALUE_SIZE.
      expiration: Integer expiration, either an offset in seconds or an
        absolute time (see _SetExpiration). Zero means the entry never
        expires.
      flags: Opaque flags used by the memcache implementation.
      cas_id: Unique Compare-And-Set ID.
      gettime: Used for testing. Function that works like time.time().
    """
    assert isinstance(value, basestring)
    assert len(value) <= memcache.MAX_VALUE_SIZE
    assert isinstance(expiration, (int, long))

    self._gettime = gettime
    self.value = value
    self.flags = flags
    self.cas_id = cas_id
    # The clock is read here first and, for relative expirations, a second
    # time inside _SetExpiration below.
    self.created_time = self._gettime()
    self.will_expire = expiration != 0
    self.locked = False
    self._SetExpiration(expiration)

  def _SetExpiration(self, expiration):
    """Stores the absolute expiration time for this entry.

    Args:
      expiration: Number containing the expiration time or offset in seconds
        for this entry. Values above one month (30 days) are taken to be an
        absolute time since the UNIX epoch; anything else is an offset from
        the current time.
    """
    one_month_in_seconds = 30 * 86400
    self.expiration_time = (
        expiration if expiration > one_month_in_seconds
        else self._gettime() + expiration)

  def CheckExpired(self):
    """Returns True if this entry has expired; False otherwise."""
    if not self.will_expire:
      return False
    return self._gettime() >= self.expiration_time

  def ExpireAndLock(self, timeout):
    """Marks this entry as deleted and locks it until the timeout passes.

    Used to implement memcache's delete timeout behavior.

    Args:
      timeout: Parameter originally passed to memcache.delete or
        memcache.delete_multi to control deletion timeout.
    """
    self.will_expire = True
    self.locked = True
    self._SetExpiration(timeout)

  def CheckLocked(self):
    """Returns True if this entry was deleted but has not yet timed out."""
    if not self.locked:
      return False
    return not self.CheckExpired()
class MemcacheServiceStub(apiproxy_stub.APIProxyStub):
  """Python only memcache service stub.

  All data lives in the memory of the local process; no external servers
  are contacted.
  """

  THREADSAFE = True

  def __init__(self, gettime=time.time, service_name='memcache'):
    """Sets up an empty cache with fresh statistics.

    Args:
      gettime: time.time()-like function used for testing.
      service_name: Service name expected for all calls.
    """
    super(MemcacheServiceStub, self).__init__(
        service_name, max_request_size=MAX_REQUEST_SIZE)
    self._next_cas_id = 1

    def _WholeSecondClock():
      # The stub works with integer timestamps only.
      return int(gettime())

    self._gettime = _WholeSecondClock
    self._ResetStats()

    # Maps namespace -> {key -> CacheEntry}.
    self._the_cache = {}

  def _ResetStats(self):
    """Zeroes the hit/miss counters and restarts the cache age clock.

    Must be called while the current thread holds self._mutex (with an
    exception for __init__).
    """
    self._hits = self._misses = self._byte_hits = 0
    self._cache_creation_time = self._gettime()

  @apiproxy_stub.Synchronized
  def _GetKey(self, namespace, key):
    """Looks up a CacheEntry, dropping it if it has already expired.

    Does not take deletion timeout into account.

    Args:
      namespace: The namespace that keys are stored under.
      key: The key to retrieve from the cache.

    Returns:
      The corresponding CacheEntry instance, or None if it was not found or
      has already expired.
    """
    bucket = self._the_cache.get(namespace, None)
    if bucket is None:
      return None

    cached = bucket.get(key, None)
    if cached is None:
      return None

    if cached.CheckExpired():
      # Expired entries are evicted on access.
      del bucket[key]
      return None
    return cached

  @apiproxy_stub.Synchronized
  def _Dynamic_Get(self, request, response):
    """Implementation of MemcacheService::Get().

    Each distinct requested key is counted as either a hit or a miss; only
    hits are added to the response.

    Args:
      request: A MemcacheGetRequest.
      response: A MemcacheGetResponse.
    """
    namespace = request.name_space()
    for key in set(request.key_list()):
      entry = self._GetKey(namespace, key)
      if entry is not None and not entry.CheckLocked():
        self._hits += 1
        self._byte_hits += len(entry.value)
        found = response.add_item()
        found.set_key(key)
        found.set_value(entry.value)
        found.set_flags(entry.flags)
        if request.for_cas():
          found.set_cas_id(entry.cas_id)
      else:
        # Absent, expired, or deleted-with-timeout entries all count as a
        # miss.
        self._misses += 1

  @apiproxy_stub.Synchronized
  def _Dynamic_Set(self, request, response):
    """Implementation of MemcacheService::Set().

    One status is appended to the response for every item in the request.

    Args:
      request: A MemcacheSetRequest.
      response: A MemcacheSetResponse.
    """
    namespace = request.name_space()
    for item in request.item_list():
      key = item.key()
      policy = item.set_policy()
      existing = self._GetKey(namespace, key)

      if policy == MemcacheSetRequest.SET:
        # An unconditional set always succeeds, even over a locked entry.
        status = MemcacheSetResponse.STORED
      elif policy == MemcacheSetRequest.ADD:
        # Any unexpired entry, including a locked one, blocks an add.
        if existing is None:
          status = MemcacheSetResponse.STORED
        else:
          status = MemcacheSetResponse.NOT_STORED
      elif policy == MemcacheSetRequest.REPLACE:
        if existing is not None and not existing.CheckLocked():
          status = MemcacheSetResponse.STORED
        else:
          status = MemcacheSetResponse.NOT_STORED
      elif policy == MemcacheSetRequest.CAS and item.has_cas_id():
        if existing is None or existing.CheckLocked():
          status = MemcacheSetResponse.NOT_STORED
        elif existing.cas_id != item.cas_id():
          status = MemcacheSetResponse.EXISTS
        else:
          status = MemcacheSetResponse.STORED
      else:
        # Unknown policy, or a CAS request that carries no CAS ID.
        status = MemcacheSetResponse.NOT_STORED

      if status == MemcacheSetResponse.STORED:
        bucket = self._the_cache.setdefault(namespace, {})
        bucket[key] = CacheEntry(item.value(),
                                 item.expiration_time(),
                                 item.flags(),
                                 self._next_cas_id,
                                 gettime=self._gettime)
        self._next_cas_id += 1

      response.add_set_status(status)

  @apiproxy_stub.Synchronized
  def _Dynamic_Delete(self, request, response):
    """Implementation of MemcacheService::Delete().

    One status is appended to the response for every item in the request.

    Args:
      request: A MemcacheDeleteRequest.
      response: A MemcacheDeleteResponse.
    """
    namespace = request.name_space()
    for item in request.item_list():
      key = item.key()
      entry = self._GetKey(namespace, key)
      if entry is None:
        response.add_delete_status(MemcacheDeleteResponse.NOT_FOUND)
        continue

      timeout = item.delete_time()
      if timeout == 0:
        del self._the_cache[namespace][key]
      else:
        # Keep the entry around, locked, until the delete timeout passes.
        entry.ExpireAndLock(timeout)
      response.add_delete_status(MemcacheDeleteResponse.DELETED)

  @apiproxy_stub.Synchronized
  def _internal_increment(self, namespace, request):
    """Internal function for incrementing from a MemcacheIncrementRequest.

    If the key is absent (or locked) and the request carries an initial
    value, a new non-expiring entry is created from it before the delta is
    applied.

    Args:
      namespace: A string containing the namespace for the request, if any.
        Pass an empty string if there is no namespace.
      request: A MemcacheIncrementRequest instance.

    Returns:
      An integer or long if the offset was successful, None on error.
    """
    key = request.key()
    entry = self._GetKey(namespace, key)
    if entry is None or entry.CheckLocked():
      if not request.has_initial_value():
        return None

      initial_flags = 0
      if request.has_initial_flags():
        initial_flags = request.initial_flags()

      bucket = self._the_cache.setdefault(namespace, {})
      bucket[key] = CacheEntry(str(request.initial_value()),
                               expiration=0,
                               flags=initial_flags,
                               cas_id=self._next_cas_id,
                               gettime=self._gettime)
      self._next_cas_id += 1
      entry = self._GetKey(namespace, key)
      assert entry is not None

    # The stored value must parse as a non-negative integer.
    try:
      current = long(entry.value)
      is_unsigned = current >= 0
    except ValueError:
      is_unsigned = False
    if not is_unsigned:
      logging.error('Increment/decrement failed: Could not interpret '
                    'value for key = "%s" as an unsigned integer.', key)
      return None

    step = request.delta()
    if request.direction() == MemcacheIncrementRequest.DECREMENT:
      step = -step

    # Decrements stop at zero; increments wrap around at 2**64.
    updated = max(current + step, 0) % (1 << 64)

    entry.value = str(updated)
    return updated

  def _Dynamic_Increment(self, request, response):
    """Implementation of MemcacheService::Increment().

    Args:
      request: A MemcacheIncrementRequest.
      response: A MemcacheIncrementResponse.

    Raises:
      apiproxy_errors.ApplicationError: With UNSPECIFIED_ERROR if the value
        could not be incremented.
    """
    result = self._internal_increment(request.name_space(), request)
    if result is None:
      raise apiproxy_errors.ApplicationError(
          memcache_service_pb.MemcacheServiceError.UNSPECIFIED_ERROR)
    response.set_new_value(result)

  @apiproxy_stub.Synchronized
  def _Dynamic_BatchIncrement(self, request, response):
    """Implementation of MemcacheService::BatchIncrement().

    One response item is added for every request item, in order.

    Args:
      request: A MemcacheBatchIncrementRequest.
      response: A MemcacheBatchIncrementResponse.
    """
    namespace = request.name_space()
    for increment_request in request.item_list():
      result = self._internal_increment(namespace, increment_request)
      outcome = response.add_item()
      if result is not None:
        outcome.set_increment_status(MemcacheIncrementResponse.OK)
        outcome.set_new_value(result)
      else:
        outcome.set_increment_status(MemcacheIncrementResponse.NOT_CHANGED)

  @apiproxy_stub.Synchronized
  def _Dynamic_FlushAll(self, request, response):
    """Implementation of MemcacheService::FlushAll().

    Empties the cache and resets the statistics.

    Args:
      request: A MemcacheFlushRequest.
      response: A MemcacheFlushResponse.
    """
    self._the_cache.clear()
    self._ResetStats()

  @apiproxy_stub.Synchronized
  def _Dynamic_Stats(self, request, response):
    """Implementation of MemcacheService::Stats().

    Args:
      request: A MemcacheStatsRequest.
      response: A MemcacheStatsResponse.
    """
    stats = response.mutable_stats()
    stats.set_hits(self._hits)
    stats.set_misses(self._misses)
    stats.set_byte_hits(self._byte_hits)

    buckets = self._the_cache.values()
    stats.set_items(sum(len(bucket) for bucket in buckets))
    stats.set_bytes(sum(len(entry.value)
                        for bucket in buckets
                        for entry in bucket.itervalues()))

    # The reported age is measured from the last stats reset, not from any
    # individual entry.
    stats.set_oldest_item_age(self._gettime() - self._cache_creation_time)