1 package com
.google
.appengine
.api
.datastore
;
3 import com
.google
.appengine
.api
.utils
.FutureWrapper
;
4 import com
.google
.common
.collect
.MapMaker
;
5 import com
.google
.common
.collect
.Maps
;
6 import com
.google
.common
.primitives
.Bytes
;
7 import com
.google
.datastore
.v1beta3
.BeginTransactionResponse
;
8 import com
.google
.datastore
.v1beta3
.CommitRequest
;
9 import com
.google
.datastore
.v1beta3
.Mutation
;
10 import com
.google
.datastore
.v1beta3
.RollbackRequest
;
11 import com
.google
.protobuf
.ByteString
;
12 import com
.google
.protobuf
.InvalidProtocolBufferException
;
14 import java
.util
.Collection
;
16 import java
.util
.concurrent
.Future
;
17 import java
.util
.concurrent
.atomic
.AtomicLong
;
20 * Implementation of the Cloud Datastore v1 specific logic to handle a {@link Transaction}.
22 * <p>In Cloud Datastore, puts and gets are stored on the client until commit. This class serializes
23 * mutations as they are received to avoid memory penalties associated with the full
26 class InternalTransactionCloudDatastoreV1
implements TransactionImpl
.InternalTransaction
{
28 * Prefix for transaction IDs ({@link #getId()}). Enables this class to reliably distinguish
29 * transactions it created from v3 transactions, whose IDs are always entirely numerical.
31 private static final String TXN_ID_PREFIX
= "v1-";
33 private final CommitRequest
.Builder commitReqBuilder
= CommitRequest
.newBuilder();
36 * Generates a unique identifier (for a given runtime) which can be used for later
37 * lookup of the instance.
39 private static final AtomicLong clientIdGenerator
= new AtomicLong();
42 * Used to store {@link InternalTransactionCloudDatastoreV1} objects for reidentification when a
43 * potentially wrapped Transaction object is passed back to the SDK in a future call.
44 * Each {@link InternalTransactionCloudDatastoreV1} instance is wrapped in a
45 * {@link TransactionImpl}. We use weak references in this static map because this object's
46 * purpose is tied to the lifetime of the wrapper.
48 private static final Map
<String
, InternalTransactionCloudDatastoreV1
>
49 internalTransactionRegister
= new MapMaker().weakValues().makeMap();
52 * The ID reported through {@link #getId()}. This ID is also used for instance lookup, see
53 * {@link #get(Transaction)}.
55 private final String clientId
= TXN_ID_PREFIX
56 + Long
.toString(clientIdGenerator
.getAndIncrement());
59 * The list of mutations (deferred Put/Delete operations) that will be sent to the server as part
60 * of the Commit RPC. A linked map is used to generate consistent results for unit tests;
61 * however iteration order shouldn't affect correctness.
63 private final Map
<com
.google
.datastore
.v1beta3
.Key
, byte[]> mutationMap
= Maps
.newLinkedHashMap();
66 * The {@link Future} associated with the BeginTransaction RPC we sent to the
69 private final Future
<BeginTransactionResponse
> beginTxnFuture
;
71 protected final CloudDatastoreV1Proxy dsApiProxy
;
73 private boolean isWritable
= true;
76 * Objects should be created with {@link #create(CloudDatastoreV1Proxy, Future)} due
77 * to post-construction manipulation.
79 private InternalTransactionCloudDatastoreV1(CloudDatastoreV1Proxy dsApiProxy
,
80 Future
<BeginTransactionResponse
> beginTxnFuture
) {
81 this.dsApiProxy
= dsApiProxy
;
82 this.beginTxnFuture
= beginTxnFuture
;
85 static TransactionImpl
.InternalTransaction
create(CloudDatastoreV1Proxy dsApiProxy
,
86 Future
<BeginTransactionResponse
> future
) {
87 return registerTxn(new InternalTransactionCloudDatastoreV1(dsApiProxy
, future
));
91 * Convert a mutation to a format suitable for committing later.
93 byte[] serializeMutation(Mutation mutation
) {
94 byte[] bytes
= commitReqBuilder
.addMutations(mutation
).buildPartial().toByteArray();
95 commitReqBuilder
.clearMutations();
100 * Convert the partial proto segments into a serialized {@link CommitRequest}.
102 Future
<?
> sendCommit(Collection
<byte[]> mutations
) {
103 byte[][] protoSegmentsArray
= new byte[mutations
.size() + 1][];
104 protoSegmentsArray
[0] = CommitRequest
.newBuilder()
105 .setTransaction(getTransactionBytes())
106 .build().toByteArray();
108 for (byte[] mutData
: mutations
) {
109 protoSegmentsArray
[arrayIndex
++] = mutData
;
112 return dsApiProxy
.rawCommit(Bytes
.concat(protoSegmentsArray
));
113 } catch (InvalidProtocolBufferException e
) {
114 throw new RuntimeException("Unexpected error.", e
);
119 * Register a new transaction on the internal roaster.
120 * @return The txn, for chaining.
122 static InternalTransactionCloudDatastoreV1
registerTxn(InternalTransactionCloudDatastoreV1 txn
) {
123 internalTransactionRegister
.put(txn
.clientId
, txn
);
128 * Returns the transaction bytes for this transaction.
129 * Blocks on the future since the bytes are returned by the datastore server.
131 ByteString
getTransactionBytes() {
132 return FutureHelper
.quietGet(beginTxnFuture
).getTransaction();
136 * Schedules a put operation for when this transaction is committed.
138 void deferPut(Entity entity
) {
139 deferPut(DataTypeTranslator
.toV1Entity(entity
));
142 void deferPut(com
.google
.datastore
.v1beta3
.Entity
.Builder entityProto
) {
144 mutationMap
.put(entityProto
.getKey(), serializeMutation(Mutation
.newBuilder()
145 .setUpsert(entityProto
)
149 void deferDelete(Key key
) {
151 com
.google
.datastore
.v1beta3
.Key keyV1
= DataTypeTranslator
.toV1Key(key
).build();
152 mutationMap
.put(keyV1
, serializeMutation(Mutation
.newBuilder()
158 public Future
<Void
> doCommitAsync() {
160 Future
<Void
> result
= new VoidFutureWrapper
<>(sendCommit(mutationMap
.values()));
166 public Future
<Void
> doRollbackAsync() {
169 return new VoidFutureWrapper
<>(dsApiProxy
.rollback(
170 RollbackRequest
.newBuilder().setTransaction(getTransactionBytes()).build()));
174 public String
getId() {
178 private void checkWritable() {
180 throw new IllegalStateException("Transaction is not writable.");
185 * Locates the {@link InternalTransactionCloudDatastoreV1} object associated with a
186 * {@link Transaction} by looking up the ID in an static, threadsafe map.
188 * @throws IllegalArgumentException If a txn object is not found.
189 * @return Internal transaction object associated with the given ID.
191 static InternalTransactionCloudDatastoreV1
get(Transaction txn
) {
192 String txnId
= txn
.getId();
193 InternalTransactionCloudDatastoreV1 txnImpl
= internalTransactionRegister
.get(txnId
);
194 if (txnImpl
== null) {
195 throw new IllegalArgumentException("Transaction not found with ID: " + txnId
);
200 static boolean isV1Transaction(Transaction txn
) {
201 return internalTransactionRegister
.containsKey(txn
.getId());
204 private static class VoidFutureWrapper
<T
> extends FutureWrapper
<T
, Void
> {
205 public VoidFutureWrapper(Future
<T
> parent
) {
210 protected Void
wrap(T ignore
) throws Exception
{
215 protected Throwable
convertException(Throwable cause
) {