Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / datastore / EntityCache.java
blob710153cdd2ac0c3208de60287e7477563d11aea4
1 package com.google.appengine.api.datastore;
3 import static com.google.appengine.api.datastore.CacheValueUtil.createCacheValue;
4 import static com.google.appengine.api.datastore.CacheValueUtil.isDatastoreOpState;
5 import static com.google.appengine.api.datastore.EntityCacheTranslator.fromMemcacheIdentifiableValue;
6 import static com.google.appengine.api.datastore.EntityCacheTranslator.fromMemcacheKey;
7 import static com.google.appengine.api.datastore.EntityCacheTranslator.toMemcacheCasValues;
8 import static com.google.appengine.api.datastore.EntityCacheTranslator.toMemcacheKey;
9 import static com.google.appengine.api.datastore.EntityCacheTranslator.toMemcacheValue;
10 import static com.google.appengine.api.datastore.FutureHelper.quietGet;
11 import static com.google.common.base.Preconditions.checkArgument;
12 import static com.google.common.base.Preconditions.checkNotNull;
14 import com.google.appengine.api.memcache.Expiration;
15 import com.google.appengine.api.memcache.MemcacheService.CasValues;
16 import com.google.appengine.api.memcache.MemcacheService.IdentifiableValue;
17 import com.google.appengine.api.memcache.MemcacheService.SetPolicy;
18 import com.google.appengine.api.utils.FutureWrapper;
19 import com.google.apphosting.datastore.EntityStorage.CacheValue;
20 import com.google.apphosting.datastore.EntityStorage.CacheValue.State;
21 import com.google.common.base.Predicate;
22 import com.google.common.collect.Lists;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.concurrent.Future;
33 /**
34 * A datastore entity cache backed by memcache.
37 class EntityCache {
39 static final double EXTRA_STATE_EXPIRATION_SECS = 5;
41 private final MemcacheServiceHelper memcacheServiceHelper;
42 private final double datastoreRpcDeadlineSecs;
/**
 * Constructs an {@link EntityCache} object.
 *
 * @param memcacheServiceHelper the {@link MemcacheServiceHelper} to use for all memcache
 *     operations. Must not be {@code null}.
 * @param datastoreRpcDeadlineSecs the datastore rpc deadline in seconds.
 * @throws IllegalArgumentException if the specified {@code datastoreRpcDeadlineSecs} is less
 *     than or equal to zero (a NaN value is rejected as well).
 * @throws NullPointerException if {@code memcacheServiceHelper} is {@code null}.
 */
public EntityCache(MemcacheServiceHelper memcacheServiceHelper, double datastoreRpcDeadlineSecs) {
  // The deadline is validated first so that callers which previously observed an
  // IllegalArgumentException keep observing it.
  checkArgument(datastoreRpcDeadlineSecs > 0,
      "The datastoreRpcDeadlineSecs argument must be greater than 0");
  // Fail fast on a missing helper instead of surfacing a NullPointerException from the first
  // cache operation issued much later.
  this.memcacheServiceHelper =
      checkNotNull(memcacheServiceHelper, "The memcacheServiceHelper argument can not be null");
  this.datastoreRpcDeadlineSecs = datastoreRpcDeadlineSecs;
}
60 /**
61 * Sets the cache value for each key specified to the provided datastore operation {@code state}
62 * if the update satisfies the cache update {@code policy}.
63 * <p>
64 * The cache values for the specified {@code state} will expire with an expiration time
65 * equal to the datastore rpc deadline.
67 * @param keys the keys to update in the cache.
68 * @param state the state to store in the cache for each key.
69 * @param policy what to do if the cache entry is or is not already present
70 * @return the keys successfully updated in the cache. Keys in {@code keys} may not be
71 * in the returned set because of the {@code policy} regarding pre-existing entries.
72 * @throws IllegalArgumentException if the {@code state} is not {@link State#READ_IN_PROGRESS} or
73 * {@link State#MUTATION_IN_PROGRESS}.
75 public Future<Set<Key>> putStateAsync(Collection<Key> keys, State state, SetPolicy policy) {
76 checkArgument(isDatastoreOpState(state),
77 "The state argument is not for a datastore operation: " + state);
78 Map<String, byte[]> memcacheValues = Maps.newHashMapWithExpectedSize(keys.size());
79 byte[] stateMemcacheValue = toMemcacheValue(createCacheValue(state, null));
80 for (Key key : keys) {
81 memcacheValues.put(toMemcacheKey(key), stateMemcacheValue);
83 return translateMemcacheKeySetResult(
84 memcacheServiceHelper.put(memcacheValues, policy, computeDatastoreOpStateExpiration(0)));
87 /**
88 * Sets the cache value for each key specified to the provided {@code state} if the update
89 * satisfies the cache update {@code policy}, and returns an {@link IdentifiableCacheValue} object
90 * for each successful update.
91 * <p>
92 * The cache values for the specified {@code state} will expire with an expiration time
93 * equal to the datastore rpc deadline.
95 * @param keys the keys to update in the cache.
96 * @param state the state to store in the cache for each key.
97 * @param policy what to do if the cache entry is or is not already present
98 * @return a mapping from keys to a {@link IdentifiableCacheValue} objects for each state
99 * for every key successfully updated in the cache. Keys in {@code keys} may not be
100 * in the returned map because of the {@code policy} regarding pre-existing entries.
101 * @throws IllegalArgumentException if the {@code state} is not {@link State#READ_IN_PROGRESS} or
102 * {@link State#MUTATION_IN_PROGRESS}.
104 public Map<Key, IdentifiableCacheValue> putStateIdentifiable(Collection<Key> keys,
105 final State state, SetPolicy policy) {
106 checkArgument(isDatastoreOpState(state),
107 "The state argument is not for a datastore operation: " + state);
108 Map<String, byte[]> memcacheValues = Maps.newHashMapWithExpectedSize(keys.size());
109 byte[] stateMemcacheValue = toMemcacheValue(createCacheValue(state, null));
110 for (Key key : keys) {
111 memcacheValues.put(toMemcacheKey(key), stateMemcacheValue);
113 Set<String> keysPut = quietGet(
114 memcacheServiceHelper.put(memcacheValues, policy, computeDatastoreOpStateExpiration(2)));
115 Predicate<CacheValue> validateState = new Predicate<CacheValue>() {
116 @Override
117 public boolean apply(CacheValue value) {
118 return value.getStateEnum() == state;
121 return quietGet(
122 getIdentifiableHelper(keysPut, validateState));
126 * Puts the entities specified into the cache if the cache update {@code policy} is satisfied.
128 * @param entityValues the key to entity mappings to add to the cache. {@code null} values can
129 * not be used for the entity values.
130 * @param policy what to do if the cache entry is or is not already present
131 * @return the keys successfully updated in the cache. Keys in {@code entityValues} may not be
132 * in the returned set because of the {@code policy} regarding pre-existing entries.
134 public Future<Set<Key>> putEntitiesAsync(Map<Key, EntityProto> entityValues, SetPolicy policy) {
135 Map<String, byte[]> memcacheValues = Maps.newHashMapWithExpectedSize(entityValues.size());
136 for (Map.Entry<Key, EntityProto> entityValueEntry : entityValues.entrySet()) {
137 EntityProto entity =
138 checkNotNull(entityValueEntry.getValue(), "The entity value can not be null\n");
139 memcacheValues.put(toMemcacheKey(entityValueEntry.getKey()),
140 toMemcacheValue(createCacheValue(State.ENTITY, entity)));
142 return translateMemcacheKeySetResult(
143 memcacheServiceHelper.put(memcacheValues, policy));
147 * Atomically stores the new value of each {@link CasCacheValues} object in the {@code values}
148 * map if no other value has been stored in the cache since each {@code CasValues} object's old
149 * value was retrieved and if the old value is still present in the cache.
151 * @param values the key to {@code CasCacheValues} mappings to compare and swap.
152 * @return the set of keys for which the new value was stored.
154 public Future<Set<Key>> putIfUntouchedAsync(Map<Key, CasCacheValues> values) {
155 Map<String, CasValues> memcacheValues = Maps.newHashMapWithExpectedSize(values.size());
156 for (Map.Entry<Key, CasCacheValues> valueEntry : values.entrySet()) {
157 memcacheValues.put(toMemcacheKey(valueEntry.getKey()),
158 toMemcacheCasValues(valueEntry.getValue()));
160 return translateMemcacheKeySetResult(
161 memcacheServiceHelper.putIfUntouched(memcacheValues));
165 * Fetches previously-stored cache values and encapsulates them in an
166 * {@link IdentifiableCacheValue} object.
168 * @param keys a collection of keys for which values should be retrieved
169 * @return a mapping from keys to values of any entries found. If a requested
170 * key is not found in the cache the key will not be in the returned {@link Map}.
172 public Future<Map<Key, IdentifiableCacheValue>> getIdentifiableAsync(Collection<Key> keys) {
173 List<String> memcacheKeys = Lists.newArrayListWithExpectedSize(keys.size());
174 for (Key key : keys) {
175 memcacheKeys.add(toMemcacheKey(key));
177 return getIdentifiableHelper(memcacheKeys, null);
181 * Invokes {@link MemcacheServiceHelper#getIdentifiable} and post-processes the results
182 * to remove EMPTY cache values and values that are filtered by the {@code valueFilter}.
184 * @param memcacheKeys a collection of memcache keys for which values should be retrieved.
185 * @param valueFilter an optional predicate to filter the fetched values.
186 * @return a mapping from keys to values of any entries found. If a requested
187 * key is not found in the cache or the value has been filtered the key will not be in
188 * the returned {@link Map}.
190 private Future<Map<Key, IdentifiableCacheValue>> getIdentifiableHelper(
191 Collection<String> memcacheKeys, final Predicate<CacheValue> valueFilter) {
192 Future<Map<String, IdentifiableValue>> getResults =
193 memcacheServiceHelper.getIdentifiable(memcacheKeys);
194 return new FutureWrapper<Map<String, IdentifiableValue>, Map<Key, IdentifiableCacheValue>>(
195 getResults) {
197 @Override
198 protected Map<Key, IdentifiableCacheValue> wrap(
199 Map<String, IdentifiableValue> memcacheResults) throws Exception {
200 Map<Key, IdentifiableCacheValue> results =
201 Maps.newHashMapWithExpectedSize(memcacheResults.size());
202 for (Map.Entry<String, IdentifiableValue> memcacheResult : memcacheResults.entrySet()) {
203 IdentifiableCacheValue identifiableCacheValue =
204 fromMemcacheIdentifiableValue(memcacheResult.getValue());
205 if (identifiableCacheValue == null) {
206 continue;
208 CacheValue cacheValue = identifiableCacheValue.getValue();
209 if ((valueFilter != null) && !valueFilter.apply(cacheValue)) {
210 continue;
212 results.put(fromMemcacheKey(memcacheResult.getKey()), identifiableCacheValue);
214 return results;
217 @Override
218 protected Throwable convertException(Throwable cause) {
219 return cause;
225 * Atomically deletes cache entries for each {@link IdentifiableCacheValue} object in the
226 * {@code values} map if no other value has been stored in the cache since the
227 * {@code IdentifiableCacheValue} object was retrieved and if the cache value is still present
228 * in the cache.
230 * @param values the key to {@code CasCacheValues} mappings to compare and swap.
231 * @return the set of keys for which the cache entries were deleted.
233 public Future<Set<Key>> evictIfUntouchedAsync(Map<Key, IdentifiableCacheValue> values) {
234 Map<String, CasValues> memcacheValues = Maps.newHashMapWithExpectedSize(values.size());
235 for (Map.Entry<Key, IdentifiableCacheValue> valueEntry : values.entrySet()) {
236 CasCacheValues casCacheValue = new CasCacheValues(valueEntry.getValue(), null);
237 memcacheValues.put(toMemcacheKey(valueEntry.getKey()),
238 toMemcacheCasValues(casCacheValue));
240 return translateMemcacheKeySetResult(
241 memcacheServiceHelper.putIfUntouched(memcacheValues));
245 * Evicts the cache entries for the specified keys.
247 * @param keys the keys to evict from the cache.
248 * @return the set of keys successfully deleted. Any keys in {@code keys} but not in the
249 * returned set were not found in the cache. The iteration order of the returned set matches
250 * the iteration order of the provided {@code keys}.
252 public Future<Set<Key>> evictAsync(Collection<Key> keys) {
253 List<String> memcacheKeys = Lists.newArrayListWithExpectedSize(keys.size());
254 for (Key key : keys) {
255 memcacheKeys.add(toMemcacheKey(key));
257 return translateMemcacheKeySetResult(
258 memcacheServiceHelper.delete(memcacheKeys));
262 * Evicts the cache entries that contain entities for the keys specified.
264 * @param keys the keys to evict from the cache.
265 * @return the set of keys successfully deleted. Any keys in {@code keys} but not in the
266 * returned set were either not found in the cache or had cache values that were not
267 * entity objects.
269 public Set<Key> evictEntitiesOnly(Collection<Key> keys) {
270 List<String> memcacheKeys = Lists.newArrayListWithExpectedSize(keys.size());
271 for (Key key : keys) {
272 memcacheKeys.add(toMemcacheKey(key));
274 Predicate<CacheValue> entitiesOnly = new Predicate<CacheValue>() {
275 @Override
276 public boolean apply(CacheValue value) {
277 return value.getStateEnum() == State.ENTITY;
280 Map<Key, IdentifiableCacheValue> getResults =
281 quietGet(getIdentifiableHelper(memcacheKeys, entitiesOnly));
282 return quietGet(evictIfUntouchedAsync(getResults));
286 * Translates a {@link Set} of memcache keys returned by a {@link Future} to a {@code Set} of
287 * {@link Key} objects.
289 private static Future<Set<Key>> translateMemcacheKeySetResult(Future<Set<String>> inputSet) {
290 return new FutureWrapper<Set<String>, Set<Key>>(inputSet) {
291 @Override
292 protected Set<Key> wrap(Set<String> memcacheKeys) throws Exception {
293 Set<Key> outputResults = Sets.newLinkedHashSetWithExpectedSize(memcacheKeys.size());
294 for (String memcacheKey : memcacheKeys) {
295 outputResults.add(fromMemcacheKey(memcacheKey));
297 return outputResults;
300 @Override
301 protected Throwable convertException(Throwable cause) {
302 return cause;
308 * Computes the expiration time for a cache value containing a state used to demarcate in progress
309 * datastore operations.
310 * <p>
311 * This computation assumes that after issuing the entity cache operation the invoker will
312 * complete any associated datastore operations within the time specified by the datastore rpc
313 * deadline. Meaning an invoker can issue one or more async datastore operations, but as soon
314 * as the invoker blocks on a single datastore operation the invoker should not issue any more
315 * datastore operations to the keys associated with the state being broadacast via the cache.
317 * @param numSyncMemcacheOps the number of synchronous memcache operations issued prior to
318 * returning control to the entity cache api invoker.
319 * @return the expiration time for the state cache value.
321 Expiration computeDatastoreOpStateExpiration(int numSyncMemcacheOps) {
322 double stateExpirationTimeSecs =
323 numSyncMemcacheOps * memcacheServiceHelper.getRpcDeadlineSecs() +
324 datastoreRpcDeadlineSecs +
325 EXTRA_STATE_EXPIRATION_SECS;
326 return Expiration.byDeltaMillis((int) (stateExpirationTimeSecs * 1000));
330 * Encapsulates a {@link CacheValue} object that was derived from a memcache
331 * {@link IdentifiableValue} object. The encapsulated object can later be used in compare and swap
332 * entity cache operations.
334 public static final class IdentifiableCacheValue {
335 private final IdentifiableValue memcacheValue;
336 private final CacheValue value;
339 * Constructs an {@link IdentifiableCacheValue} object.
341 * @param memcacheValue the identifiable memcache value.
342 * @param value the cache value derived from the {@code memcacheValue}.
344 public IdentifiableCacheValue(IdentifiableValue memcacheValue, CacheValue value) {
345 this.memcacheValue = memcacheValue;
346 this.value = value;
350 * @return the cache value.
352 public CacheValue getValue() {
353 return value;
357 * @return the identifiable memcache value.
359 public IdentifiableValue getMemcacheValue() {
360 return memcacheValue;
365 * A container for value arguments for compare and swap cache operations.
367 public static final class CasCacheValues {
369 private final IdentifiableCacheValue oldValue;
370 private final CacheValue newValue;
373 * Constructs a {@link CasCacheValues} object.
375 * @param oldValue the old value to swap out.
376 * @param newValue an entity {@code CacheValue} object or {@code null} to evict the old value
377 * from the cache.
379 public CasCacheValues(IdentifiableCacheValue oldValue, CacheValue newValue) {
380 this.oldValue =
381 checkNotNull(oldValue, "The oldValue argument can not be null");
382 if (newValue != null) {
383 checkArgument(newValue.getStateEnum() == State.ENTITY,
384 "The newValue argument does not contain an entity");
386 this.newValue = newValue;
390 * @return the old cache value.
392 public IdentifiableCacheValue getOldValue() {
393 return oldValue;
397 * @return the proposed new cache value. Or {@code null} if the old cache value is being
398 * evicted instead of being replaced.
400 public CacheValue getNewValue() {
401 return newValue;