Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / memcache / AsyncMemcacheServiceImpl.java
blob5e0fd9ddcbac69226e0ecf24d6fcac2722293255
1 // Copyright 2011 Google Inc. All rights reserved
3 package com.google.appengine.api.memcache;
5 import static com.google.appengine.api.memcache.MemcacheServiceApiHelper.makeAsyncCall;
7 import com.google.appengine.api.NamespaceManager;
8 import com.google.appengine.api.memcache.MemcacheSerialization.ValueAndFlags;
9 import com.google.appengine.api.memcache.MemcacheService.CasValues;
10 import com.google.appengine.api.memcache.MemcacheService.IdentifiableValue;
11 import com.google.appengine.api.memcache.MemcacheService.SetPolicy;
12 import com.google.appengine.api.memcache.MemcacheServiceApiHelper.Provider;
13 import com.google.appengine.api.memcache.MemcacheServiceApiHelper.RpcResponseHandler;
14 import com.google.appengine.api.memcache.MemcacheServiceApiHelper.Transformer;
15 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheBatchIncrementRequest;
16 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheBatchIncrementResponse;
17 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheDeleteRequest;
18 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheDeleteResponse;
19 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheDeleteResponse.DeleteStatusCode;
20 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheFlushRequest;
21 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheFlushResponse;
22 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheGetRequest;
23 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheGetResponse;
24 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheIncrementRequest;
25 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheIncrementRequest.Direction;
26 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheIncrementResponse;
27 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheIncrementResponse.IncrementStatusCode;
28 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheServiceError;
29 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheSetRequest;
30 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheSetResponse;
31 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheSetResponse.SetStatusCode;
32 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheStatsRequest;
33 import com.google.appengine.api.memcache.MemcacheServicePb.MemcacheStatsResponse;
34 import com.google.appengine.api.memcache.MemcacheServicePb.MergedNamespaceStats;
35 import com.google.appengine.api.utils.FutureWrapper;
36 import com.google.apphosting.api.ApiProxy;
37 import com.google.common.base.Joiner;
38 import com.google.common.base.StringUtil;
39 import com.google.common.primitives.Bytes;
40 import com.google.protobuf.ByteString;
41 import com.google.protobuf.Message;
43 import java.io.IOException;
44 import java.util.ArrayList;
45 import java.util.Arrays;
46 import java.util.Collection;
47 import java.util.HashMap;
48 import java.util.HashSet;
49 import java.util.Iterator;
50 import java.util.LinkedHashMap;
51 import java.util.LinkedHashSet;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Objects;
55 import java.util.Set;
56 import java.util.concurrent.Future;
57 import java.util.concurrent.TimeUnit;
/**
 * Java bindings for the asynchronous Memcache service.
 */
63 class AsyncMemcacheServiceImpl extends BaseMemcacheServiceImpl implements AsyncMemcacheService {
65 static class StatsImpl implements Stats {
66 private final long hits, misses, bytesFetched, items, bytesStored;
67 private final int maxCachedTime;
69 StatsImpl(MergedNamespaceStats stats) {
70 if (stats != null) {
71 hits = stats.getHits();
72 misses = stats.getMisses();
73 bytesFetched = stats.getByteHits();
74 items = stats.getItems();
75 bytesStored = stats.getBytes();
76 maxCachedTime = stats.getOldestItemAge();
77 } else {
78 hits = misses = bytesFetched = items = bytesStored = 0;
79 maxCachedTime = 0;
83 @Override
84 public long getHitCount() {
85 return hits;
88 @Override
89 public long getMissCount() {
90 return misses;
93 @Override
94 public long getBytesReturnedForHits() {
95 return bytesFetched;
98 @Override
99 public long getItemCount() {
100 return items;
103 @Override
104 public long getTotalItemBytes() {
105 return bytesStored;
108 @Override
109 public int getMaxTimeWithoutAccess() {
110 return maxCachedTime;
113 @Override
114 public String toString() {
115 StringBuilder builder = new StringBuilder();
116 builder.append("Hits: ").append(hits).append('\n');
117 builder.append("Misses: ").append(misses).append('\n');
118 builder.append("Bytes Fetched: ").append(bytesFetched).append('\n');
119 builder.append("Bytes Stored: ").append(bytesStored).append('\n');
120 builder.append("Items: ").append(items).append('\n');
121 builder.append("Max Cached Time: ").append(maxCachedTime).append('\n');
122 return builder.toString();
126 static final class IdentifiableValueImpl implements IdentifiableValue {
127 private final Object value;
128 private final long casId;
130 IdentifiableValueImpl(Object value, long casId) {
131 this.value = value;
132 this.casId = casId;
135 @Override
136 public Object getValue() {
137 return value;
140 long getCasId() {
141 return casId;
144 @Override
145 public boolean equals(Object otherObj) {
146 if (this == otherObj) {
147 return true;
149 if ((otherObj == null) || (getClass() != otherObj.getClass())) {
150 return false;
152 IdentifiableValueImpl otherIdentifiableValue = (IdentifiableValueImpl) otherObj;
153 return Objects.equals(value, otherIdentifiableValue.value) &&
154 (casId == otherIdentifiableValue.casId);
157 @Override
158 public int hashCode() {
159 return Objects.hash(value, casId);
163 private static class DefaultValueProviders {
165 @SuppressWarnings("rawtypes")
166 private static final Provider NULL_PROVIDER = new Provider() {
167 @Override public Object get() {
168 return null;
172 private static final Provider<Boolean> FALSE_PROVIDER = new Provider<Boolean>() {
173 @Override public Boolean get() {
174 return Boolean.FALSE;
178 @SuppressWarnings("rawtypes")
179 private static final Provider SET_PROVIDER = new Provider<Set<?>>() {
180 @Override public Set<?> get() {
181 return new HashSet(0, 1);
185 @SuppressWarnings("rawtypes")
186 private static final Provider MAP_PROVIDER = new Provider<Map<?, ?>>() {
187 @Override public Map<?, ?> get() {
188 return new HashMap(0, 1);
192 private static final Provider<Stats> STATS_PROVIDER = new Provider<Stats>() {
193 final Stats emptyStats = new AsyncMemcacheServiceImpl.StatsImpl(null);
195 @Override public Stats get() {
196 return emptyStats;
200 static Provider<Boolean> falseValue() {
201 return FALSE_PROVIDER;
204 @SuppressWarnings("unchecked")
205 static <T> Provider<T> nullValue() {
206 return NULL_PROVIDER;
209 @SuppressWarnings("unchecked")
210 static <T> Provider<Set<T>> emptySet() {
211 return SET_PROVIDER;
214 @SuppressWarnings("unchecked")
215 static <K, V> Provider<Map<K, V>> emptyMap() {
216 return MAP_PROVIDER;
219 static Provider<Stats> emptyStats() {
220 return STATS_PROVIDER;
224 private static class VoidFutureWrapper<K> extends FutureWrapper<K, Void> {
226 private VoidFutureWrapper(Future<K> parent) {
227 super(parent);
230 private static <K> Future<Void> wrap(Future<K> parent) {
231 return new VoidFutureWrapper<K>(parent);
234 @Override
235 protected Void wrap(K value) {
236 return null;
239 @Override
240 protected Throwable convertException(Throwable cause) {
241 return cause;
245 private static final class KeyValuePair<K, V> {
246 private final K key;
247 private final V value;
249 private KeyValuePair(K key, V value) {
250 this.key = key;
251 this.value = value;
254 static <K, V> KeyValuePair<K, V> of(K key, V value) {
255 return new KeyValuePair<K, V>(key, value);
/**
 * Creates a service instance, forwarding {@code namespace} to the base class constructor.
 *
 * @param namespace passed unchanged to {@code BaseMemcacheServiceImpl}
 */
AsyncMemcacheServiceImpl(String namespace) {
  super(namespace);
}
263 static <T, V> Map<T, V> makeMap(Collection<T> keys, V value) {
264 Map<T, V> map = new LinkedHashMap<T, V>(keys.size(), 1);
265 for (T key : keys) {
266 map.put(key, value);
268 return map;
271 private static ByteString makePbKey(Object key) {
272 try {
273 return ByteString.copyFrom(MemcacheSerialization.makePbKey(key));
274 } catch (IOException ex) {
275 throw new IllegalArgumentException("Cannot use as a key: '" + key + "'", ex);
279 private static ValueAndFlags serializeValue(Object value) {
280 try {
281 return MemcacheSerialization.serialize(value);
282 } catch (IOException ex) {
283 throw new IllegalArgumentException("Cannot use as value: '" + value + "'", ex);
287 private Object deserializeItem(Object key, MemcacheGetResponse.Item item) {
288 try {
289 return MemcacheSerialization.deserialize(item.getValue().toByteArray(), item.getFlags());
290 } catch (ClassNotFoundException ex) {
291 getErrorHandler().handleDeserializationError(
292 new InvalidValueException("Can't find class for value of key '" + key + "'", ex));
293 } catch (IOException ex) {
294 getErrorHandler().handleDeserializationError(
295 new InvalidValueException("Failed to parse the value for '" + key + "'", ex));
297 return null;
300 private <M extends Message, T> RpcResponseHandler<M, T> createRpcResponseHandler(
301 M response, String errorText, Transformer<M, T> responseTransformer) {
302 return new RpcResponseHandler<M, T>(
303 response, errorText, responseTransformer, getErrorHandler());
306 private <T> RpcResponseHandlerForPut<T> createRpcResponseHandlerForPut(
307 Iterable<MemcacheSetRequest.Item.Builder> itemBuilders, String namespace,
308 MemcacheSetResponse response, String errorText,
309 Transformer<MemcacheSetResponse, T> responseTransformer) {
310 return new RpcResponseHandlerForPut<T>(
311 itemBuilders, namespace, response, errorText, responseTransformer);
314 private class RpcResponseHandlerForPut<T> extends RpcResponseHandler<MemcacheSetResponse, T> {
317 * We remember what was sent to the backend, so that if we get an error response we can test to
318 * see if the error was caused by the API being given invalid values.
320 private final Iterable<MemcacheSetRequest.Item.Builder> itemsSentToBackend;
321 private final String namespace;
323 RpcResponseHandlerForPut(Iterable<MemcacheSetRequest.Item.Builder> itemsSentToBackend,
324 String namespace,
325 MemcacheSetResponse response, String errorText,
326 Transformer<MemcacheSetResponse, T> responseTransfomer) {
327 super(response, errorText, responseTransfomer, getErrorHandler());
328 this.itemsSentToBackend = itemsSentToBackend;
329 this.namespace = namespace;
333 * When we get an error from the backend we check to see if it could possibly have been caused
334 * by an invalid key or value being passed into the API from the app. If so we test the original
335 * key and value passed in and if we find them to be invalid we throw an exception that is more
336 * informative to the app writer than the default exception.
338 @Override
339 void handleApiProxyException(Throwable cause) throws Exception {
340 if (cause instanceof ApiProxy.ApplicationException) {
341 ApiProxy.ApplicationException applicationException = (ApiProxy.ApplicationException) cause;
342 if (applicationException.getApplicationError()
343 == MemcacheServiceError.ErrorCode.UNSPECIFIED_ERROR_VALUE) {
344 handleApiProxyException(applicationException.getErrorDetail());
346 } else if (cause instanceof MemcacheServiceException){
347 handleApiProxyException(cause.getMessage());
349 super.handleApiProxyException(cause);
352 private void handleApiProxyException(String msg) throws MemcacheServiceException {
353 for (MemcacheSetRequest.Item.Builder itemBuilder : itemsSentToBackend) {
354 ByteString pbKey = itemBuilder.getKey();
355 ByteString value = itemBuilder.getValue();
356 if (value.size() + pbKey.size() > ITEM_SIZE_LIMIT) {
357 maybeThrow("Key+value is bigger than maximum allowed. " + msg);
359 if (Bytes.contains(pbKey.toByteArray(), (byte) 0)) {
360 maybeThrow("Key contains embedded null byte. " + msg);
362 if (namespace != null) {
363 try {
364 NamespaceManager.validateNamespace(namespace);
365 } catch (IllegalArgumentException ex) {
366 maybeThrow(ex.toString());
372 /** Will throw exception or just log it, depending on what error handler is set.*/
373 private void maybeThrow(String msg) {
374 getErrorHandler().handleServiceError(new MemcacheServiceException(msg));
379 * Matches limit documented in
380 * https://developers.google.com/appengine/docs/python/memcache/#Python_Limits
382 * TODO(user) This is too conservative; need to revisit. The docs are conflating item size
383 * limit with total cache size accounting.
385 static final int ITEM_SIZE_LIMIT = 1024 * 1024 - 96;
387 @Override
388 public Future<Boolean> contains(Object key) {
389 return doGet(key, false, "Memcache contains: exception testing contains (" + key + ")",
390 new Transformer<MemcacheGetResponse, Boolean>() {
391 @Override public Boolean transform(MemcacheGetResponse response) {
392 return response.getItemCount() > 0;
394 }, DefaultValueProviders.falseValue());
397 private <T> Future<T> doGet(Object key, boolean forCas, String errorText,
398 final Transformer<MemcacheGetResponse, T> responseTransfomer, Provider<T> defaultValue) {
399 MemcacheGetRequest.Builder requestBuilder = MemcacheGetRequest.newBuilder();
400 requestBuilder.addKey(makePbKey(key));
401 requestBuilder.setNameSpace(getEffectiveNamespace());
402 if (forCas) {
403 requestBuilder.setForCas(true);
405 return makeAsyncCall("Get", requestBuilder.build(),
406 createRpcResponseHandler(MemcacheGetResponse.getDefaultInstance(), errorText,
407 responseTransfomer), defaultValue);
410 @Override
411 public Future<Object> get(final Object key) {
412 return doGet(key, false, "Memcache get: exception getting 1 key (" + key + ")",
413 new Transformer<MemcacheGetResponse, Object>() {
414 @Override public Object transform(MemcacheGetResponse response) {
415 return response.getItemCount() == 0 ? null : deserializeItem(key, response.getItem(0));
417 }, DefaultValueProviders.nullValue());
420 @Override
421 public Future<IdentifiableValue> getIdentifiable(final Object key) {
422 return doGet(key, true, "Memcache getIdentifiable: exception getting 1 key (" + key + ")",
423 new Transformer<MemcacheGetResponse, IdentifiableValue>() {
424 @Override public IdentifiableValue transform(MemcacheGetResponse response) {
425 if (response.getItemCount() == 0) {
426 return null;
428 MemcacheGetResponse.Item item = response.getItem(0);
429 return new IdentifiableValueImpl(deserializeItem(key, item), item.getCasId());
431 }, DefaultValueProviders.<IdentifiableValue>nullValue());
434 @Override
435 public <K> Future<Map<K, IdentifiableValue>> getIdentifiables(Collection<K> keys) {
436 return doGetAll(keys, true,
437 "Memcache getIdentifiables: exception getting multiple keys",
438 new Transformer<KeyValuePair<K, MemcacheGetResponse.Item>, IdentifiableValue>() {
439 @Override
440 public IdentifiableValue transform(KeyValuePair<K, MemcacheGetResponse.Item> pair) {
441 MemcacheGetResponse.Item item = pair.value;
442 Object value = deserializeItem(pair.key, item);
443 return new IdentifiableValueImpl(value, item.getCasId());
445 }, DefaultValueProviders.<K, IdentifiableValue>emptyMap());
448 @Override
449 public <K> Future<Map<K, Object>> getAll(Collection<K> keys) {
450 return doGetAll(keys, false,
451 "Memcache getAll: exception getting multiple keys",
452 new Transformer<KeyValuePair<K, MemcacheGetResponse.Item>, Object>() {
453 @Override
454 public Object transform(KeyValuePair<K, MemcacheGetResponse.Item> pair) {
455 return deserializeItem(pair.key, pair.value);
457 }, DefaultValueProviders.<K, Object>emptyMap());
460 private <K, V> Future<Map<K, V>> doGetAll(Collection<K> keys, boolean forCas,
461 String errorText,
462 final Transformer<KeyValuePair<K, MemcacheGetResponse.Item>, V> responseTransfomer,
463 Provider<Map<K, V>> defaultValue) {
464 MemcacheGetRequest.Builder requestBuilder = MemcacheGetRequest.newBuilder();
465 requestBuilder.setNameSpace(getEffectiveNamespace());
466 final Map<ByteString, K> byteStringToKey = new HashMap<ByteString, K>(keys.size(), 1);
467 for (K key : keys) {
468 ByteString pbKey = makePbKey(key);
469 byteStringToKey.put(pbKey, key);
470 requestBuilder.addKey(pbKey);
472 if (forCas) {
473 requestBuilder.setForCas(forCas);
475 return makeAsyncCall("Get", requestBuilder.build(), createRpcResponseHandler(
476 MemcacheGetResponse.getDefaultInstance(), errorText,
477 new Transformer<MemcacheGetResponse, Map<K, V>>() {
478 @Override
479 public Map<K, V> transform(MemcacheGetResponse response) {
480 Map<K, V> result = new HashMap<K, V>();
481 for (MemcacheGetResponse.Item item : response.getItemList()) {
482 K key = byteStringToKey.get(item.getKey());
483 V obj = responseTransfomer.transform(KeyValuePair.of(key, item));
484 result.put(key, obj);
486 return result;
488 }), defaultValue);
491 /** Use this to make sure we don't write arbitrarily big log messages. */
492 private static final int MAX_LOGGED_VALUE_SIZE = 100;
495 * Represents the overhead to an item as it is stored in memcacheg
496 * if it has a value bigger than 65535 bytes and all the optional fields are set.
497 * This number was determined by fiddling with cacheserving/memcacheg/server:item_test
498 * and analyzing CalculateItemSize from cacheserving/memcacheg/server/item.cc.
500 * <p>The overhead can be between 61 and 73 bytes depending on whether optional
501 * fields (flag, expiration and CAS) are set or not, adding 4 bytes for each of
502 * them.
504 private static final int MEMCACHEG_OVERHEAD = 73;
505 private static final int ONE_MEGABYTE = 1024 * 1024;
506 public static final int MAX_ITEM_SIZE = ONE_MEGABYTE - MEMCACHEG_OVERHEAD;
509 * Note: non-null oldValue implies Compare-and-Swap operation.
511 private Future<Boolean> doPut(final Object key, IdentifiableValue oldValue, Object value,
512 Expiration expires, MemcacheSetRequest.SetPolicy policy) {
513 MemcacheSetRequest.Builder requestBuilder = MemcacheSetRequest.newBuilder();
514 requestBuilder.setNameSpace(getEffectiveNamespace());
515 MemcacheSetRequest.Item.Builder itemBuilder = MemcacheSetRequest.Item.newBuilder();
516 ValueAndFlags vaf = serializeValue(value);
517 itemBuilder.setValue(ByteString.copyFrom(vaf.value));
518 itemBuilder.setFlags(vaf.flags.ordinal());
519 itemBuilder.setKey(makePbKey(key));
520 itemBuilder.setExpirationTime(expires == null ? 0 : expires.getSecondsValue());
521 itemBuilder.setSetPolicy(policy);
522 if (policy == MemcacheSetRequest.SetPolicy.CAS) {
523 if (oldValue == null) {
524 throw new IllegalArgumentException("oldValue must not be null.");
526 if (!(oldValue instanceof IdentifiableValueImpl)) {
527 throw new IllegalArgumentException(
528 "oldValue is an instance of an unapproved IdentifiableValue implementation. " +
529 "Perhaps you implemented your own version of IdentifiableValue? " +
530 "If so, don't do this.");
532 itemBuilder.setCasId(((IdentifiableValueImpl) oldValue).getCasId());
534 final int itemSize = itemBuilder.getKey().size() + itemBuilder.getValue().size();
535 requestBuilder.addItem(itemBuilder);
537 String valueAsString =
538 StringUtil.truncateAtMaxLength(String.valueOf(value), MAX_LOGGED_VALUE_SIZE, true);
539 return makeAsyncCall("Set", requestBuilder.build(), createRpcResponseHandlerForPut(
540 Arrays.asList(itemBuilder),
541 requestBuilder.getNameSpace(),
542 MemcacheSetResponse.getDefaultInstance(),
543 String.format("Memcache put: exception setting 1 key (%s) to '%s'", key, valueAsString),
544 new Transformer<MemcacheSetResponse, Boolean>() {
545 @Override public Boolean transform(MemcacheSetResponse response) {
546 if (response.getSetStatusCount() != 1) {
547 throw new MemcacheServiceException("Memcache put: Set one item, got "
548 + response.getSetStatusCount() + " response statuses");
550 SetStatusCode status = response.getSetStatus(0);
551 if (status == SetStatusCode.ERROR) {
552 if (itemSize > MAX_ITEM_SIZE) {
553 throw new MemcacheServiceException(
554 String.format("Memcache put: Item may not be more than %d bytes in length; "
555 + "received %d bytes.", MAX_ITEM_SIZE, itemSize));
557 throw new MemcacheServiceException(
558 "Memcache put: Error setting single item (" + key + ")");
560 return status == SetStatusCode.STORED;
562 }), DefaultValueProviders.falseValue());
565 private static MemcacheSetRequest.SetPolicy convertSetPolicyToPb(SetPolicy policy) {
566 switch (policy) {
567 case SET_ALWAYS:
568 return MemcacheSetRequest.SetPolicy.SET;
569 case ADD_ONLY_IF_NOT_PRESENT:
570 return MemcacheSetRequest.SetPolicy.ADD;
571 case REPLACE_ONLY_IF_PRESENT:
572 return MemcacheSetRequest.SetPolicy.REPLACE;
574 throw new IllegalArgumentException("Unknown policy: " + policy);
577 @Override
578 public Future<Boolean> put(Object key, Object value, Expiration expires, SetPolicy policy) {
579 return doPut(key, null, value, expires, convertSetPolicyToPb(policy));
582 @Override
583 public Future<Void> put(Object key, Object value, Expiration expires) {
584 return VoidFutureWrapper.wrap(
585 doPut(key, null, value, expires, MemcacheSetRequest.SetPolicy.SET));
588 @Override
589 public Future<Void> put(Object key, Object value) {
590 return VoidFutureWrapper.wrap(
591 doPut(key, null, value, null, MemcacheSetRequest.SetPolicy.SET));
594 @Override
595 public Future<Boolean> putIfUntouched(Object key, IdentifiableValue oldValue,
596 Object newValue, Expiration expires) {
597 return doPut(key, oldValue, newValue, expires, MemcacheSetRequest.SetPolicy.CAS);
600 @Override
601 public Future<Boolean> putIfUntouched(Object key, IdentifiableValue oldValue, Object newValue) {
602 return doPut(key, oldValue, newValue, null, MemcacheSetRequest.SetPolicy.CAS);
605 @Override
606 public <T> Future<Set<T>> putIfUntouched(Map<T, CasValues> values) {
607 return doPutAll(values, null, MemcacheSetRequest.SetPolicy.CAS, "putIfUntouched");
610 @Override
611 public <T> Future<Set<T>> putIfUntouched(Map<T, CasValues> values, Expiration expiration) {
612 return doPutAll(values, expiration, MemcacheSetRequest.SetPolicy.CAS, "putIfUntouched");
615 private <T> Future<Set<T>> doPutAll(Map<T, ?> values, Expiration expires,
616 MemcacheSetRequest.SetPolicy policy, String operation) {
617 MemcacheSetRequest.Builder requestBuilder = MemcacheSetRequest.newBuilder();
618 requestBuilder.setNameSpace(getEffectiveNamespace());
620 final List<T> requestedKeys = new ArrayList<T>(values.size());
621 Set<Integer> oversized = new HashSet<>();
622 int itemIndex = 0;
624 for (Map.Entry<T, ?> entry : values.entrySet()) {
625 MemcacheSetRequest.Item.Builder itemBuilder = MemcacheSetRequest.Item.newBuilder();
626 requestedKeys.add(entry.getKey());
627 itemBuilder.setKey(makePbKey(entry.getKey()));
628 ValueAndFlags vaf;
629 if (policy == MemcacheSetRequest.SetPolicy.CAS) {
630 CasValues value = (CasValues) entry.getValue();
631 if (value == null) {
632 throw new IllegalArgumentException(entry.getKey() + " has a null for CasValues");
634 vaf = serializeValue(value.getNewValue());
635 if (!(value.getOldValue() instanceof IdentifiableValueImpl)) {
636 throw new IllegalArgumentException(
637 entry.getKey() + " CasValues has an oldValue instance of an unapproved " +
638 "IdentifiableValue implementation. Perhaps you implemented your own " +
639 "version of IdentifiableValue? If so, don't do this.");
641 itemBuilder.setCasId(((IdentifiableValueImpl) value.getOldValue()).getCasId());
642 if (value.getExipration() != null) {
643 itemBuilder.setExpirationTime(value.getExipration().getSecondsValue());
644 } else {
645 itemBuilder.setExpirationTime(expires == null ? 0 : expires.getSecondsValue());
647 } else {
648 vaf = serializeValue(entry.getValue());
649 itemBuilder.setExpirationTime(expires == null ? 0 : expires.getSecondsValue());
651 itemBuilder.setValue(ByteString.copyFrom(vaf.value));
652 itemBuilder.setFlags(vaf.flags.ordinal());
653 itemBuilder.setSetPolicy(policy);
654 requestBuilder.addItem(itemBuilder);
656 int itemSize = itemBuilder.getKey().size() + itemBuilder.getValue().size();
657 if (itemSize > MAX_ITEM_SIZE) {
658 oversized.add(itemIndex);
660 itemIndex++;
663 final Set<Integer> oversizedFinal = new HashSet(oversized);
665 return makeAsyncCall("Set", requestBuilder.build(), createRpcResponseHandlerForPut(
666 requestBuilder.getItemBuilderList(),
667 requestBuilder.getNameSpace(),
668 MemcacheSetResponse.getDefaultInstance(),
669 "Memcache " + operation + ": Unknown exception setting " + values.size() + " keys",
670 new Transformer<MemcacheSetResponse, Set<T>>() {
671 @Override public Set<T> transform(MemcacheSetResponse response) {
672 if (response.getSetStatusCount() != requestedKeys.size()) {
673 throw new MemcacheServiceException(String.format(
674 "Memcache put: Set %d items, got %d response statuses",
675 requestedKeys.size(),
676 response.getSetStatusCount()));
678 HashSet<T> result = new HashSet<>();
679 HashSet<T> sizeErrors = new HashSet<>();
680 HashSet<T> otherErrors = new HashSet<>();
681 Iterator<SetStatusCode> statusIter = response.getSetStatusList().iterator();
682 int itemIndex = 0;
684 for (T requestedKey : requestedKeys) {
685 SetStatusCode status = statusIter.next();
686 if (status == MemcacheSetResponse.SetStatusCode.ERROR) {
687 if (oversizedFinal.contains(itemIndex)) {
688 sizeErrors.add(requestedKey);
689 } else {
690 otherErrors.add(requestedKey);
692 } else if (status == MemcacheSetResponse.SetStatusCode.STORED) {
693 result.add(requestedKey);
695 itemIndex++;
697 if (!sizeErrors.isEmpty() || !otherErrors.isEmpty()) {
698 StringBuilder builder = new StringBuilder("Memcache put: ");
699 if (!sizeErrors.isEmpty()) {
700 builder.append(sizeErrors.size());
701 builder.append(" items failed for exceeding ");
702 builder.append(MAX_ITEM_SIZE).append(" bytes; keys: ");
703 builder.append(Joiner.on(", ").join(sizeErrors));
704 builder.append(". ");
706 if (!otherErrors.isEmpty()) {
707 builder.append("Set failed to set ");
708 builder.append(otherErrors.size()).append(" keys: ");
709 builder.append(Joiner.on(", ").join(otherErrors));
711 throw new MemcacheServiceException(builder.toString());
713 return result;
715 }), DefaultValueProviders.<T>emptySet());
718 @Override
719 public <T> Future<Set<T>> putAll(Map<T, ?> values, Expiration expires, SetPolicy policy) {
720 return doPutAll(values, expires, convertSetPolicyToPb(policy), "putAll");
723 @Override
724 public Future<Void> putAll(Map<?, ?> values, Expiration expires) {
725 return VoidFutureWrapper.wrap(
726 doPutAll(values, expires, MemcacheSetRequest.SetPolicy.SET, "putAll"));
729 @Override
730 public Future<Void> putAll(Map<?, ?> values) {
731 return VoidFutureWrapper.wrap(
732 doPutAll(values, null, MemcacheSetRequest.SetPolicy.SET, "putAll"));
735 @Override
736 public Future<Boolean> delete(Object key) {
737 return delete(key, 0);
740 @Override
741 public Future<Boolean> delete(Object key, long millisNoReAdd) {
742 MemcacheDeleteRequest request = MemcacheDeleteRequest.newBuilder()
743 .setNameSpace(getEffectiveNamespace())
744 .addItem(MemcacheDeleteRequest.Item.newBuilder()
745 .setKey(makePbKey(key))
746 .setDeleteTime((int) TimeUnit.SECONDS.convert(millisNoReAdd, TimeUnit.MILLISECONDS)))
747 .build();
748 return makeAsyncCall("Delete", request, createRpcResponseHandler(
749 MemcacheDeleteResponse.getDefaultInstance(),
750 "Memcache delete: Unknown exception deleting key: " + key,
751 new Transformer<MemcacheDeleteResponse, Boolean>() {
752 @Override public Boolean transform(MemcacheDeleteResponse response) {
753 return response.getDeleteStatus(0) == DeleteStatusCode.DELETED;
755 }), DefaultValueProviders.falseValue());
758 @Override
759 public <T> Future<Set<T>> deleteAll(Collection<T> keys) {
760 return deleteAll(keys, 0);
763 @Override
764 public <T> Future<Set<T>> deleteAll(Collection<T> keys, long millisNoReAdd) {
765 MemcacheDeleteRequest.Builder requestBuilder =
766 MemcacheDeleteRequest.newBuilder().setNameSpace(getEffectiveNamespace());
767 final List<T> requestedKeys = new ArrayList<T>(keys.size());
768 for (T key : keys) {
769 requestedKeys.add(key);
770 requestBuilder.addItem(MemcacheDeleteRequest.Item.newBuilder()
771 .setDeleteTime((int) (millisNoReAdd / 1000))
772 .setKey(makePbKey(key)));
774 return makeAsyncCall("Delete", requestBuilder.build(), createRpcResponseHandler(
775 MemcacheDeleteResponse.getDefaultInstance(),
776 "Memcache deleteAll: Unknown exception deleting multiple keys",
777 new Transformer<MemcacheDeleteResponse, Set<T>>() {
778 @Override public Set<T> transform(MemcacheDeleteResponse response) {
779 Set<T> retval = new LinkedHashSet<T>();
780 Iterator<T> requestedKeysIter = requestedKeys.iterator();
781 for (DeleteStatusCode deleteStatus : response.getDeleteStatusList()) {
782 T requestedKey = requestedKeysIter.next();
783 if (deleteStatus == DeleteStatusCode.DELETED) {
784 retval.add(requestedKey);
787 return retval;
789 }), DefaultValueProviders.<T>emptySet());
792 private static MemcacheIncrementRequest.Builder newIncrementRequestBuilder(
793 Object key, long delta, Long initialValue) {
794 MemcacheIncrementRequest.Builder requestBuilder = MemcacheIncrementRequest.newBuilder();
795 requestBuilder.setKey(makePbKey(key));
796 if (delta > 0) {
797 requestBuilder.setDirection(Direction.INCREMENT);
798 requestBuilder.setDelta(delta);
799 } else {
800 requestBuilder.setDirection(Direction.DECREMENT);
801 requestBuilder.setDelta(-delta);
803 if (initialValue != null) {
804 requestBuilder.setInitialValue(initialValue);
805 requestBuilder.setInitialFlags(MemcacheSerialization.Flag.LONG.ordinal());
807 return requestBuilder;
810 @Override
811 public Future<Long> increment(Object key, long delta) {
812 return increment(key, delta, null);
815 @Override
816 public Future<Long> increment(final Object key, long delta, Long initialValue) {
817 MemcacheIncrementRequest request = newIncrementRequestBuilder(key, delta, initialValue)
818 .setNameSpace(getEffectiveNamespace())
819 .build();
820 return makeAsyncCall("Increment", request,
821 new RpcResponseHandler<MemcacheIncrementResponse, Long>(
822 MemcacheIncrementResponse.getDefaultInstance(),
823 "Memcache increment: exception when incrementing key '" + key + "'",
824 new Transformer<MemcacheIncrementResponse, Long>() {
825 @Override public Long transform(MemcacheIncrementResponse response) {
826 return response.hasNewValue() ? response.getNewValue() : null;
828 }, getErrorHandler()) {
829 @Override void handleApiProxyException(Throwable cause) throws Exception {
830 if (cause instanceof ApiProxy.ApplicationException) {
831 ApiProxy.ApplicationException applicationException =
832 (ApiProxy.ApplicationException) cause;
833 getLogger().info(applicationException.getErrorDetail());
834 if (applicationException.getApplicationError() ==
835 MemcacheServiceError.ErrorCode.INVALID_VALUE_VALUE) {
836 throw new InvalidValueException("Non-incrementable value for key '" + key + "'");
839 super.handleApiProxyException(cause);
841 }, DefaultValueProviders.<Long>nullValue());
844 @Override
845 public <T> Future<Map<T, Long>> incrementAll(Collection<T> keys, long delta) {
846 return incrementAll(keys, delta, null);
849 @Override
850 public <T> Future<Map<T, Long>> incrementAll(Collection<T> keys, long delta, Long initialValue) {
851 return incrementAll(makeMap(keys, delta), initialValue);
854 @Override
855 public <T> Future<Map<T, Long>> incrementAll(Map<T, Long> offsets) {
856 return incrementAll(offsets, null);
859 @Override
860 public <T> Future<Map<T, Long>> incrementAll(Map<T, Long> offsets, Long initialValue) {
861 MemcacheBatchIncrementRequest.Builder requestBuilder =
862 MemcacheBatchIncrementRequest.newBuilder().setNameSpace(getEffectiveNamespace());
863 final List<T> requestedKeys = new ArrayList<T>(offsets.size());
864 for (Map.Entry<T, Long> entry : offsets.entrySet()) {
865 requestedKeys.add(entry.getKey());
866 requestBuilder.addItem(
867 newIncrementRequestBuilder(entry.getKey(), entry.getValue(), initialValue));
869 return makeAsyncCall("BatchIncrement", requestBuilder.build(), createRpcResponseHandler(
870 MemcacheBatchIncrementResponse.getDefaultInstance(),
871 "Memcache incrmentAll: exception incrementing multiple keys",
872 new Transformer<MemcacheBatchIncrementResponse, Map<T, Long>>() {
873 @Override public Map<T, Long> transform(MemcacheBatchIncrementResponse response) {
874 Map<T, Long> result = new LinkedHashMap<T, Long>(requestedKeys.size(), 1);
875 Iterator<MemcacheIncrementResponse> items = response.getItemList().iterator();
876 for (T requestedKey : requestedKeys) {
877 MemcacheIncrementResponse item = items.next();
878 if (item.getIncrementStatus().equals(IncrementStatusCode.OK) && item.hasNewValue()) {
879 result.put(requestedKey, item.getNewValue());
880 } else {
881 result.put(requestedKey, null);
884 return result;
887 new Provider<Map<T, Long>>() {
888 @Override public Map<T, Long> get() {
889 return makeMap(requestedKeys, null);
894 @Override
895 public Future<Void> clearAll() {
896 return makeAsyncCall("FlushAll", MemcacheFlushRequest.getDefaultInstance(),
897 createRpcResponseHandler(MemcacheFlushResponse.getDefaultInstance(),
898 "Memcache clearAll: exception",
899 new Transformer<MemcacheFlushResponse, Void>() {
900 @Override public Void transform(MemcacheFlushResponse response) {
901 return null;
903 }), DefaultValueProviders.<Void>nullValue());
906 @Override
907 public Future<Stats> getStatistics() {
908 return makeAsyncCall("Stats", MemcacheStatsRequest.getDefaultInstance(),
909 createRpcResponseHandler(MemcacheStatsResponse.getDefaultInstance(),
910 "Memcache getStatistics: exception",
911 new Transformer<MemcacheStatsResponse, Stats>() {
912 @Override public Stats transform(MemcacheStatsResponse response) {
913 return new StatsImpl(response.getStats());
915 }), DefaultValueProviders.emptyStats());