1 // Copyright 2011 Google Inc. All rights reserved
3 package com
.google
.appengine
.api
.memcache
;
5 import static com
.google
.appengine
.api
.memcache
.MemcacheServiceApiHelper
.makeAsyncCall
;
7 import com
.google
.appengine
.api
.NamespaceManager
;
8 import com
.google
.appengine
.api
.memcache
.MemcacheSerialization
.ValueAndFlags
;
9 import com
.google
.appengine
.api
.memcache
.MemcacheService
.CasValues
;
10 import com
.google
.appengine
.api
.memcache
.MemcacheService
.IdentifiableValue
;
11 import com
.google
.appengine
.api
.memcache
.MemcacheService
.SetPolicy
;
12 import com
.google
.appengine
.api
.memcache
.MemcacheServiceApiHelper
.Provider
;
13 import com
.google
.appengine
.api
.memcache
.MemcacheServiceApiHelper
.RpcResponseHandler
;
14 import com
.google
.appengine
.api
.memcache
.MemcacheServiceApiHelper
.Transformer
;
15 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheBatchIncrementRequest
;
16 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheBatchIncrementResponse
;
17 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheDeleteRequest
;
18 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheDeleteResponse
;
19 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheDeleteResponse
.DeleteStatusCode
;
20 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheFlushRequest
;
21 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheFlushResponse
;
22 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheGetRequest
;
23 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheGetResponse
;
24 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheIncrementRequest
;
25 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheIncrementRequest
.Direction
;
26 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheIncrementResponse
;
27 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheIncrementResponse
.IncrementStatusCode
;
28 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheServiceError
;
29 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheSetRequest
;
30 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheSetResponse
;
31 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheSetResponse
.SetStatusCode
;
32 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheStatsRequest
;
33 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MemcacheStatsResponse
;
34 import com
.google
.appengine
.api
.memcache
.MemcacheServicePb
.MergedNamespaceStats
;
35 import com
.google
.appengine
.api
.utils
.FutureWrapper
;
36 import com
.google
.apphosting
.api
.ApiProxy
;
37 import com
.google
.common
.base
.Joiner
;
38 import com
.google
.common
.base
.StringUtil
;
39 import com
.google
.common
.primitives
.Bytes
;
40 import com
.google
.protobuf
.ByteString
;
41 import com
.google
.protobuf
.Message
;
43 import java
.io
.IOException
;
44 import java
.util
.ArrayList
;
45 import java
.util
.Arrays
;
46 import java
.util
.Collection
;
47 import java
.util
.HashMap
;
48 import java
.util
.HashSet
;
49 import java
.util
.Iterator
;
50 import java
.util
.LinkedHashMap
;
51 import java
.util
.LinkedHashSet
;
52 import java
.util
.List
;
54 import java
.util
.Objects
;
56 import java
.util
.concurrent
.Future
;
57 import java
.util
.concurrent
.TimeUnit
;
60 * Java bindings for the AsyncMemcache service.
63 class AsyncMemcacheServiceImpl
extends BaseMemcacheServiceImpl
implements AsyncMemcacheService
{
65 static class StatsImpl
implements Stats
{
66 private final long hits
, misses
, bytesFetched
, items
, bytesStored
;
67 private final int maxCachedTime
;
69 StatsImpl(MergedNamespaceStats stats
) {
71 hits
= stats
.getHits();
72 misses
= stats
.getMisses();
73 bytesFetched
= stats
.getByteHits();
74 items
= stats
.getItems();
75 bytesStored
= stats
.getBytes();
76 maxCachedTime
= stats
.getOldestItemAge();
78 hits
= misses
= bytesFetched
= items
= bytesStored
= 0;
84 public long getHitCount() {
89 public long getMissCount() {
94 public long getBytesReturnedForHits() {
99 public long getItemCount() {
104 public long getTotalItemBytes() {
109 public int getMaxTimeWithoutAccess() {
110 return maxCachedTime
;
114 public String
toString() {
115 StringBuilder builder
= new StringBuilder();
116 builder
.append("Hits: ").append(hits
).append('\n');
117 builder
.append("Misses: ").append(misses
).append('\n');
118 builder
.append("Bytes Fetched: ").append(bytesFetched
).append('\n');
119 builder
.append("Bytes Stored: ").append(bytesStored
).append('\n');
120 builder
.append("Items: ").append(items
).append('\n');
121 builder
.append("Max Cached Time: ").append(maxCachedTime
).append('\n');
122 return builder
.toString();
126 static final class IdentifiableValueImpl
implements IdentifiableValue
{
127 private final Object value
;
128 private final long casId
;
130 IdentifiableValueImpl(Object value
, long casId
) {
136 public Object
getValue() {
145 public boolean equals(Object otherObj
) {
146 if (this == otherObj
) {
149 if ((otherObj
== null) || (getClass() != otherObj
.getClass())) {
152 IdentifiableValueImpl otherIdentifiableValue
= (IdentifiableValueImpl
) otherObj
;
153 return Objects
.equals(value
, otherIdentifiableValue
.value
) &&
154 (casId
== otherIdentifiableValue
.casId
);
158 public int hashCode() {
159 return Objects
.hash(value
, casId
);
163 private static class DefaultValueProviders
{
165 @SuppressWarnings("rawtypes")
166 private static final Provider NULL_PROVIDER
= new Provider() {
167 @Override public Object
get() {
172 private static final Provider
<Boolean
> FALSE_PROVIDER
= new Provider
<Boolean
>() {
173 @Override public Boolean
get() {
174 return Boolean
.FALSE
;
178 @SuppressWarnings("rawtypes")
179 private static final Provider SET_PROVIDER
= new Provider
<Set
<?
>>() {
180 @Override public Set
<?
> get() {
181 return new HashSet(0, 1);
185 @SuppressWarnings("rawtypes")
186 private static final Provider MAP_PROVIDER
= new Provider
<Map
<?
, ?
>>() {
187 @Override public Map
<?
, ?
> get() {
188 return new HashMap(0, 1);
192 private static final Provider
<Stats
> STATS_PROVIDER
= new Provider
<Stats
>() {
193 final Stats emptyStats
= new AsyncMemcacheServiceImpl
.StatsImpl(null);
195 @Override public Stats
get() {
200 static Provider
<Boolean
> falseValue() {
201 return FALSE_PROVIDER
;
204 @SuppressWarnings("unchecked")
205 static <T
> Provider
<T
> nullValue() {
206 return NULL_PROVIDER
;
209 @SuppressWarnings("unchecked")
210 static <T
> Provider
<Set
<T
>> emptySet() {
214 @SuppressWarnings("unchecked")
215 static <K
, V
> Provider
<Map
<K
, V
>> emptyMap() {
219 static Provider
<Stats
> emptyStats() {
220 return STATS_PROVIDER
;
224 private static class VoidFutureWrapper
<K
> extends FutureWrapper
<K
, Void
> {
226 private VoidFutureWrapper(Future
<K
> parent
) {
230 private static <K
> Future
<Void
> wrap(Future
<K
> parent
) {
231 return new VoidFutureWrapper
<K
>(parent
);
235 protected Void
wrap(K value
) {
240 protected Throwable
convertException(Throwable cause
) {
245 private static final class KeyValuePair
<K
, V
> {
247 private final V value
;
249 private KeyValuePair(K key
, V value
) {
254 static <K
, V
> KeyValuePair
<K
, V
> of(K key
, V value
) {
255 return new KeyValuePair
<K
, V
>(key
, value
);
/**
 * Creates the service for the given namespace.
 *
 * @param namespace forwarded unchanged to the {@code BaseMemcacheServiceImpl} constructor
 */
AsyncMemcacheServiceImpl(String namespace) {
  super(namespace);
}
263 static <T
, V
> Map
<T
, V
> makeMap(Collection
<T
> keys
, V value
) {
264 Map
<T
, V
> map
= new LinkedHashMap
<T
, V
>(keys
.size(), 1);
271 private static ByteString
makePbKey(Object key
) {
273 return ByteString
.copyFrom(MemcacheSerialization
.makePbKey(key
));
274 } catch (IOException ex
) {
275 throw new IllegalArgumentException("Cannot use as a key: '" + key
+ "'", ex
);
279 private static ValueAndFlags
serializeValue(Object value
) {
281 return MemcacheSerialization
.serialize(value
);
282 } catch (IOException ex
) {
283 throw new IllegalArgumentException("Cannot use as value: '" + value
+ "'", ex
);
287 private Object
deserializeItem(Object key
, MemcacheGetResponse
.Item item
) {
289 return MemcacheSerialization
.deserialize(item
.getValue().toByteArray(), item
.getFlags());
290 } catch (ClassNotFoundException ex
) {
291 getErrorHandler().handleDeserializationError(
292 new InvalidValueException("Can't find class for value of key '" + key
+ "'", ex
));
293 } catch (IOException ex
) {
294 getErrorHandler().handleDeserializationError(
295 new InvalidValueException("Failed to parse the value for '" + key
+ "'", ex
));
300 private <M
extends Message
, T
> RpcResponseHandler
<M
, T
> createRpcResponseHandler(
301 M response
, String errorText
, Transformer
<M
, T
> responseTransformer
) {
302 return new RpcResponseHandler
<M
, T
>(
303 response
, errorText
, responseTransformer
, getErrorHandler());
306 private <T
> RpcResponseHandlerForPut
<T
> createRpcResponseHandlerForPut(
307 Iterable
<MemcacheSetRequest
.Item
.Builder
> itemBuilders
, String namespace
,
308 MemcacheSetResponse response
, String errorText
,
309 Transformer
<MemcacheSetResponse
, T
> responseTransformer
) {
310 return new RpcResponseHandlerForPut
<T
>(
311 itemBuilders
, namespace
, response
, errorText
, responseTransformer
);
314 private class RpcResponseHandlerForPut
<T
> extends RpcResponseHandler
<MemcacheSetResponse
, T
> {
317 * We remember what was sent to the backend, so that if we get an error response we can test to
318 * see if the error was caused by the API being given invalid values.
320 private final Iterable
<MemcacheSetRequest
.Item
.Builder
> itemsSentToBackend
;
321 private final String namespace
;
323 RpcResponseHandlerForPut(Iterable
<MemcacheSetRequest
.Item
.Builder
> itemsSentToBackend
,
325 MemcacheSetResponse response
, String errorText
,
326 Transformer
<MemcacheSetResponse
, T
> responseTransfomer
) {
327 super(response
, errorText
, responseTransfomer
, getErrorHandler());
328 this.itemsSentToBackend
= itemsSentToBackend
;
329 this.namespace
= namespace
;
333 * When we get an error from the backend we check to see if it could possibly have been caused
334 * by an invalid key or value being passed into the API from the app. If so we test the original
335 * key and value passed in and if we find them to be invalid we throw an exception that is more
336 * informative to the app writer than the default exception.
339 void handleApiProxyException(Throwable cause
) throws Exception
{
340 if (cause
instanceof ApiProxy
.ApplicationException
) {
341 ApiProxy
.ApplicationException applicationException
= (ApiProxy
.ApplicationException
) cause
;
342 if (applicationException
.getApplicationError()
343 == MemcacheServiceError
.ErrorCode
.UNSPECIFIED_ERROR_VALUE
) {
344 handleApiProxyException(applicationException
.getErrorDetail());
346 } else if (cause
instanceof MemcacheServiceException
){
347 handleApiProxyException(cause
.getMessage());
349 super.handleApiProxyException(cause
);
352 private void handleApiProxyException(String msg
) throws MemcacheServiceException
{
353 for (MemcacheSetRequest
.Item
.Builder itemBuilder
: itemsSentToBackend
) {
354 ByteString pbKey
= itemBuilder
.getKey();
355 ByteString value
= itemBuilder
.getValue();
356 if (value
.size() + pbKey
.size() > ITEM_SIZE_LIMIT
) {
357 maybeThrow("Key+value is bigger than maximum allowed. " + msg
);
359 if (Bytes
.contains(pbKey
.toByteArray(), (byte) 0)) {
360 maybeThrow("Key contains embedded null byte. " + msg
);
362 if (namespace
!= null) {
364 NamespaceManager
.validateNamespace(namespace
);
365 } catch (IllegalArgumentException ex
) {
366 maybeThrow(ex
.toString());
372 /** Will throw exception or just log it, depending on what error handler is set.*/
373 private void maybeThrow(String msg
) {
374 getErrorHandler().handleServiceError(new MemcacheServiceException(msg
));
379 * Matches limit documented in
380 * https://developers.google.com/appengine/docs/python/memcache/#Python_Limits
382 * TODO(user) This is too conservative; need to revisit. The docs are conflating item size
383 * limit with total cache size accounting.
385 static final int ITEM_SIZE_LIMIT
= 1024 * 1024 - 96;
388 public Future
<Boolean
> contains(Object key
) {
389 return doGet(key
, false, "Memcache contains: exception testing contains (" + key
+ ")",
390 new Transformer
<MemcacheGetResponse
, Boolean
>() {
391 @Override public Boolean
transform(MemcacheGetResponse response
) {
392 return response
.getItemCount() > 0;
394 }, DefaultValueProviders
.falseValue());
397 private <T
> Future
<T
> doGet(Object key
, boolean forCas
, String errorText
,
398 final Transformer
<MemcacheGetResponse
, T
> responseTransfomer
, Provider
<T
> defaultValue
) {
399 MemcacheGetRequest
.Builder requestBuilder
= MemcacheGetRequest
.newBuilder();
400 requestBuilder
.addKey(makePbKey(key
));
401 requestBuilder
.setNameSpace(getEffectiveNamespace());
403 requestBuilder
.setForCas(true);
405 return makeAsyncCall("Get", requestBuilder
.build(),
406 createRpcResponseHandler(MemcacheGetResponse
.getDefaultInstance(), errorText
,
407 responseTransfomer
), defaultValue
);
411 public Future
<Object
> get(final Object key
) {
412 return doGet(key
, false, "Memcache get: exception getting 1 key (" + key
+ ")",
413 new Transformer
<MemcacheGetResponse
, Object
>() {
414 @Override public Object
transform(MemcacheGetResponse response
) {
415 return response
.getItemCount() == 0 ?
null : deserializeItem(key
, response
.getItem(0));
417 }, DefaultValueProviders
.nullValue());
421 public Future
<IdentifiableValue
> getIdentifiable(final Object key
) {
422 return doGet(key
, true, "Memcache getIdentifiable: exception getting 1 key (" + key
+ ")",
423 new Transformer
<MemcacheGetResponse
, IdentifiableValue
>() {
424 @Override public IdentifiableValue
transform(MemcacheGetResponse response
) {
425 if (response
.getItemCount() == 0) {
428 MemcacheGetResponse
.Item item
= response
.getItem(0);
429 return new IdentifiableValueImpl(deserializeItem(key
, item
), item
.getCasId());
431 }, DefaultValueProviders
.<IdentifiableValue
>nullValue());
435 public <K
> Future
<Map
<K
, IdentifiableValue
>> getIdentifiables(Collection
<K
> keys
) {
436 return doGetAll(keys
, true,
437 "Memcache getIdentifiables: exception getting multiple keys",
438 new Transformer
<KeyValuePair
<K
, MemcacheGetResponse
.Item
>, IdentifiableValue
>() {
440 public IdentifiableValue
transform(KeyValuePair
<K
, MemcacheGetResponse
.Item
> pair
) {
441 MemcacheGetResponse
.Item item
= pair
.value
;
442 Object value
= deserializeItem(pair
.key
, item
);
443 return new IdentifiableValueImpl(value
, item
.getCasId());
445 }, DefaultValueProviders
.<K
, IdentifiableValue
>emptyMap());
449 public <K
> Future
<Map
<K
, Object
>> getAll(Collection
<K
> keys
) {
450 return doGetAll(keys
, false,
451 "Memcache getAll: exception getting multiple keys",
452 new Transformer
<KeyValuePair
<K
, MemcacheGetResponse
.Item
>, Object
>() {
454 public Object
transform(KeyValuePair
<K
, MemcacheGetResponse
.Item
> pair
) {
455 return deserializeItem(pair
.key
, pair
.value
);
457 }, DefaultValueProviders
.<K
, Object
>emptyMap());
460 private <K
, V
> Future
<Map
<K
, V
>> doGetAll(Collection
<K
> keys
, boolean forCas
,
462 final Transformer
<KeyValuePair
<K
, MemcacheGetResponse
.Item
>, V
> responseTransfomer
,
463 Provider
<Map
<K
, V
>> defaultValue
) {
464 MemcacheGetRequest
.Builder requestBuilder
= MemcacheGetRequest
.newBuilder();
465 requestBuilder
.setNameSpace(getEffectiveNamespace());
466 final Map
<ByteString
, K
> byteStringToKey
= new HashMap
<ByteString
, K
>(keys
.size(), 1);
468 ByteString pbKey
= makePbKey(key
);
469 byteStringToKey
.put(pbKey
, key
);
470 requestBuilder
.addKey(pbKey
);
473 requestBuilder
.setForCas(forCas
);
475 return makeAsyncCall("Get", requestBuilder
.build(), createRpcResponseHandler(
476 MemcacheGetResponse
.getDefaultInstance(), errorText
,
477 new Transformer
<MemcacheGetResponse
, Map
<K
, V
>>() {
479 public Map
<K
, V
> transform(MemcacheGetResponse response
) {
480 Map
<K
, V
> result
= new HashMap
<K
, V
>();
481 for (MemcacheGetResponse
.Item item
: response
.getItemList()) {
482 K key
= byteStringToKey
.get(item
.getKey());
483 V obj
= responseTransfomer
.transform(KeyValuePair
.of(key
, item
));
484 result
.put(key
, obj
);
491 /** Use this to make sure we don't write arbitrarily big log messages. */
492 private static final int MAX_LOGGED_VALUE_SIZE
= 100;
495 * Represents the overhead to an item as it is stored in memcacheg
496 * if it has a value bigger than 65535 bytes and all the optional fields are set.
497 * This number was determined by fiddling with cacheserving/memcacheg/server:item_test
498 * and analyzing CalculateItemSize from cacheserving/memcacheg/server/item.cc.
500 * <p>The overhead can be between 61 and 73 bytes depending on whether optional
501 * fields (flag, expiration and CAS) are set or not, adding 4 bytes for each of
504 private static final int MEMCACHEG_OVERHEAD
= 73;
505 private static final int ONE_MEGABYTE
= 1024 * 1024;
506 public static final int MAX_ITEM_SIZE
= ONE_MEGABYTE
- MEMCACHEG_OVERHEAD
;
509 * Note: non-null oldValue implies Compare-and-Swap operation.
511 private Future
<Boolean
> doPut(final Object key
, IdentifiableValue oldValue
, Object value
,
512 Expiration expires
, MemcacheSetRequest
.SetPolicy policy
) {
513 MemcacheSetRequest
.Builder requestBuilder
= MemcacheSetRequest
.newBuilder();
514 requestBuilder
.setNameSpace(getEffectiveNamespace());
515 MemcacheSetRequest
.Item
.Builder itemBuilder
= MemcacheSetRequest
.Item
.newBuilder();
516 ValueAndFlags vaf
= serializeValue(value
);
517 itemBuilder
.setValue(ByteString
.copyFrom(vaf
.value
));
518 itemBuilder
.setFlags(vaf
.flags
.ordinal());
519 itemBuilder
.setKey(makePbKey(key
));
520 itemBuilder
.setExpirationTime(expires
== null ?
0 : expires
.getSecondsValue());
521 itemBuilder
.setSetPolicy(policy
);
522 if (policy
== MemcacheSetRequest
.SetPolicy
.CAS
) {
523 if (oldValue
== null) {
524 throw new IllegalArgumentException("oldValue must not be null.");
526 if (!(oldValue
instanceof IdentifiableValueImpl
)) {
527 throw new IllegalArgumentException(
528 "oldValue is an instance of an unapproved IdentifiableValue implementation. " +
529 "Perhaps you implemented your own version of IdentifiableValue? " +
530 "If so, don't do this.");
532 itemBuilder
.setCasId(((IdentifiableValueImpl
) oldValue
).getCasId());
534 final int itemSize
= itemBuilder
.getKey().size() + itemBuilder
.getValue().size();
535 requestBuilder
.addItem(itemBuilder
);
537 String valueAsString
=
538 StringUtil
.truncateAtMaxLength(String
.valueOf(value
), MAX_LOGGED_VALUE_SIZE
, true);
539 return makeAsyncCall("Set", requestBuilder
.build(), createRpcResponseHandlerForPut(
540 Arrays
.asList(itemBuilder
),
541 requestBuilder
.getNameSpace(),
542 MemcacheSetResponse
.getDefaultInstance(),
543 String
.format("Memcache put: exception setting 1 key (%s) to '%s'", key
, valueAsString
),
544 new Transformer
<MemcacheSetResponse
, Boolean
>() {
545 @Override public Boolean
transform(MemcacheSetResponse response
) {
546 if (response
.getSetStatusCount() != 1) {
547 throw new MemcacheServiceException("Memcache put: Set one item, got "
548 + response
.getSetStatusCount() + " response statuses");
550 SetStatusCode status
= response
.getSetStatus(0);
551 if (status
== SetStatusCode
.ERROR
) {
552 if (itemSize
> MAX_ITEM_SIZE
) {
553 throw new MemcacheServiceException(
554 String
.format("Memcache put: Item may not be more than %d bytes in length; "
555 + "received %d bytes.", MAX_ITEM_SIZE
, itemSize
));
557 throw new MemcacheServiceException(
558 "Memcache put: Error setting single item (" + key
+ ")");
560 return status
== SetStatusCode
.STORED
;
562 }), DefaultValueProviders
.falseValue());
565 private static MemcacheSetRequest
.SetPolicy
convertSetPolicyToPb(SetPolicy policy
) {
568 return MemcacheSetRequest
.SetPolicy
.SET
;
569 case ADD_ONLY_IF_NOT_PRESENT
:
570 return MemcacheSetRequest
.SetPolicy
.ADD
;
571 case REPLACE_ONLY_IF_PRESENT
:
572 return MemcacheSetRequest
.SetPolicy
.REPLACE
;
574 throw new IllegalArgumentException("Unknown policy: " + policy
);
578 public Future
<Boolean
> put(Object key
, Object value
, Expiration expires
, SetPolicy policy
) {
579 return doPut(key
, null, value
, expires
, convertSetPolicyToPb(policy
));
583 public Future
<Void
> put(Object key
, Object value
, Expiration expires
) {
584 return VoidFutureWrapper
.wrap(
585 doPut(key
, null, value
, expires
, MemcacheSetRequest
.SetPolicy
.SET
));
589 public Future
<Void
> put(Object key
, Object value
) {
590 return VoidFutureWrapper
.wrap(
591 doPut(key
, null, value
, null, MemcacheSetRequest
.SetPolicy
.SET
));
595 public Future
<Boolean
> putIfUntouched(Object key
, IdentifiableValue oldValue
,
596 Object newValue
, Expiration expires
) {
597 return doPut(key
, oldValue
, newValue
, expires
, MemcacheSetRequest
.SetPolicy
.CAS
);
601 public Future
<Boolean
> putIfUntouched(Object key
, IdentifiableValue oldValue
, Object newValue
) {
602 return doPut(key
, oldValue
, newValue
, null, MemcacheSetRequest
.SetPolicy
.CAS
);
606 public <T
> Future
<Set
<T
>> putIfUntouched(Map
<T
, CasValues
> values
) {
607 return doPutAll(values
, null, MemcacheSetRequest
.SetPolicy
.CAS
, "putIfUntouched");
611 public <T
> Future
<Set
<T
>> putIfUntouched(Map
<T
, CasValues
> values
, Expiration expiration
) {
612 return doPutAll(values
, expiration
, MemcacheSetRequest
.SetPolicy
.CAS
, "putIfUntouched");
615 private <T
> Future
<Set
<T
>> doPutAll(Map
<T
, ?
> values
, Expiration expires
,
616 MemcacheSetRequest
.SetPolicy policy
, String operation
) {
617 MemcacheSetRequest
.Builder requestBuilder
= MemcacheSetRequest
.newBuilder();
618 requestBuilder
.setNameSpace(getEffectiveNamespace());
620 final List
<T
> requestedKeys
= new ArrayList
<T
>(values
.size());
621 Set
<Integer
> oversized
= new HashSet
<>();
624 for (Map
.Entry
<T
, ?
> entry
: values
.entrySet()) {
625 MemcacheSetRequest
.Item
.Builder itemBuilder
= MemcacheSetRequest
.Item
.newBuilder();
626 requestedKeys
.add(entry
.getKey());
627 itemBuilder
.setKey(makePbKey(entry
.getKey()));
629 if (policy
== MemcacheSetRequest
.SetPolicy
.CAS
) {
630 CasValues value
= (CasValues
) entry
.getValue();
632 throw new IllegalArgumentException(entry
.getKey() + " has a null for CasValues");
634 vaf
= serializeValue(value
.getNewValue());
635 if (!(value
.getOldValue() instanceof IdentifiableValueImpl
)) {
636 throw new IllegalArgumentException(
637 entry
.getKey() + " CasValues has an oldValue instance of an unapproved " +
638 "IdentifiableValue implementation. Perhaps you implemented your own " +
639 "version of IdentifiableValue? If so, don't do this.");
641 itemBuilder
.setCasId(((IdentifiableValueImpl
) value
.getOldValue()).getCasId());
642 if (value
.getExipration() != null) {
643 itemBuilder
.setExpirationTime(value
.getExipration().getSecondsValue());
645 itemBuilder
.setExpirationTime(expires
== null ?
0 : expires
.getSecondsValue());
648 vaf
= serializeValue(entry
.getValue());
649 itemBuilder
.setExpirationTime(expires
== null ?
0 : expires
.getSecondsValue());
651 itemBuilder
.setValue(ByteString
.copyFrom(vaf
.value
));
652 itemBuilder
.setFlags(vaf
.flags
.ordinal());
653 itemBuilder
.setSetPolicy(policy
);
654 requestBuilder
.addItem(itemBuilder
);
656 int itemSize
= itemBuilder
.getKey().size() + itemBuilder
.getValue().size();
657 if (itemSize
> MAX_ITEM_SIZE
) {
658 oversized
.add(itemIndex
);
663 final Set
<Integer
> oversizedFinal
= new HashSet(oversized
);
665 return makeAsyncCall("Set", requestBuilder
.build(), createRpcResponseHandlerForPut(
666 requestBuilder
.getItemBuilderList(),
667 requestBuilder
.getNameSpace(),
668 MemcacheSetResponse
.getDefaultInstance(),
669 "Memcache " + operation
+ ": Unknown exception setting " + values
.size() + " keys",
670 new Transformer
<MemcacheSetResponse
, Set
<T
>>() {
671 @Override public Set
<T
> transform(MemcacheSetResponse response
) {
672 if (response
.getSetStatusCount() != requestedKeys
.size()) {
673 throw new MemcacheServiceException(String
.format(
674 "Memcache put: Set %d items, got %d response statuses",
675 requestedKeys
.size(),
676 response
.getSetStatusCount()));
678 HashSet
<T
> result
= new HashSet
<>();
679 HashSet
<T
> sizeErrors
= new HashSet
<>();
680 HashSet
<T
> otherErrors
= new HashSet
<>();
681 Iterator
<SetStatusCode
> statusIter
= response
.getSetStatusList().iterator();
684 for (T requestedKey
: requestedKeys
) {
685 SetStatusCode status
= statusIter
.next();
686 if (status
== MemcacheSetResponse
.SetStatusCode
.ERROR
) {
687 if (oversizedFinal
.contains(itemIndex
)) {
688 sizeErrors
.add(requestedKey
);
690 otherErrors
.add(requestedKey
);
692 } else if (status
== MemcacheSetResponse
.SetStatusCode
.STORED
) {
693 result
.add(requestedKey
);
697 if (!sizeErrors
.isEmpty() || !otherErrors
.isEmpty()) {
698 StringBuilder builder
= new StringBuilder("Memcache put: ");
699 if (!sizeErrors
.isEmpty()) {
700 builder
.append(sizeErrors
.size());
701 builder
.append(" items failed for exceeding ");
702 builder
.append(MAX_ITEM_SIZE
).append(" bytes; keys: ");
703 builder
.append(Joiner
.on(", ").join(sizeErrors
));
704 builder
.append(". ");
706 if (!otherErrors
.isEmpty()) {
707 builder
.append("Set failed to set ");
708 builder
.append(otherErrors
.size()).append(" keys: ");
709 builder
.append(Joiner
.on(", ").join(otherErrors
));
711 throw new MemcacheServiceException(builder
.toString());
715 }), DefaultValueProviders
.<T
>emptySet());
719 public <T
> Future
<Set
<T
>> putAll(Map
<T
, ?
> values
, Expiration expires
, SetPolicy policy
) {
720 return doPutAll(values
, expires
, convertSetPolicyToPb(policy
), "putAll");
724 public Future
<Void
> putAll(Map
<?
, ?
> values
, Expiration expires
) {
725 return VoidFutureWrapper
.wrap(
726 doPutAll(values
, expires
, MemcacheSetRequest
.SetPolicy
.SET
, "putAll"));
730 public Future
<Void
> putAll(Map
<?
, ?
> values
) {
731 return VoidFutureWrapper
.wrap(
732 doPutAll(values
, null, MemcacheSetRequest
.SetPolicy
.SET
, "putAll"));
736 public Future
<Boolean
> delete(Object key
) {
737 return delete(key
, 0);
741 public Future
<Boolean
> delete(Object key
, long millisNoReAdd
) {
742 MemcacheDeleteRequest request
= MemcacheDeleteRequest
.newBuilder()
743 .setNameSpace(getEffectiveNamespace())
744 .addItem(MemcacheDeleteRequest
.Item
.newBuilder()
745 .setKey(makePbKey(key
))
746 .setDeleteTime((int) TimeUnit
.SECONDS
.convert(millisNoReAdd
, TimeUnit
.MILLISECONDS
)))
748 return makeAsyncCall("Delete", request
, createRpcResponseHandler(
749 MemcacheDeleteResponse
.getDefaultInstance(),
750 "Memcache delete: Unknown exception deleting key: " + key
,
751 new Transformer
<MemcacheDeleteResponse
, Boolean
>() {
752 @Override public Boolean
transform(MemcacheDeleteResponse response
) {
753 return response
.getDeleteStatus(0) == DeleteStatusCode
.DELETED
;
755 }), DefaultValueProviders
.falseValue());
759 public <T
> Future
<Set
<T
>> deleteAll(Collection
<T
> keys
) {
760 return deleteAll(keys
, 0);
764 public <T
> Future
<Set
<T
>> deleteAll(Collection
<T
> keys
, long millisNoReAdd
) {
765 MemcacheDeleteRequest
.Builder requestBuilder
=
766 MemcacheDeleteRequest
.newBuilder().setNameSpace(getEffectiveNamespace());
767 final List
<T
> requestedKeys
= new ArrayList
<T
>(keys
.size());
769 requestedKeys
.add(key
);
770 requestBuilder
.addItem(MemcacheDeleteRequest
.Item
.newBuilder()
771 .setDeleteTime((int) (millisNoReAdd
/ 1000))
772 .setKey(makePbKey(key
)));
774 return makeAsyncCall("Delete", requestBuilder
.build(), createRpcResponseHandler(
775 MemcacheDeleteResponse
.getDefaultInstance(),
776 "Memcache deleteAll: Unknown exception deleting multiple keys",
777 new Transformer
<MemcacheDeleteResponse
, Set
<T
>>() {
778 @Override public Set
<T
> transform(MemcacheDeleteResponse response
) {
779 Set
<T
> retval
= new LinkedHashSet
<T
>();
780 Iterator
<T
> requestedKeysIter
= requestedKeys
.iterator();
781 for (DeleteStatusCode deleteStatus
: response
.getDeleteStatusList()) {
782 T requestedKey
= requestedKeysIter
.next();
783 if (deleteStatus
== DeleteStatusCode
.DELETED
) {
784 retval
.add(requestedKey
);
789 }), DefaultValueProviders
.<T
>emptySet());
792 private static MemcacheIncrementRequest
.Builder
newIncrementRequestBuilder(
793 Object key
, long delta
, Long initialValue
) {
794 MemcacheIncrementRequest
.Builder requestBuilder
= MemcacheIncrementRequest
.newBuilder();
795 requestBuilder
.setKey(makePbKey(key
));
797 requestBuilder
.setDirection(Direction
.INCREMENT
);
798 requestBuilder
.setDelta(delta
);
800 requestBuilder
.setDirection(Direction
.DECREMENT
);
801 requestBuilder
.setDelta(-delta
);
803 if (initialValue
!= null) {
804 requestBuilder
.setInitialValue(initialValue
);
805 requestBuilder
.setInitialFlags(MemcacheSerialization
.Flag
.LONG
.ordinal());
807 return requestBuilder
;
811 public Future
<Long
> increment(Object key
, long delta
) {
812 return increment(key
, delta
, null);
816 public Future
<Long
> increment(final Object key
, long delta
, Long initialValue
) {
817 MemcacheIncrementRequest request
= newIncrementRequestBuilder(key
, delta
, initialValue
)
818 .setNameSpace(getEffectiveNamespace())
820 return makeAsyncCall("Increment", request
,
821 new RpcResponseHandler
<MemcacheIncrementResponse
, Long
>(
822 MemcacheIncrementResponse
.getDefaultInstance(),
823 "Memcache increment: exception when incrementing key '" + key
+ "'",
824 new Transformer
<MemcacheIncrementResponse
, Long
>() {
825 @Override public Long
transform(MemcacheIncrementResponse response
) {
826 return response
.hasNewValue() ? response
.getNewValue() : null;
828 }, getErrorHandler()) {
829 @Override void handleApiProxyException(Throwable cause
) throws Exception
{
830 if (cause
instanceof ApiProxy
.ApplicationException
) {
831 ApiProxy
.ApplicationException applicationException
=
832 (ApiProxy
.ApplicationException
) cause
;
833 getLogger().info(applicationException
.getErrorDetail());
834 if (applicationException
.getApplicationError() ==
835 MemcacheServiceError
.ErrorCode
.INVALID_VALUE_VALUE
) {
836 throw new InvalidValueException("Non-incrementable value for key '" + key
+ "'");
839 super.handleApiProxyException(cause
);
841 }, DefaultValueProviders
.<Long
>nullValue());
845 public <T
> Future
<Map
<T
, Long
>> incrementAll(Collection
<T
> keys
, long delta
) {
846 return incrementAll(keys
, delta
, null);
850 public <T
> Future
<Map
<T
, Long
>> incrementAll(Collection
<T
> keys
, long delta
, Long initialValue
) {
851 return incrementAll(makeMap(keys
, delta
), initialValue
);
855 public <T
> Future
<Map
<T
, Long
>> incrementAll(Map
<T
, Long
> offsets
) {
856 return incrementAll(offsets
, null);
860 public <T
> Future
<Map
<T
, Long
>> incrementAll(Map
<T
, Long
> offsets
, Long initialValue
) {
861 MemcacheBatchIncrementRequest
.Builder requestBuilder
=
862 MemcacheBatchIncrementRequest
.newBuilder().setNameSpace(getEffectiveNamespace());
863 final List
<T
> requestedKeys
= new ArrayList
<T
>(offsets
.size());
864 for (Map
.Entry
<T
, Long
> entry
: offsets
.entrySet()) {
865 requestedKeys
.add(entry
.getKey());
866 requestBuilder
.addItem(
867 newIncrementRequestBuilder(entry
.getKey(), entry
.getValue(), initialValue
));
869 return makeAsyncCall("BatchIncrement", requestBuilder
.build(), createRpcResponseHandler(
870 MemcacheBatchIncrementResponse
.getDefaultInstance(),
871 "Memcache incrmentAll: exception incrementing multiple keys",
872 new Transformer
<MemcacheBatchIncrementResponse
, Map
<T
, Long
>>() {
873 @Override public Map
<T
, Long
> transform(MemcacheBatchIncrementResponse response
) {
874 Map
<T
, Long
> result
= new LinkedHashMap
<T
, Long
>(requestedKeys
.size(), 1);
875 Iterator
<MemcacheIncrementResponse
> items
= response
.getItemList().iterator();
876 for (T requestedKey
: requestedKeys
) {
877 MemcacheIncrementResponse item
= items
.next();
878 if (item
.getIncrementStatus().equals(IncrementStatusCode
.OK
) && item
.hasNewValue()) {
879 result
.put(requestedKey
, item
.getNewValue());
881 result
.put(requestedKey
, null);
887 new Provider
<Map
<T
, Long
>>() {
888 @Override public Map
<T
, Long
> get() {
889 return makeMap(requestedKeys
, null);
895 public Future
<Void
> clearAll() {
896 return makeAsyncCall("FlushAll", MemcacheFlushRequest
.getDefaultInstance(),
897 createRpcResponseHandler(MemcacheFlushResponse
.getDefaultInstance(),
898 "Memcache clearAll: exception",
899 new Transformer
<MemcacheFlushResponse
, Void
>() {
900 @Override public Void
transform(MemcacheFlushResponse response
) {
903 }), DefaultValueProviders
.<Void
>nullValue());
907 public Future
<Stats
> getStatistics() {
908 return makeAsyncCall("Stats", MemcacheStatsRequest
.getDefaultInstance(),
909 createRpcResponseHandler(MemcacheStatsResponse
.getDefaultInstance(),
910 "Memcache getStatistics: exception",
911 new Transformer
<MemcacheStatsResponse
, Stats
>() {
912 @Override public Stats
transform(MemcacheStatsResponse response
) {
913 return new StatsImpl(response
.getStats());
915 }), DefaultValueProviders
.emptyStats());