3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 """Stub version of the memcache API, keeping all data in process memory."""
32 from google
.appengine
.api
import apiproxy_stub
33 from google
.appengine
.api
import memcache
34 from google
.appengine
.api
.memcache
import memcache_service_pb
35 from google
.appengine
.runtime
import apiproxy_errors
# Short module-level aliases for the protocol buffer classes used below.
MemcacheSetResponse = memcache_service_pb.MemcacheSetResponse
MemcacheSetRequest = memcache_service_pb.MemcacheSetRequest
MemcacheIncrementRequest = memcache_service_pb.MemcacheIncrementRequest
MemcacheIncrementResponse = memcache_service_pb.MemcacheIncrementResponse
MemcacheDeleteResponse = memcache_service_pb.MemcacheDeleteResponse

# Passed to the stub base class as max_request_size: 32 << 20 bytes (32 MiB).
MAX_REQUEST_SIZE = 32 << 20
class CacheEntry(object):
  """An entry in the cache."""

  def __init__(self, value, expiration, flags, cas_id, gettime):
    """Initializer.

    Args:
      value: String containing the data for this entry.
      expiration: Number containing the expiration time or offset in seconds
        for this entry. Zero means the entry never expires.
      flags: Opaque flags used by the memcache implementation.
      cas_id: Unique Compare-And-Set ID.
      gettime: Used for testing. Function that works like time.time().
    """
    assert isinstance(value, basestring)
    assert len(value) <= memcache.MAX_VALUE_SIZE
    assert isinstance(expiration, (int, long))

    self._gettime = gettime
    self.value = value
    self.flags = flags
    self.cas_id = cas_id
    self.created_time = self._gettime()
    # CheckExpired() short-circuits on this flag, so a zero expiration
    # yields an entry that never times out.
    self.will_expire = expiration != 0
    # Only ExpireAndLock() turns this on; a fresh entry is never locked.
    self.locked = False
    self._SetExpiration(expiration)

  def _SetExpiration(self, expiration):
    """Sets the expiration for this entry.

    Args:
      expiration: Number containing the expiration time or offset in seconds
        for this entry. If expiration is above one month, then it's considered
        an absolute time since the UNIX epoch.
    """
    if expiration > (86400 * 30):
      # More than 30 days' worth of seconds: treat as an absolute timestamp.
      self.expiration_time = expiration
    else:
      # Otherwise it is an offset from the current (possibly fake) clock.
      self.expiration_time = self._gettime() + expiration

  def CheckExpired(self):
    """Returns True if this entry has expired; False otherwise."""
    return self.will_expire and self._gettime() >= self.expiration_time

  def ExpireAndLock(self, timeout):
    """Marks this entry as deleted and locks it for the expiration time.

    Used to implement memcache's delete timeout behavior.

    Args:
      timeout: Parameter originally passed to memcache.delete or
        memcache.delete_multi to control deletion timeout.
    """
    self.will_expire = True
    self.locked = True
    self._SetExpiration(timeout)

  def CheckLocked(self):
    """Returns True if this entry was deleted but has not yet timed out."""
    return self.locked and not self.CheckExpired()
class MemcacheServiceStub(apiproxy_stub.APIProxyStub):
  """Python only memcache service stub.

  This stub keeps all data in the local process' memory, not in any
  external servers.
  """

  def __init__(self, gettime=time.time, service_name='memcache'):
    """Initializer.

    Args:
      gettime: time.time()-like function used for testing.
      service_name: Service name expected for all calls.
    """
    super(MemcacheServiceStub, self).__init__(
        service_name, max_request_size=MAX_REQUEST_SIZE)
    self._next_cas_id = 1
    # The stub works in whole seconds, so the clock result is truncated.
    self._gettime = lambda: int(gettime())
    self._ResetStats()

    # Maps namespace -> {key -> CacheEntry}.
    self._the_cache = {}

  def _ResetStats(self):
    """Resets statistics information.

    Must be called while the current thread holds self._mutex (with an exception
    for __init__).
    """
    self._hits = 0
    self._misses = 0
    self._byte_hits = 0
    self._cache_creation_time = self._gettime()

  @apiproxy_stub.Synchronized
  def _GetKey(self, namespace, key):
    """Retrieves a CacheEntry from the cache if it hasn't expired.

    Does not take deletion timeout into account.

    Args:
      namespace: The namespace that keys are stored under.
      key: The key to retrieve from the cache.

    Returns:
      The corresponding CacheEntry instance, or None if it was not found or
      has already expired.
    """
    namespace_dict = self._the_cache.get(namespace, None)
    if namespace_dict is None:
      return None
    entry = namespace_dict.get(key, None)
    if entry is None:
      return None
    elif entry.CheckExpired():
      # Expired entries are evicted lazily, on first access after expiry.
      del namespace_dict[key]
      return None
    else:
      return entry

  @apiproxy_stub.Synchronized
  def _Dynamic_Get(self, request, response):
    """Implementation of MemcacheService::Get().

    Args:
      request: A MemcacheGetRequest.
      response: A MemcacheGetResponse.
    """
    namespace = request.name_space()
    # Duplicate keys in the request are looked up (and counted) only once.
    keys = set(request.key_list())
    for key in keys:
      entry = self._GetKey(namespace, key)
      if entry is None or entry.CheckLocked():
        self._misses += 1
        continue
      self._hits += 1
      self._byte_hits += len(entry.value)
      item = response.add_item()
      item.set_key(key)
      item.set_value(entry.value)
      item.set_flags(entry.flags)
      if request.for_cas():
        item.set_cas_id(entry.cas_id)

  @apiproxy_stub.Synchronized
  def _Dynamic_Set(self, request, response):
    """Implementation of MemcacheService::Set().

    Args:
      request: A MemcacheSetRequest.
      response: A MemcacheSetResponse.
    """
    namespace = request.name_space()
    for item in request.item_list():
      key = item.key()
      set_policy = item.set_policy()
      old_entry = self._GetKey(namespace, key)

      set_status = MemcacheSetResponse.NOT_STORED
      if ((set_policy == MemcacheSetRequest.SET) or
          (set_policy == MemcacheSetRequest.ADD and old_entry is None) or
          (set_policy == MemcacheSetRequest.REPLACE and old_entry is not None)):

        # Only SET may overwrite an entry that is still delete-locked.
        if (old_entry is None or
            set_policy == MemcacheSetRequest.SET
            or not old_entry.CheckLocked()):
          set_status = MemcacheSetResponse.STORED

      elif (set_policy == MemcacheSetRequest.CAS and item.has_cas_id()):
        if old_entry is None or old_entry.CheckLocked():
          set_status = MemcacheSetResponse.NOT_STORED
        elif old_entry.cas_id != item.cas_id():
          set_status = MemcacheSetResponse.EXISTS
        else:
          set_status = MemcacheSetResponse.STORED

      if set_status == MemcacheSetResponse.STORED:
        if namespace not in self._the_cache:
          self._the_cache[namespace] = {}
        self._the_cache[namespace][key] = CacheEntry(item.value(),
                                                     item.expiration_time(),
                                                     item.flags(),
                                                     self._next_cas_id,
                                                     gettime=self._gettime)
        # Every stored entry consumes a fresh CAS id.
        self._next_cas_id += 1

      response.add_set_status(set_status)

  @apiproxy_stub.Synchronized
  def _Dynamic_Delete(self, request, response):
    """Implementation of MemcacheService::Delete().

    Args:
      request: A MemcacheDeleteRequest.
      response: A MemcacheDeleteResponse.
    """
    namespace = request.name_space()
    for item in request.item_list():
      key = item.key()
      entry = self._GetKey(namespace, key)

      delete_status = MemcacheDeleteResponse.DELETED
      if entry is None:
        delete_status = MemcacheDeleteResponse.NOT_FOUND
      elif item.delete_time() == 0:
        del self._the_cache[namespace][key]
      else:
        # Keep the entry around, locked, until the delete timeout elapses.
        entry.ExpireAndLock(item.delete_time())

      response.add_delete_status(delete_status)

  @apiproxy_stub.Synchronized
  def _internal_increment(self, namespace, request):
    """Internal function for incrementing from a MemcacheIncrementRequest.

    Args:
      namespace: A string containing the namespace for the request, if any.
        Pass an empty string if there is no namespace.
      request: A MemcacheIncrementRequest instance.

    Returns:
      An integer or long if the offset was successful, None on error.
    """
    key = request.key()
    entry = self._GetKey(namespace, key)
    if entry is None or entry.CheckLocked():
      # Without an initial value there is nothing to create the entry from.
      if not request.has_initial_value():
        return None
      if namespace not in self._the_cache:
        self._the_cache[namespace] = {}
      flags = 0
      if request.has_initial_flags():
        flags = request.initial_flags()
      self._the_cache[namespace][key] = CacheEntry(str(request.initial_value()),
                                                   expiration=0,
                                                   flags=flags,
                                                   cas_id=self._next_cas_id,
                                                   gettime=self._gettime)
      self._next_cas_id += 1
      entry = self._GetKey(namespace, key)
      assert entry is not None

    try:
      old_value = long(entry.value)
      if old_value < 0:
        # Negative values are rejected just like non-numeric ones.
        raise ValueError
    except ValueError:
      logging.error('Increment/decrement failed: Could not interpret '
                    'value for key = "%s" as an unsigned integer.', key)
      return None

    delta = request.delta()
    if request.direction() == MemcacheIncrementRequest.DECREMENT:
      delta = -delta

    # Decrements floor at zero; increments wrap around at 2**64.
    new_value = max(old_value + delta, 0) % (2**64)

    entry.value = str(new_value)
    return new_value

  def _Dynamic_Increment(self, request, response):
    """Implementation of MemcacheService::Increment().

    Not decorated itself: the cache is only touched inside
    _internal_increment, which is synchronized.

    Args:
      request: A MemcacheIncrementRequest.
      response: A MemcacheIncrementResponse.

    Raises:
      apiproxy_errors.ApplicationError: with UNSPECIFIED_ERROR when the value
        could not be incremented.
    """
    namespace = request.name_space()
    new_value = self._internal_increment(namespace, request)
    if new_value is None:
      raise apiproxy_errors.ApplicationError(
          memcache_service_pb.MemcacheServiceError.UNSPECIFIED_ERROR)
    response.set_new_value(new_value)

  @apiproxy_stub.Synchronized
  def _Dynamic_BatchIncrement(self, request, response):
    """Implementation of MemcacheService::BatchIncrement().

    Args:
      request: A MemcacheBatchIncrementRequest.
      response: A MemcacheBatchIncrementResponse.
    """
    namespace = request.name_space()
    for request_item in request.item_list():
      new_value = self._internal_increment(namespace, request_item)
      item = response.add_item()
      # Unlike the single-item call, a failure is reported per item
      # instead of raising.
      if new_value is None:
        item.set_increment_status(MemcacheIncrementResponse.NOT_CHANGED)
      else:
        item.set_increment_status(MemcacheIncrementResponse.OK)
        item.set_new_value(new_value)

  @apiproxy_stub.Synchronized
  def _Dynamic_FlushAll(self, request, response):
    """Implementation of MemcacheService::FlushAll().

    Args:
      request: A MemcacheFlushRequest.
      response: A MemcacheFlushResponse.
    """
    self._the_cache.clear()
    self._ResetStats()

  @apiproxy_stub.Synchronized
  def _Dynamic_Stats(self, request, response):
    """Implementation of MemcacheService::Stats().

    Args:
      request: A MemcacheStatsRequest.
      response: A MemcacheStatsResponse.
    """
    stats = response.mutable_stats()
    stats.set_hits(self._hits)
    stats.set_misses(self._misses)
    stats.set_byte_hits(self._byte_hits)
    items = 0
    total_bytes = 0
    for namespace in self._the_cache.itervalues():
      items += len(namespace)
      for entry in namespace.itervalues():
        total_bytes += len(entry.value)
    stats.set_items(items)
    stats.set_bytes(total_bytes)

    # Reported as the time since the statistics were last reset, not as the
    # age of any particular entry.
    stats.set_oldest_item_age(self._gettime() - self._cache_creation_time)