1 // Copyright 2010 Google Inc. All rights reserved.
3 package com
.google
.appengine
.api
.datastore
;
5 import static com
.google
.appengine
.api
.datastore
.DatastoreApiHelper
.makeAsyncCall
;
6 import static com
.google
.appengine
.api
.datastore
.DatastoreAttributes
.DatastoreType
.MASTER_SLAVE
;
7 import static com
.google
.appengine
.api
.datastore
.FetchOptions
.Builder
.withLimit
;
8 import static com
.google
.appengine
.api
.datastore
.ReadPolicy
.Consistency
.EVENTUAL
;
10 import com
.google
.appengine
.api
.datastore
.Batcher
.ReorderingMultiFuture
;
11 import com
.google
.appengine
.api
.datastore
.DatastoreService
.KeyRangeState
;
12 import com
.google
.appengine
.api
.datastore
.FutureHelper
.MultiFuture
;
13 import com
.google
.appengine
.api
.datastore
.Index
.IndexState
;
14 import com
.google
.appengine
.api
.datastore
.Query
.FilterOperator
;
15 import com
.google
.appengine
.api
.utils
.FutureWrapper
;
16 import com
.google
.apphosting
.api
.ApiBasePb
.StringProto
;
17 import com
.google
.apphosting
.api
.ApiProxy
.ApiConfig
;
18 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
;
19 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.AllocateIdsRequest
;
20 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.AllocateIdsResponse
;
21 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.CompositeIndices
;
22 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.DatastoreService_3
.Method
;
23 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.DeleteRequest
;
24 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.DeleteResponse
;
25 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.GetRequest
;
26 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.GetResponse
;
27 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.PutRequest
;
28 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.PutResponse
;
29 import com
.google
.common
.collect
.Lists
;
30 import com
.google
.common
.collect
.Maps
;
31 import com
.google
.io
.protocol
.ProtocolMessage
;
32 import com
.google
.storage
.onestore
.v3
.OnestoreEntity
.CompositeIndex
;
33 import com
.google
.storage
.onestore
.v3
.OnestoreEntity
.EntityProto
;
34 import com
.google
.storage
.onestore
.v3
.OnestoreEntity
.Reference
;
36 import java
.util
.ArrayList
;
37 import java
.util
.Collection
;
38 import java
.util
.Collections
;
39 import java
.util
.Iterator
;
40 import java
.util
.LinkedHashMap
;
41 import java
.util
.List
;
44 import java
.util
.concurrent
.ExecutionException
;
45 import java
.util
.concurrent
.Future
;
46 import java
.util
.concurrent
.TimeUnit
;
47 import java
.util
.concurrent
.TimeoutException
;
48 import java
.util
.logging
.Level
;
51 * An implementation of AsyncDatastoreService using the DatastoreV3 API.
54 class AsyncDatastoreServiceImpl
extends BaseAsyncDatastoreServiceImpl
{
57 * A base batcher for DatastoreV3 operations executed in the context of an {@link
58 * AsyncDatastoreServiceImpl}.
59 * @param <S> the response message type
60 * @param <R> the request message type
61 * @param <F> the Java specific representation of a value
62 * @param <T> the proto representation of value
64 private abstract class V3Batcher
<S
extends ProtocolMessage
<S
>, R
extends ProtocolMessage
<R
>,
65 F
, T
extends ProtocolMessage
<T
>> extends BaseRpcBatcher
<S
, R
, F
, T
> {
67 final R
newBatch(R baseBatch
) {
68 return baseBatch
.clone();
73 * A base batcher for operations that operate on {@link Key}s.
74 * @param <S> the response message type
75 * @param <R> the request message type
77 private abstract class V3KeyBatcher
<S
extends ProtocolMessage
<S
>, R
extends ProtocolMessage
<R
>>
78 extends V3Batcher
<S
, R
, Key
, Reference
> {
80 final Object
getGroup(Key value
) {
81 return value
.getRootKey();
85 final Reference
toPb(Key value
) {
86 return KeyTranslator
.convertToPb(value
);
90 private final V3KeyBatcher
<DeleteResponse
, DeleteRequest
> deleteBatcher
=
91 new V3KeyBatcher
<DeleteResponse
, DeleteRequest
>() {
93 void addToBatch(Reference value
, DeleteRequest batch
) {
99 return datastoreServiceConfig
.maxBatchWriteEntities
;
103 protected Future
<DeleteResponse
> makeCall(DeleteRequest batch
) {
104 return makeAsyncCall(apiConfig
, Method
.Delete
, batch
, new DeleteResponse());
108 private final V3KeyBatcher
<GetResponse
, GetRequest
> getByKeyBatcher
=
109 new V3KeyBatcher
<GetResponse
, GetRequest
>() {
111 void addToBatch(Reference value
, GetRequest batch
) {
117 return datastoreServiceConfig
.maxBatchReadEntities
;
121 protected Future
<GetResponse
> makeCall(GetRequest batch
) {
122 return makeAsyncCall(apiConfig
, Method
.Get
, batch
, new GetResponse());
126 private final V3Batcher
<GetResponse
, GetRequest
, Reference
, Reference
> getByReferenceBatcher
=
127 new V3Batcher
<GetResponse
, GetRequest
, Reference
, Reference
>() {
129 final Object
getGroup(Reference value
) {
130 return value
.getPath().getElement(0);
134 final Reference
toPb(Reference value
) {
139 void addToBatch(Reference value
, GetRequest batch
) {
145 return datastoreServiceConfig
.maxBatchReadEntities
;
149 protected Future
<GetResponse
> makeCall(GetRequest batch
) {
150 return makeAsyncCall(apiConfig
, Method
.Get
, batch
, new GetResponse());
154 private final V3Batcher
<PutResponse
, PutRequest
, Entity
, EntityProto
> putBatcher
=
155 new V3Batcher
<PutResponse
, PutRequest
, Entity
, EntityProto
>() {
157 Object
getGroup(Entity value
) {
158 return value
.getKey().getRootKey();
162 void addToBatch(EntityProto value
, PutRequest batch
) {
163 batch
.addEntity(value
);
168 return datastoreServiceConfig
.maxBatchWriteEntities
;
172 protected Future
<PutResponse
> makeCall(PutRequest batch
) {
173 return makeAsyncCall(apiConfig
, Method
.Put
, batch
, new PutResponse());
177 EntityProto
toPb(Entity value
) {
178 return EntityTranslator
.convertToPb(value
);
182 private final ApiConfig apiConfig
;
184 public AsyncDatastoreServiceImpl(DatastoreServiceConfig datastoreServiceConfig
,
185 ApiConfig apiConfig
, TransactionStack defaultTxnProvider
) {
186 super(datastoreServiceConfig
, defaultTxnProvider
,
187 new QueryRunnerV3(datastoreServiceConfig
, apiConfig
));
188 this.apiConfig
= apiConfig
;
192 protected TransactionImpl
.InternalTransaction
doBeginTransaction(TransactionOptions options
) {
193 DatastoreV3Pb
.Transaction remoteTxn
= new DatastoreV3Pb
.Transaction();
194 DatastoreV3Pb
.BeginTransactionRequest request
= new DatastoreV3Pb
.BeginTransactionRequest();
195 request
.setApp(datastoreServiceConfig
.getAppIdNamespace().getAppId());
196 request
.setAllowMultipleEg(options
.isXG());
198 Future
<DatastoreV3Pb
.Transaction
> future
=
199 DatastoreApiHelper
.makeAsyncCall(apiConfig
, Method
.BeginTransaction
, request
, remoteTxn
);
201 return new InternalTransactionV3(apiConfig
, request
.getApp(), future
);
205 protected final Future
<Map
<Key
, Entity
>> doBatchGet( Transaction txn
, final Set
<Key
> keysToGet
, final Map
<Key
, Entity
> resultMap
) {
206 final GetRequest baseReq
= new GetRequest();
207 baseReq
.setAllowDeferred(true);
209 TransactionImpl
.ensureTxnActive(txn
);
210 baseReq
.setTransaction(InternalTransactionV3
.localTxnToRemoteTxn(txn
));
212 if (datastoreServiceConfig
.getReadPolicy().getConsistency() == EVENTUAL
) {
213 baseReq
.setFailoverMs(ARBITRARY_FAILOVER_READ_MS
);
214 baseReq
.setStrong(false);
217 final boolean shouldUseMultipleBatches
= getDatastoreType() != MASTER_SLAVE
&& txn
== null
218 && datastoreServiceConfig
.getReadPolicy().getConsistency() != EVENTUAL
;
220 Iterator
<GetRequest
> batches
= getByKeyBatcher
.getBatches(keysToGet
, baseReq
,
221 baseReq
.getSerializedSize(), shouldUseMultipleBatches
);
222 List
<Future
<GetResponse
>> futures
= getByKeyBatcher
.makeCalls(batches
);
224 return registerInTransaction(txn
, new MultiFuture
<GetResponse
, Map
<Key
, Entity
>>(futures
) {
226 * A Map from a Reference without an App Id specified to the corresponding Key that the user
227 * requested. This is a workaround for the Remote API to support matching requested Keys to
228 * Entities that may be from a different App Id .
230 private Map
<Reference
, Key
> keyMapIgnoringAppId
;
233 public Map
<Key
, Entity
> get() throws InterruptedException
, ExecutionException
{
235 aggregate(futures
, null, null);
236 } catch (TimeoutException e
) {
237 throw new RuntimeException(e
);
243 public Map
<Key
, Entity
> get(long timeout
, TimeUnit unit
)
244 throws InterruptedException
, ExecutionException
, TimeoutException
{
245 aggregate(futures
, timeout
, unit
);
250 * Aggregates the results of the given Futures and issues (synchronous) followup requests if
251 * any results were deferred.
253 * @param currentFutures the Futures corresponding to the batches of the initial GetRequests.
254 * @param timeout the timeout to use while waiting on the Future, or null for none.
255 * @param timeoutUnit the unit of the timeout, or null for none.
257 private void aggregate(
258 Iterable
<Future
<GetResponse
>> currentFutures
, Long timeout
, TimeUnit timeoutUnit
)
259 throws ExecutionException
, InterruptedException
, TimeoutException
{
261 List
<Reference
> deferredRefs
= Lists
.newLinkedList();
263 for (Future
<GetResponse
> currentFuture
: currentFutures
) {
264 GetResponse resp
= getFutureWithOptionalTimeout(currentFuture
, timeout
, timeoutUnit
);
265 addEntitiesToResultMap(resp
);
266 deferredRefs
.addAll(resp
.deferreds());
269 if (deferredRefs
.isEmpty()) {
273 Iterator
<GetRequest
> followupBatches
= getByReferenceBatcher
.getBatches(deferredRefs
,
274 baseReq
, baseReq
.getSerializedSize(), shouldUseMultipleBatches
);
275 currentFutures
= getByReferenceBatcher
.makeCalls(followupBatches
);
280 * Convenience method to get the result of a Future and optionally specify a timeout.
282 * @param future the Future to get.
283 * @param timeout the timeout to use while waiting on the Future, or null for none.
284 * @param timeoutUnit the unit of the timeout, or null for none.
285 * @return the result of the Future.
286 * @throws TimeoutException will only ever be thrown if a timeout is provided.
288 private GetResponse
getFutureWithOptionalTimeout(
289 Future
<GetResponse
> future
, Long timeout
, TimeUnit timeoutUnit
)
290 throws ExecutionException
, InterruptedException
, TimeoutException
{
291 if (timeout
== null) {
294 return future
.get(timeout
, timeoutUnit
);
299 * Adds the Entities from the GetResponse to the resultMap. Will omit Keys that were missing.
300 * Handles Keys with different App Ids from the Entity.Key. See
301 * {@link #findKeyFromRequestIgnoringAppId(Reference)}
303 private void addEntitiesToResultMap(GetResponse response
) {
304 for (GetResponse
.Entity entityResult
: response
.entitys()) {
305 if (entityResult
.hasEntity()) {
306 Entity responseEntity
= EntityTranslator
.createFromPb(entityResult
.getEntity());
307 Key responseKey
= responseEntity
.getKey();
309 if (!keysToGet
.contains(responseKey
)) {
310 responseKey
= findKeyFromRequestIgnoringAppId(entityResult
.getEntity().getKey());
312 resultMap
.put(responseKey
, responseEntity
);
318 * This is a hack to support calls going through the Remote API. The problem is:
320 * The requested Key may have a local app id.
321 * The returned Entity may have a remote app id.
323 * In this case, we want to return a Map.Entry with {LocalKey, RemoteEntity}. This way, the
324 * user can always do map.get(keyFromRequest).
326 * This method will find the corresponding requested LocalKey for a RemoteKey by ignoring the
329 * Note that we used to be able to rely on the order of the Response Entities matching the
330 * order of Request Keys. We can no longer do so with the addition of Deferred results.
332 * @param referenceFromResponse the reference from the Response that did not match any of the
333 * requested Keys. (May be mutated.)
334 * @return the Key from the request that corresponds to the given Reference from the Response
337 private Key
findKeyFromRequestIgnoringAppId(Reference referenceFromResponse
) {
338 if (keyMapIgnoringAppId
== null) {
339 keyMapIgnoringAppId
= Maps
.newHashMap();
340 for (Key requestKey
: keysToGet
) {
341 Reference requestKeyAsRefWithoutApp
= KeyTranslator
.convertToPb(requestKey
).clearApp();
342 keyMapIgnoringAppId
.put(requestKeyAsRefWithoutApp
, requestKey
);
346 Key result
= keyMapIgnoringAppId
.get(referenceFromResponse
.clearApp());
347 if (result
== null) {
348 throw new DatastoreFailureException("Internal error");
356 protected Future
<List
<Key
>> doBatchPut( Transaction txn
,
357 final List
<Entity
> entities
) {
358 PutRequest baseReq
= new PutRequest();
360 TransactionImpl
.ensureTxnActive(txn
);
361 baseReq
.setTransaction(InternalTransactionV3
.localTxnToRemoteTxn(txn
));
363 boolean group
= !baseReq
.hasTransaction();
364 List
<Integer
> order
= Lists
.newArrayListWithCapacity(entities
.size());
365 Iterator
<PutRequest
> batches
= putBatcher
.getBatches(entities
, baseReq
,
366 baseReq
.getSerializedSize(), group
, order
);
367 List
<Future
<PutResponse
>> futures
= putBatcher
.makeCalls(batches
);
369 return registerInTransaction(txn
,
370 new ReorderingMultiFuture
<PutResponse
, List
<Key
>>(futures
, order
) {
372 protected List
<Key
> aggregate(
373 PutResponse intermediateResult
, Iterator
<Integer
> indexItr
, List
<Key
> result
) {
374 for (Reference reference
: intermediateResult
.keys()) {
375 int index
= indexItr
.next();
376 Key key
= entities
.get(index
).getKey();
377 KeyTranslator
.updateKey(reference
, key
);
378 result
.set(index
, key
);
384 protected List
<Key
> initResult(int size
) {
385 List
<Key
> result
= new ArrayList
<Key
>(Collections
.<Key
>nCopies(size
, null));
392 protected Future
<Void
> doBatchDelete( Transaction txn
, Collection
<Key
> keys
) {
393 DeleteRequest baseReq
= new DeleteRequest();
395 TransactionImpl
.ensureTxnActive(txn
);
396 baseReq
.setTransaction(InternalTransactionV3
.localTxnToRemoteTxn(txn
));
398 boolean group
= !baseReq
.hasTransaction();
399 Iterator
<DeleteRequest
> batches
= deleteBatcher
.getBatches(keys
, baseReq
,
400 baseReq
.getSerializedSize(), group
);
401 List
<Future
<DeleteResponse
>> futures
= deleteBatcher
.makeCalls(batches
);
402 return registerInTransaction(txn
, new MultiFuture
<DeleteResponse
, Void
>(futures
) {
404 public Void
get() throws InterruptedException
, ExecutionException
{
405 for (Future
<DeleteResponse
> future
: futures
) {
412 public Void
get(long timeout
, TimeUnit unit
)
413 throws InterruptedException
, ExecutionException
, TimeoutException
{
414 for (Future
<DeleteResponse
> future
: futures
) {
415 future
.get(timeout
, unit
);
422 static Reference
buildAllocateIdsRef(
423 Key parent
, String kind
, AppIdNamespace appIdNamespace
) {
424 if (parent
!= null && !parent
.isComplete()) {
425 throw new IllegalArgumentException("parent key must be complete");
427 Key key
= new Key(kind
, parent
, Key
.NOT_ASSIGNED
, "ignored", appIdNamespace
);
428 return KeyTranslator
.convertToPb(key
);
432 public Future
<KeyRange
> allocateIds(final Key parent
, final String kind
, long num
) {
434 throw new IllegalArgumentException("num must be > 0");
437 if (num
> 1000000000) {
438 throw new IllegalArgumentException("num must be < 1 billion");
441 final AppIdNamespace appIdNamespace
= datastoreServiceConfig
.getAppIdNamespace();
442 Reference allocateIdsRef
= buildAllocateIdsRef(parent
, kind
, appIdNamespace
);
443 AllocateIdsRequest req
=
444 new AllocateIdsRequest().setSize(num
).setModelKey(allocateIdsRef
);
445 AllocateIdsResponse resp
= new AllocateIdsResponse();
446 Future
<AllocateIdsResponse
> future
= makeAsyncCall(apiConfig
, Method
.AllocateIds
, req
, resp
);
447 return new FutureWrapper
<AllocateIdsResponse
, KeyRange
>(future
) {
449 protected KeyRange
wrap(AllocateIdsResponse resp
) throws Exception
{
450 return new KeyRange(parent
, kind
, resp
.getStart(), resp
.getEnd(), appIdNamespace
);
454 protected Throwable
convertException(Throwable cause
) {
461 public Future
<KeyRangeState
> allocateIdRange(final KeyRange range
) {
462 Key parent
= range
.getParent();
463 final String kind
= range
.getKind();
464 final long start
= range
.getStart().getId();
465 long end
= range
.getEnd().getId();
467 AllocateIdsRequest req
= new AllocateIdsRequest()
468 .setModelKey(AsyncDatastoreServiceImpl
.buildAllocateIdsRef(parent
, kind
, null))
470 AllocateIdsResponse resp
= new AllocateIdsResponse();
471 Future
<AllocateIdsResponse
> future
= makeAsyncCall(apiConfig
, Method
.AllocateIds
, req
, resp
);
472 return new FutureWrapper
<AllocateIdsResponse
, KeyRangeState
>(future
) {
473 @SuppressWarnings("deprecation")
475 protected KeyRangeState
wrap(AllocateIdsResponse resp
) throws Exception
{
476 Query query
= new Query(kind
).setKeysOnly();
478 Entity
.KEY_RESERVED_PROPERTY
, FilterOperator
.GREATER_THAN_OR_EQUAL
, range
.getStart());
480 Entity
.KEY_RESERVED_PROPERTY
, FilterOperator
.LESS_THAN_OR_EQUAL
, range
.getEnd());
481 List
<Entity
> collision
= prepare(query
).asList(withLimit(1));
483 if (!collision
.isEmpty()) {
484 return KeyRangeState
.COLLISION
;
487 boolean raceCondition
= start
< resp
.getStart();
488 return raceCondition ? KeyRangeState
.CONTENTION
: KeyRangeState
.EMPTY
;
492 protected Throwable
convertException(Throwable cause
) {
499 public Future
<Map
<Index
, IndexState
>> getIndexes() {
500 StringProto req
= new StringProto();
501 req
.setValue(datastoreServiceConfig
.getAppIdNamespace().getAppId());
502 return new FutureWrapper
<CompositeIndices
, Map
<Index
, IndexState
>>(makeAsyncCall(apiConfig
,
503 Method
.GetIndices
, req
, new CompositeIndices())) {
505 protected Map
<Index
, IndexState
> wrap(CompositeIndices indices
) throws Exception
{
506 Map
<Index
, IndexState
> answer
= new LinkedHashMap
<Index
, IndexState
>();
507 for (CompositeIndex ci
: indices
.indexs()) {
508 Index index
= IndexTranslator
.convertFromPb(ci
);
509 switch (ci
.getStateEnum()) {
511 answer
.put(index
, IndexState
.DELETING
);
514 answer
.put(index
, IndexState
.ERROR
);
517 answer
.put(index
, IndexState
.SERVING
);
520 answer
.put(index
, IndexState
.BUILDING
);
523 logger
.log(Level
.WARNING
, "Unrecognized index state for " + index
);
531 protected Throwable
convertException(Throwable cause
) {