Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / memcache / AsyncMemcacheServiceImpl.java
blob331ce207fda3479552544370b8b1b533ea09cc55
1 // Copyright 2011 Google Inc. All rights reserved
3 package com.google.appengine.api.memcache;
5 import static com.google.appengine.api.memcache.MemcacheServiceApiHelper.makeAsyncCall;
7 import com.google.appengine.api.memcache.MemcacheSerialization.ValueAndFlags;
8 import com.google.appengine.api.memcache.MemcacheService.CasValues;
9 import com.google.appengine.api.memcache.MemcacheService.IdentifiableValue;
10 import com.google.appengine.api.memcache.MemcacheService.SetPolicy;
11 import com.google.appengine.api.memcache.MemcacheServiceApiHelper.Provider;
12 import com.google.appengine.api.memcache.MemcacheServiceApiHelper.RpcResponseHandler;
13 import com.google.appengine.api.memcache.MemcacheServiceApiHelper.Transformer;
14 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheBatchIncrementRequest;
15 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheBatchIncrementResponse;
16 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheDeleteRequest;
17 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheDeleteResponse;
18 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheDeleteResponse.DeleteStatusCode;
19 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheFlushRequest;
20 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheFlushResponse;
21 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheGetRequest;
22 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheGetResponse;
23 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheIncrementRequest;
24 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheIncrementRequest.Direction;
25 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheIncrementResponse;
26 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheIncrementResponse.IncrementStatusCode;
27 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheServiceError;
28 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheSetRequest;
29 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheSetResponse;
30 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheSetResponse.SetStatusCode;
31 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheStatsRequest;
32 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheStatsResponse;
33 import com.google.appengine.api.memcache.MemcacheServicePb.MergedNamespaceStats;
34 import com.google.appengine.api.utils.FutureWrapper;
35 import com.google.apphosting.api.ApiProxy;
36 import com.google.protobuf.ByteString;
37 import com.google.protobuf.Message;
39 import java.io.IOException;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.Iterator;
45 import java.util.LinkedHashMap;
46 import java.util.LinkedHashSet;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.Set;
50 import java.util.concurrent.Future;
51 import java.util.concurrent.TimeUnit;
/**
 * Java bindings for the AsyncMemcache service.
 */
57 class AsyncMemcacheServiceImpl extends BaseMemcacheServiceImpl implements AsyncMemcacheService {
59 static class StatsImpl implements Stats {
60 private final long hits, misses, bytesFetched, items, bytesStored;
61 private final int maxCachedTime;
63 StatsImpl(MergedNamespaceStats stats) {
64 if (stats != null) {
65 hits = stats.getHits();
66 misses = stats.getMisses();
67 bytesFetched = stats.getByteHits();
68 items = stats.getItems();
69 bytesStored = stats.getBytes();
70 maxCachedTime = stats.getOldestItemAge();
71 } else {
72 hits = misses = bytesFetched = items = bytesStored = 0;
73 maxCachedTime = 0;
77 @Override
78 public long getHitCount() {
79 return hits;
82 @Override
83 public long getMissCount() {
84 return misses;
87 @Override
88 public long getBytesReturnedForHits() {
89 return bytesFetched;
92 @Override
93 public long getItemCount() {
94 return items;
97 @Override
98 public long getTotalItemBytes() {
99 return bytesStored;
102 @Override
103 public int getMaxTimeWithoutAccess() {
104 return maxCachedTime;
107 @Override
108 public String toString() {
109 StringBuilder builder = new StringBuilder();
110 builder.append("Hits: ").append(hits).append('\n');
111 builder.append("Misses: ").append(misses).append('\n');
112 builder.append("Bytes Fetched: ").append(bytesFetched).append('\n');
113 builder.append("Bytes Stored: ").append(bytesStored).append('\n');
114 builder.append("Items: ").append(items).append('\n');
115 builder.append("Max Cached Time: ").append(maxCachedTime).append('\n');
116 return builder.toString();
120 static class IdentifiableValueImpl implements IdentifiableValue {
121 private final Object value;
122 private final long casId;
124 IdentifiableValueImpl(Object value, long casId) {
125 this.value = value;
126 this.casId = casId;
129 @Override
130 public Object getValue() {
131 return value;
134 long getCasId() {
135 return casId;
139 private static class DefaultValueProviders {
141 @SuppressWarnings("rawtypes")
142 private static final Provider NULL_PROVIDER = new Provider() {
143 @Override public Object get() {
144 return null;
148 private static final Provider<Boolean> FALSE_PROVIDER = new Provider<Boolean>() {
149 @Override public Boolean get() {
150 return Boolean.FALSE;
154 @SuppressWarnings("rawtypes")
155 private static final Provider SET_PROVIDER = new Provider<Set<?>>() {
156 @Override public Set<?> get() {
157 return new HashSet(0, 1);
161 @SuppressWarnings("rawtypes")
162 private static final Provider MAP_PROVIDER = new Provider<Map<?, ?>>() {
163 @Override public Map<?, ?> get() {
164 return new HashMap(0, 1);
168 private static final Provider<Stats> STATS_PROVIDER = new Provider<Stats>() {
169 final Stats emptyStats = new AsyncMemcacheServiceImpl.StatsImpl(null);
171 @Override public Stats get() {
172 return emptyStats;
176 static Provider<Boolean> falseValue() {
177 return FALSE_PROVIDER;
180 @SuppressWarnings("unchecked")
181 static <T> Provider<T> nullValue() {
182 return NULL_PROVIDER;
185 @SuppressWarnings("unchecked")
186 static <T> Provider<Set<T>> emptySet() {
187 return SET_PROVIDER;
190 @SuppressWarnings("unchecked")
191 static <K, V> Provider<Map<K, V>> emptyMap() {
192 return MAP_PROVIDER;
195 static Provider<Stats> emptyStats() {
196 return STATS_PROVIDER;
200 private static class VoidFutureWrapper<K> extends FutureWrapper<K, Void> {
202 private VoidFutureWrapper(Future<K> parent) {
203 super(parent);
206 private static <K> Future<Void> wrap(Future<K> parent) {
207 return new VoidFutureWrapper<K>(parent);
210 @Override
211 protected Void wrap(K value) {
212 return null;
215 @Override
216 protected Throwable convertException(Throwable cause) {
217 return cause;
221 private static final class KeyValuePair<K, V> {
222 private final K key;
223 private final V value;
225 private KeyValuePair(K key, V value) {
226 this.key = key;
227 this.value = value;
230 static <K, V> KeyValuePair<K, V> of(K key, V value) {
231 return new KeyValuePair<K, V>(key, value);
/**
 * Creates the service; {@code namespace} is forwarded unchanged to the
 * {@code BaseMemcacheServiceImpl} constructor.
 */
AsyncMemcacheServiceImpl(String namespace) {
  super(namespace);
}
239 static <T, V> Map<T, V> makeMap(Collection<T> keys, V value) {
240 Map<T, V> map = new LinkedHashMap<T, V>(keys.size(), 1);
241 for (T key : keys) {
242 map.put(key, value);
244 return map;
247 private static ByteString makePbKey(Object key) {
248 try {
249 return ByteString.copyFrom(MemcacheSerialization.makePbKey(key));
250 } catch (IOException ex) {
251 throw new IllegalArgumentException("Cannot use as a key: '" + key + "'", ex);
255 private static ValueAndFlags serializeValue(Object value) {
256 try {
257 return MemcacheSerialization.serialize(value);
258 } catch (IOException ex) {
259 throw new IllegalArgumentException("Cannot use as value: '" + value + "'", ex);
263 private Object deserializeItem(Object key, MemcacheGetResponse.Item item) {
264 try {
265 return MemcacheSerialization.deserialize(item.getValue().toByteArray(), item.getFlags());
266 } catch (ClassNotFoundException ex) {
267 getErrorHandler().handleDeserializationError(
268 new InvalidValueException("Can't find class for value of key '" + key + "'", ex));
269 return null;
270 } catch (IOException ex) {
271 throw new InvalidValueException("IO exception parsing value of '" + key + "'", ex);
275 private <M extends Message, T> RpcResponseHandler<M, T> createRpcResponseHandler(
276 M response, String errorText, Transformer<M, T> responseTransformer) {
277 return new RpcResponseHandler<M, T>(
278 response, errorText, responseTransformer, getErrorHandler());
281 @Override
282 public Future<Boolean> contains(Object key) {
283 return doGet(key, false, "Memcache contains: exception testing contains (" + key + ")",
284 new Transformer<MemcacheGetResponse, Boolean>() {
285 @Override public Boolean transform(MemcacheGetResponse response) {
286 return response.getItemCount() > 0;
288 }, DefaultValueProviders.falseValue());
291 private <T> Future<T> doGet(Object key, boolean forCas, String errorText,
292 final Transformer<MemcacheGetResponse, T> responseTransfomer, Provider<T> defaultValue) {
293 MemcacheGetRequest.Builder requestBuilder = MemcacheGetRequest.newBuilder();
294 requestBuilder.addKey(makePbKey(key));
295 requestBuilder.setNameSpace(getEffectiveNamespace());
296 if (forCas) {
297 requestBuilder.setForCas(true);
299 return makeAsyncCall("Get", requestBuilder.build(),
300 createRpcResponseHandler(MemcacheGetResponse.getDefaultInstance(), errorText,
301 responseTransfomer), defaultValue);
304 @Override
305 public Future<Object> get(final Object key) {
306 return doGet(key, false, "Memcache get: exception getting 1 key (" + key + ")",
307 new Transformer<MemcacheGetResponse, Object>() {
308 @Override public Object transform(MemcacheGetResponse response) {
309 return response.getItemCount() == 0 ? null : deserializeItem(key, response.getItem(0));
311 }, DefaultValueProviders.nullValue());
314 @Override
315 public Future<IdentifiableValue> getIdentifiable(final Object key) {
316 return doGet(key, true, "Memcache getIdentifiable: exception getting 1 key (" + key + ")",
317 new Transformer<MemcacheGetResponse, IdentifiableValue>() {
318 @Override public IdentifiableValue transform(MemcacheGetResponse response) {
319 if (response.getItemCount() == 0) {
320 return null;
322 MemcacheGetResponse.Item item = response.getItem(0);
323 return new IdentifiableValueImpl(deserializeItem(key, item), item.getCasId());
325 }, DefaultValueProviders.<IdentifiableValue>nullValue());
328 @Override
329 public <K> Future<Map<K, IdentifiableValue>> getIdentifiables(Collection<K> keys) {
330 return doGetAll(keys, true,
331 "Memcache getIdentifiables: exception getting multiple keys",
332 new Transformer<KeyValuePair<K, MemcacheGetResponse.Item>, IdentifiableValue>() {
333 @Override
334 public IdentifiableValue transform(KeyValuePair<K, MemcacheGetResponse.Item> pair) {
335 MemcacheGetResponse.Item item = pair.value;
336 Object value = deserializeItem(pair.key, item);
337 return new IdentifiableValueImpl(value, item.getCasId());
339 }, DefaultValueProviders.<K, IdentifiableValue>emptyMap());
342 @Override
343 public <K> Future<Map<K, Object>> getAll(Collection<K> keys) {
344 return doGetAll(keys, false,
345 "Memcache getAll: exception getting multiple keys",
346 new Transformer<KeyValuePair<K, MemcacheGetResponse.Item>, Object>() {
347 @Override
348 public Object transform(KeyValuePair<K, MemcacheGetResponse.Item> pair) {
349 return deserializeItem(pair.key, pair.value);
351 }, DefaultValueProviders.<K, Object>emptyMap());
354 private <K, V> Future<Map<K, V>> doGetAll(Collection<K> keys, boolean forCas,
355 String errorText,
356 final Transformer<KeyValuePair<K, MemcacheGetResponse.Item>, V> responseTransfomer,
357 Provider<Map<K, V>> defaultValue) {
358 MemcacheGetRequest.Builder requestBuilder = MemcacheGetRequest.newBuilder();
359 requestBuilder.setNameSpace(getEffectiveNamespace());
360 final Map<ByteString, K> byteStringToKey = new HashMap<ByteString, K>(keys.size(), 1);
361 for (K key : keys) {
362 ByteString pbKey = makePbKey(key);
363 byteStringToKey.put(pbKey, key);
364 requestBuilder.addKey(pbKey);
366 if (forCas) {
367 requestBuilder.setForCas(forCas);
369 return makeAsyncCall("Get", requestBuilder.build(), createRpcResponseHandler(
370 MemcacheGetResponse.getDefaultInstance(), errorText,
371 new Transformer<MemcacheGetResponse, Map<K, V>>() {
372 @Override
373 public Map<K, V> transform(MemcacheGetResponse response) {
374 Map<K, V> result = new HashMap<K, V>();
375 for (MemcacheGetResponse.Item item : response.getItemList()) {
376 K key = byteStringToKey.get(item.getKey());
377 V obj = responseTransfomer.transform(KeyValuePair.of(key, item));
378 result.put(key, obj);
380 return result;
382 }), defaultValue);
386 * Note: non-null oldValue implies Compare-and-Swap operation.
388 private Future<Boolean> doPut(final Object key, IdentifiableValue oldValue, Object value,
389 Expiration expires, MemcacheSetRequest.SetPolicy policy) {
390 MemcacheSetRequest.Builder requestBuilder = MemcacheSetRequest.newBuilder();
391 requestBuilder.setNameSpace(getEffectiveNamespace());
392 MemcacheSetRequest.Item.Builder itemBuilder = MemcacheSetRequest.Item.newBuilder();
393 ValueAndFlags vaf = serializeValue(value);
394 itemBuilder.setValue(ByteString.copyFrom(vaf.value));
395 itemBuilder.setFlags(vaf.flags.ordinal());
396 itemBuilder.setKey(makePbKey(key));
397 itemBuilder.setExpirationTime(expires == null ? 0 : expires.getSecondsValue());
398 itemBuilder.setSetPolicy(policy);
399 if (policy == MemcacheSetRequest.SetPolicy.CAS) {
400 if (oldValue == null) {
401 throw new IllegalArgumentException("oldValue must not be null.");
403 if (!(oldValue instanceof IdentifiableValueImpl)) {
404 throw new IllegalArgumentException(
405 "oldValue is an instance of an unapproved IdentifiableValue implementation. " +
406 "Perhaps you implemented your own version of IdentifiableValue? " +
407 "If so, don't do this.");
409 itemBuilder.setCasId(((IdentifiableValueImpl) oldValue).getCasId());
411 requestBuilder.addItem(itemBuilder);
412 return makeAsyncCall("Set", requestBuilder.build(), createRpcResponseHandler(
413 MemcacheSetResponse.getDefaultInstance(),
414 String.format("Memcache put: exception setting 1 key (%s) to '%s'", key, value),
415 new Transformer<MemcacheSetResponse, Boolean>() {
416 @Override public Boolean transform(MemcacheSetResponse response) {
417 if (response.getSetStatusCount() != 1) {
418 throw new MemcacheServiceException("Memcache put: Set one item, got "
419 + response.getSetStatusCount() + " response statuses");
421 SetStatusCode status = response.getSetStatus(0);
422 if (status == SetStatusCode.ERROR) {
423 throw new MemcacheServiceException(
424 "Memcache put: Error setting single item (" + key + ")");
426 return status == SetStatusCode.STORED;
428 }), DefaultValueProviders.falseValue());
431 private static MemcacheSetRequest.SetPolicy convertSetPolicyToPb(SetPolicy policy) {
432 switch (policy) {
433 case SET_ALWAYS:
434 return MemcacheSetRequest.SetPolicy.SET;
435 case ADD_ONLY_IF_NOT_PRESENT:
436 return MemcacheSetRequest.SetPolicy.ADD;
437 case REPLACE_ONLY_IF_PRESENT:
438 return MemcacheSetRequest.SetPolicy.REPLACE;
440 throw new IllegalArgumentException("Unknown policy: " + policy);
443 @Override
444 public Future<Boolean> put(Object key, Object value, Expiration expires, SetPolicy policy) {
445 return doPut(key, null, value, expires, convertSetPolicyToPb(policy));
448 @Override
449 public Future<Void> put(Object key, Object value, Expiration expires) {
450 return VoidFutureWrapper.wrap(
451 doPut(key, null, value, expires, MemcacheSetRequest.SetPolicy.SET));
454 @Override
455 public Future<Void> put(Object key, Object value) {
456 return VoidFutureWrapper.wrap(
457 doPut(key, null, value, null, MemcacheSetRequest.SetPolicy.SET));
460 @Override
461 public Future<Boolean> putIfUntouched(Object key, IdentifiableValue oldValue,
462 Object newValue, Expiration expires) {
463 return doPut(key, oldValue, newValue, expires, MemcacheSetRequest.SetPolicy.CAS);
466 @Override
467 public Future<Boolean> putIfUntouched(Object key, IdentifiableValue oldValue, Object newValue) {
468 return doPut(key, oldValue, newValue, null, MemcacheSetRequest.SetPolicy.CAS);
471 @Override
472 public <T> Future<Set<T>> putIfUntouched(Map<T, CasValues> values) {
473 return doPutAll(values, null, MemcacheSetRequest.SetPolicy.CAS, "putIfUntouched");
476 @Override
477 public <T> Future<Set<T>> putIfUntouched(Map<T, CasValues> values, Expiration expiration) {
478 return doPutAll(values, expiration, MemcacheSetRequest.SetPolicy.CAS, "putIfUntouched");
481 private <T> Future<Set<T>> doPutAll(Map<T, ?> values, Expiration expires,
482 MemcacheSetRequest.SetPolicy policy, String operation) {
483 MemcacheSetRequest.Builder requestBuilder = MemcacheSetRequest.newBuilder();
484 requestBuilder.setNameSpace(getEffectiveNamespace());
485 final List<T> requestedKeys = new ArrayList<T>(values.size());
486 for (Map.Entry<T, ?> entry : values.entrySet()) {
487 MemcacheSetRequest.Item.Builder itemBuilder = MemcacheSetRequest.Item.newBuilder();
488 requestedKeys.add(entry.getKey());
489 itemBuilder.setKey(makePbKey(entry.getKey()));
490 ValueAndFlags vaf;
491 if (policy == MemcacheSetRequest.SetPolicy.CAS) {
492 CasValues value = (CasValues) entry.getValue();
493 if (value == null) {
494 throw new IllegalArgumentException(entry.getKey() + " has a null for CasValues");
496 vaf = serializeValue(value.getNewValue());
497 if (!(value.getOldValue() instanceof IdentifiableValueImpl)) {
498 throw new IllegalArgumentException(
499 entry.getKey() + " CasValues has an oldValue instance of an unapproved " +
500 "IdentifiableValue implementation. Perhaps you implemented your own " +
501 "version of IdentifiableValue? If so, don't do this.");
503 itemBuilder.setCasId(((IdentifiableValueImpl) value.getOldValue()).getCasId());
504 if (value.getExipration() != null) {
505 itemBuilder.setExpirationTime(value.getExipration().getSecondsValue());
506 } else {
507 itemBuilder.setExpirationTime(expires == null ? 0 : expires.getSecondsValue());
509 } else {
510 vaf = serializeValue(entry.getValue());
511 itemBuilder.setExpirationTime(expires == null ? 0 : expires.getSecondsValue());
513 itemBuilder.setValue(ByteString.copyFrom(vaf.value));
514 itemBuilder.setFlags(vaf.flags.ordinal());
515 itemBuilder.setSetPolicy(policy);
516 requestBuilder.addItem(itemBuilder);
518 return makeAsyncCall("Set", requestBuilder.build(), createRpcResponseHandler(
519 MemcacheSetResponse.getDefaultInstance(),
520 "Memcache " + operation + ": Unknown exception setting " + values.size() + " keys",
521 new Transformer<MemcacheSetResponse, Set<T>>() {
522 @Override public Set<T> transform(MemcacheSetResponse response) {
523 if (response.getSetStatusCount() != requestedKeys.size()) {
524 throw new MemcacheServiceException(String.format(
525 "Memcache put: Set %d items, got %d response statuses",
526 requestedKeys.size(),
527 response.getSetStatusCount()));
529 HashSet<T> result = new HashSet<T>();
530 HashSet<T> errors = new HashSet<T>();
531 Iterator<SetStatusCode> statusIter = response.getSetStatusList().iterator();
532 for (T requestedKey : requestedKeys) {
533 SetStatusCode status = statusIter.next();
534 if (status == MemcacheSetResponse.SetStatusCode.ERROR) {
535 errors.add(requestedKey);
536 } else if (status == MemcacheSetResponse.SetStatusCode.STORED) {
537 result.add(requestedKey);
540 if (!errors.isEmpty()) {
541 StringBuilder builder = new StringBuilder("Memcache put: Set failed to set ");
542 builder.append(errors.size()).append(" keys: ");
543 for (Object err : errors) {
544 builder.append(err).append(", ");
546 builder.setLength(builder.length() - 2);
547 throw new MemcacheServiceException(builder.toString());
549 return result;
551 }), DefaultValueProviders.<T>emptySet());
554 @Override
555 public <T> Future<Set<T>> putAll(Map<T, ?> values, Expiration expires, SetPolicy policy) {
556 return doPutAll(values, expires, convertSetPolicyToPb(policy), "putAll");
559 @Override
560 public Future<Void> putAll(Map<?, ?> values, Expiration expires) {
561 return VoidFutureWrapper.wrap(
562 doPutAll(values, expires, MemcacheSetRequest.SetPolicy.SET, "putAll"));
565 @Override
566 public Future<Void> putAll(Map<?, ?> values) {
567 return VoidFutureWrapper.wrap(
568 doPutAll(values, null, MemcacheSetRequest.SetPolicy.SET, "putAll"));
571 @Override
572 public Future<Boolean> delete(Object key) {
573 return delete(key, 0);
576 @Override
577 public Future<Boolean> delete(Object key, long millisNoReAdd) {
578 MemcacheDeleteRequest request = MemcacheDeleteRequest.newBuilder()
579 .setNameSpace(getEffectiveNamespace())
580 .addItem(MemcacheDeleteRequest.Item.newBuilder()
581 .setKey(makePbKey(key))
582 .setDeleteTime((int) TimeUnit.SECONDS.convert(millisNoReAdd, TimeUnit.MILLISECONDS)))
583 .build();
584 return makeAsyncCall("Delete", request, createRpcResponseHandler(
585 MemcacheDeleteResponse.getDefaultInstance(),
586 "Memcache delete: Unknown exception deleting key: " + key,
587 new Transformer<MemcacheDeleteResponse, Boolean>() {
588 @Override public Boolean transform(MemcacheDeleteResponse response) {
589 return response.getDeleteStatus(0) == DeleteStatusCode.DELETED;
591 }), DefaultValueProviders.falseValue());
594 @Override
595 public <T> Future<Set<T>> deleteAll(Collection<T> keys) {
596 return deleteAll(keys, 0);
599 @Override
600 public <T> Future<Set<T>> deleteAll(Collection<T> keys, long millisNoReAdd) {
601 MemcacheDeleteRequest.Builder requestBuilder =
602 MemcacheDeleteRequest.newBuilder().setNameSpace(getEffectiveNamespace());
603 final List<T> requestedKeys = new ArrayList<T>(keys.size());
604 for (T key : keys) {
605 requestedKeys.add(key);
606 requestBuilder.addItem(MemcacheDeleteRequest.Item.newBuilder()
607 .setDeleteTime((int) (millisNoReAdd / 1000))
608 .setKey(makePbKey(key)));
610 return makeAsyncCall("Delete", requestBuilder.build(), createRpcResponseHandler(
611 MemcacheDeleteResponse.getDefaultInstance(),
612 "Memcache deleteAll: Unknown exception deleting multiple keys",
613 new Transformer<MemcacheDeleteResponse, Set<T>>() {
614 @Override public Set<T> transform(MemcacheDeleteResponse response) {
615 Set<T> retval = new LinkedHashSet<T>();
616 Iterator<T> requestedKeysIter = requestedKeys.iterator();
617 for (DeleteStatusCode deleteStatus : response.getDeleteStatusList()) {
618 T requestedKey = requestedKeysIter.next();
619 if (deleteStatus == DeleteStatusCode.DELETED) {
620 retval.add(requestedKey);
623 return retval;
625 }), DefaultValueProviders.<T>emptySet());
628 private static MemcacheIncrementRequest.Builder newIncrementRequestBuilder(
629 Object key, long delta, Long initialValue) {
630 MemcacheIncrementRequest.Builder requestBuilder = MemcacheIncrementRequest.newBuilder();
631 requestBuilder.setKey(makePbKey(key));
632 if (delta > 0) {
633 requestBuilder.setDirection(Direction.INCREMENT);
634 requestBuilder.setDelta(delta);
635 } else {
636 requestBuilder.setDirection(Direction.DECREMENT);
637 requestBuilder.setDelta(-delta);
639 if (initialValue != null) {
640 requestBuilder.setInitialValue(initialValue);
641 requestBuilder.setInitialFlags(MemcacheSerialization.Flag.LONG.ordinal());
643 return requestBuilder;
646 @Override
647 public Future<Long> increment(Object key, long delta) {
648 return increment(key, delta, null);
651 @Override
652 public Future<Long> increment(final Object key, long delta, Long initialValue) {
653 MemcacheIncrementRequest request = newIncrementRequestBuilder(key, delta, initialValue)
654 .setNameSpace(getEffectiveNamespace())
655 .build();
656 return makeAsyncCall("Increment", request,
657 new RpcResponseHandler<MemcacheIncrementResponse, Long>(
658 MemcacheIncrementResponse.getDefaultInstance(),
659 "Memcache increment: exception when incrementing key '" + key + "'",
660 new Transformer<MemcacheIncrementResponse, Long>() {
661 @Override public Long transform(MemcacheIncrementResponse response) {
662 return response.hasNewValue() ? response.getNewValue() : null;
664 }, getErrorHandler()) {
665 @Override void handleApiProxyException(Throwable cause) throws Exception {
666 if (cause instanceof ApiProxy.ApplicationException) {
667 ApiProxy.ApplicationException applicationException =
668 (ApiProxy.ApplicationException) cause;
669 getLogger().info(applicationException.getErrorDetail());
670 if (applicationException.getApplicationError() ==
671 MemcacheServiceError.ErrorCode.INVALID_VALUE.ordinal()) {
672 throw new InvalidValueException("Non-incrementable value for key '" + key + "'");
675 super.handleApiProxyException(cause);
677 }, DefaultValueProviders.<Long>nullValue());
680 @Override
681 public <T> Future<Map<T, Long>> incrementAll(Collection<T> keys, long delta) {
682 return incrementAll(keys, delta, null);
685 @Override
686 public <T> Future<Map<T, Long>> incrementAll(Collection<T> keys, long delta, Long initialValue) {
687 return incrementAll(makeMap(keys, delta), initialValue);
690 @Override
691 public <T> Future<Map<T, Long>> incrementAll(Map<T, Long> offsets) {
692 return incrementAll(offsets, null);
695 @Override
696 public <T> Future<Map<T, Long>> incrementAll(Map<T, Long> offsets, Long initialValue) {
697 MemcacheBatchIncrementRequest.Builder requestBuilder =
698 MemcacheBatchIncrementRequest.newBuilder().setNameSpace(getEffectiveNamespace());
699 final List<T> requestedKeys = new ArrayList<T>(offsets.size());
700 for (Map.Entry<T, Long> entry : offsets.entrySet()) {
701 requestedKeys.add(entry.getKey());
702 requestBuilder.addItem(
703 newIncrementRequestBuilder(entry.getKey(), entry.getValue(), initialValue));
705 return makeAsyncCall("BatchIncrement", requestBuilder.build(), createRpcResponseHandler(
706 MemcacheBatchIncrementResponse.getDefaultInstance(),
707 "Memcache incrmentAll: exception incrementing multiple keys",
708 new Transformer<MemcacheBatchIncrementResponse, Map<T, Long>>() {
709 @Override public Map<T, Long> transform(MemcacheBatchIncrementResponse response) {
710 Map<T, Long> result = new LinkedHashMap<T, Long>(requestedKeys.size(), 1);
711 Iterator<MemcacheIncrementResponse> items = response.getItemList().iterator();
712 for (T requestedKey : requestedKeys) {
713 MemcacheIncrementResponse item = items.next();
714 if (item.getIncrementStatus().equals(IncrementStatusCode.OK) && item.hasNewValue()) {
715 result.put(requestedKey, item.getNewValue());
716 } else {
717 result.put(requestedKey, null);
720 return result;
723 new Provider<Map<T, Long>>() {
724 @Override public Map<T, Long> get() {
725 return makeMap(requestedKeys, null);
730 @Override
731 public Future<Void> clearAll() {
732 return makeAsyncCall("FlushAll", MemcacheFlushRequest.getDefaultInstance(),
733 createRpcResponseHandler(MemcacheFlushResponse.getDefaultInstance(),
734 "Memcache clearAll: exception",
735 new Transformer<MemcacheFlushResponse, Void>() {
736 @Override public Void transform(MemcacheFlushResponse response) {
737 return null;
739 }), DefaultValueProviders.<Void>nullValue());
742 @Override
743 public Future<Stats> getStatistics() {
744 return makeAsyncCall("Stats", MemcacheStatsRequest.getDefaultInstance(),
745 createRpcResponseHandler(MemcacheStatsResponse.getDefaultInstance(),
746 "Memcache getStatistics: exception",
747 new Transformer<MemcacheStatsResponse, Stats>() {
748 @Override public Stats transform(MemcacheStatsResponse response) {
749 return new StatsImpl(response.getStats());
751 }), DefaultValueProviders.emptyStats());