1 package com
.google
.appengine
.api
.datastore
;
3 import com
.google
.appengine
.api
.utils
.FutureWrapper
;
4 import com
.google
.common
.collect
.MapMaker
;
5 import com
.google
.common
.collect
.Maps
;
6 import com
.google
.common
.primitives
.Bytes
;
7 import com
.google
.datastore
.v1beta3
.BeginTransactionResponse
;
8 import com
.google
.datastore
.v1beta3
.CommitRequest
;
9 import com
.google
.datastore
.v1beta3
.Mutation
;
10 import com
.google
.datastore
.v1beta3
.RollbackRequest
;
11 import com
.google
.protobuf
.ByteString
;
12 import com
.google
.protobuf
.InvalidProtocolBufferException
;
14 import java
.util
.Collection
;
16 import java
.util
.concurrent
.Future
;
17 import java
.util
.concurrent
.atomic
.AtomicLong
;
20 * Implementation of the Cloud Datastore v1-specific logic to handle a {@link Transaction}.
22 * <p>In Cloud Datastore, puts and gets are stored on the client until commit. This class serializes
23 * mutations as they are received to avoid memory penalties associated with the full
26 class InternalTransactionCloudDatastoreV1
implements TransactionImpl
.InternalTransaction
{
27 private final CommitRequest
.Builder commitReqBuilder
= CommitRequest
.newBuilder();
30 * Generates a unique identifier (for a given runtime) which can be used for later
31 * lookup of the instance.
33 private static final AtomicLong clientIdGenerator
= new AtomicLong();
36 * Used to store {@link InternalTransactionCloudDatastoreV1} objects for reidentification when a
37 * potentially wrapped Transaction object is passed back to the SDK in a future call.
38 * Each {@link InternalTransactionCloudDatastoreV1} instance is wrapped in a
39 * {@link TransactionImpl}. We use weak references in this static map because this object's
40 * purpose is tied to the lifetime of the wrapper.
42 private static final Map
<String
, InternalTransactionCloudDatastoreV1
>
43 internalTransactionRegister
= new MapMaker().weakValues().makeMap();
46 * The ID reported through {@link #getId()}. This ID is also used for instance lookup, see
47 * {@link #getById(String)}.
49 private final String clientId
= Long
.toString(clientIdGenerator
.getAndIncrement());
52 * The list of mutations (deferred Put/Delete operations) that will be sent to the server as part
53 * of the Commit RPC. A linked map is used to generate consistent results for unit tests;
54 * however iteration order shouldn't affect correctness.
56 private final Map
<com
.google
.datastore
.v1beta3
.Key
, byte[]> mutationMap
= Maps
.newLinkedHashMap();
59 * The {@link Future} associated with the BeginTransaction RPC we sent to the
62 private final Future
<BeginTransactionResponse
> beginTxnFuture
;
64 protected final CloudDatastoreV1Proxy dsApiProxy
;
66 private boolean isWritable
= true;
69 * Objects should be created with {@link #create(CloudDatastoreV1Proxy, Future)} due
70 * to post-construction manipulation.
72 private InternalTransactionCloudDatastoreV1(CloudDatastoreV1Proxy dsApiProxy
,
73 Future
<BeginTransactionResponse
> beginTxnFuture
) {
74 this.dsApiProxy
= dsApiProxy
;
75 this.beginTxnFuture
= beginTxnFuture
;
78 static TransactionImpl
.InternalTransaction
create(CloudDatastoreV1Proxy dsApiProxy
,
79 Future
<BeginTransactionResponse
> future
) {
80 return registerTxn(new InternalTransactionCloudDatastoreV1(dsApiProxy
, future
));
84 * Convert a mutation to a format suitable for committing later.
86 byte[] serializeMutation(Mutation mutation
) {
87 byte[] bytes
= commitReqBuilder
.addMutations(mutation
).buildPartial().toByteArray();
88 commitReqBuilder
.clearMutations();
93 * Convert the partial proto segments into a serialized {@link CommitRequest}.
95 Future
<?
> sendCommit(Collection
<byte[]> mutations
) {
96 byte[][] protoSegmentsArray
= new byte[mutations
.size() + 1][];
97 protoSegmentsArray
[0] = CommitRequest
.newBuilder()
98 .setTransaction(getHandle())
99 .build().toByteArray();
101 for (byte[] mutData
: mutations
) {
102 protoSegmentsArray
[arrayIndex
++] = mutData
;
105 return dsApiProxy
.rawCommit(Bytes
.concat(protoSegmentsArray
));
106 } catch (InvalidProtocolBufferException e
) {
107 throw new RuntimeException("Unexpected error.", e
);
112 * Register a new transaction on the internal roster.
113 * @return The txn, for chaining.
115 static InternalTransactionCloudDatastoreV1
registerTxn(InternalTransactionCloudDatastoreV1 txn
) {
116 internalTransactionRegister
.put(txn
.clientId
, txn
);
121 * Provides the unique identifier for the txn.
122 * Blocks on the future since the handle comes back from the datastore
125 ByteString
getHandle() {
126 return FutureHelper
.quietGet(beginTxnFuture
).getTransaction();
130 * Schedules a put operation for when this transaction is committed.
132 void deferPut(Entity entity
) {
133 deferPut(DataTypeTranslator
.toV1Entity(entity
));
136 void deferPut(com
.google
.datastore
.v1beta3
.Entity
.Builder entityProto
) {
138 mutationMap
.put(entityProto
.getKey(), serializeMutation(Mutation
.newBuilder()
139 .setOp(Mutation
.Operation
.UPSERT
)
140 .setEntity(entityProto
)
144 void deferDelete(Key key
) {
146 com
.google
.datastore
.v1beta3
.Key keyV1
= DataTypeTranslator
.toV1Key(key
).build();
147 mutationMap
.put(keyV1
, serializeMutation(Mutation
.newBuilder()
148 .setOp(Mutation
.Operation
.DELETE
)
154 public Future
<Void
> doCommitAsync() {
156 Future
<Void
> result
= new VoidFutureWrapper
<>(sendCommit(mutationMap
.values()));
162 public Future
<Void
> doRollbackAsync() {
165 return new VoidFutureWrapper
<>(dsApiProxy
.rollback(
166 RollbackRequest
.newBuilder().setTransaction(getHandle()).build()));
170 public String
getId() {
174 private void checkWritable() {
176 throw new IllegalStateException("Transaction is not writable.");
181 * Locates the {@link InternalTransactionCloudDatastoreV1} object associated with a
182 * {@link Transaction} by looking up the ID in a static, thread-safe map.
183 * @throws IllegalArgumentException If a txn object is not found.
184 * @return Internal transaction object associated with the given ID.
186 static InternalTransactionCloudDatastoreV1
getById(String txnClientId
) {
187 InternalTransactionCloudDatastoreV1 txnImpl
= internalTransactionRegister
.get(txnClientId
);
188 if (txnImpl
== null) {
189 throw new IllegalArgumentException("Transaction not found with ID: " + txnClientId
);
194 private static class VoidFutureWrapper
<T
> extends FutureWrapper
<T
, Void
> {
195 public VoidFutureWrapper(Future
<T
> parent
) {
200 protected Void
wrap(T ignore
) throws Exception
{
205 protected Throwable
convertException(Throwable cause
) {