1 package com
.google
.appengine
.api
.datastore
;
3 import com
.google
.appengine
.api
.utils
.FutureWrapper
;
4 import com
.google
.apphosting
.datastore
.DatastoreV4
;
5 import com
.google
.apphosting
.datastore
.DatastoreV4
.BeginTransactionResponse
;
6 import com
.google
.apphosting
.datastore
.DatastoreV4
.CommitRequest
;
7 import com
.google
.apphosting
.datastore
.DatastoreV4
.Mutation
;
8 import com
.google
.apphosting
.datastore
.DatastoreV4
.RollbackRequest
;
9 import com
.google
.apphosting
.datastore
.EntityV4
;
10 import com
.google
.common
.collect
.MapMaker
;
11 import com
.google
.common
.collect
.Maps
;
12 import com
.google
.protobuf
.ByteString
;
14 import java
.util
.Collection
;
16 import java
.util
.concurrent
.Future
;
17 import java
.util
.concurrent
.atomic
.AtomicLong
;
20 * Implementation of the V4-specific logic to handle a {@link Transaction}.
22 * In V4, puts and deletes are stored on the client until commit. This class serializes
23 * mutations as they are received to avoid memory penalties associated with the full
26 * This object is subclassed to manage the two mutation styles.
28 * @param <T> Internal storage format of each mutation.
30 abstract class BaseInternalTransactionV4
<T
> implements TransactionImpl
.InternalTransaction
{
33 * Generates a unique identifier (for a given runtime) which can be used for later
34 * lookup of the instance.
// Shared by all instances (static). Starts at 0, the AtomicLong default, and
// is advanced once per instance when clientId is initialized.
36 private static final AtomicLong clientIdGenerator
= new AtomicLong();
39 * Used to store {@link BaseInternalTransactionV4} objects for reidentification when a
40 * potentially wrapped Transaction object is passed back to the SDK in a future call.
41 * Each {@link BaseInternalTransactionV4} instance is wrapped in a {@link TransactionImpl}.
42 * We use weak references in this static map because this object's purpose is tied to
43 * the lifetime of the wrapper.
// Keyed by clientId (see registerTxn, which does the put, and getById, which
// does the get). Values are held weakly via MapMaker.weakValues().
45 private static final Map
<String
, BaseInternalTransactionV4
<?
>> internalTransactionRegister
=
46 new MapMaker().weakValues().makeMap();
49 * The ID reported through {@link #getId()}. This ID is also used for instance lookup, see
50 * {@link #getById(String)}.
// Decimal string of the value taken from clientIdGenerator at construction;
// final, so it never changes for the lifetime of the instance.
52 private final String clientId
= Long
.toString(clientIdGenerator
.getAndIncrement());
55 * The list of mutations (deferred Put/Delete operations) that will be sent to the server as part
56 * of the Commit RPC. A linked map is used to generate consistent results for unit tests;
57 * however iteration order shouldn't affect correctness.
// Keyed by the V4 entity key. deferPut/deferDelete store with put(), so a
// later operation on the same key replaces the earlier pending mutation.
59 private final Map
<EntityV4
.Key
, T
> mutationMap
= Maps
.newLinkedHashMap();
62 * The {@link Future} associated with the BeginTransaction RPC we sent to the
// Supplied to the constructor and stored as-is; getHandle() waits on it to
// read the transaction handle out of the response.
65 private final Future
<BeginTransactionResponse
> beginTxnFuture
;
// RPC proxy supplied to the constructor. Within this class it is used by
// doRollbackAsync(); protected visibility keeps it reachable from subclasses.
67 protected final DatastoreV4Proxy dsApiProxy
;
// Starts out true.
// NOTE(review): presumably the flag that checkWritable() tests — confirm.
69 private boolean isWritable
= true;
// Package-private constructor.
// dsApiProxy: proxy used for datastore RPCs (stored in the protected field).
// beginTxnFuture: pending BeginTransaction response; stored without being
// waited on here.
71 BaseInternalTransactionV4(DatastoreV4Proxy dsApiProxy
,
72 Future
<BeginTransactionResponse
> beginTxnFuture
) {
73 this.dsApiProxy
= dsApiProxy
;
74 this.beginTxnFuture
= beginTxnFuture
;
78 * Convert a mutation to a format suitable for committing later.
// Subclass hook. Receives the Mutation built by deferPut/deferDelete; the
// returned T is the value stored in mutationMap for that entity key.
80 abstract T
serializeMutation(Mutation mutation
);
83 * Convert the partial proto segments into a serialized {@link CommitRequest}.
// Subclass hook. doCommitAsync() passes mutationMap.values() as `mutations`
// and wraps the returned future into a Future<Void>.
85 abstract Future
<?
> sendCommit(Collection
<T
> mutations
);
88 * Register a new transaction on the internal roster.
89 * @return The txn, for chaining.
// Stores txn in the static register under txn.clientId, which is the key
// getById(String) later looks up.
91 static <T
> BaseInternalTransactionV4
<T
> registerTxn(BaseInternalTransactionV4
<T
> txn
) {
92 internalTransactionRegister
.put(txn
.clientId
, txn
);
97 * Provides the unique identifier for the txn.
98 * Blocks on the future since the handle comes back from the datastore
// Obtains the BeginTransaction response from beginTxnFuture through
// FutureHelper.quietGet and returns its transaction field.
101 ByteString
getHandle() {
102 return FutureHelper
.quietGet(beginTxnFuture
).getTransaction();
106 * Schedules a put operation for when this transaction is committed.
// Convenience overload: converts the entity with
// DataTypeTranslator.toV4Entity and delegates to the builder-based deferPut.
108 void deferPut(Entity entity
) {
109 deferPut(DataTypeTranslator
.toV4Entity(entity
));
// Builds a Mutation with operation UPSERT carrying entityProto, passes it
// through serializeMutation, and stores the result in mutationMap under
// entityProto.getKey(). Nothing is sent to the server here.
112 void deferPut(EntityV4
.Entity
.Builder entityProto
) {
114 mutationMap
.put(entityProto
.getKey(),
115 serializeMutation(Mutation
.newBuilder()
116 .setOp(DatastoreV4
.Mutation
.Operation
.UPSERT
)
117 .setEntity(entityProto
)
// Converts key to its V4 form with DataTypeTranslator.toV4Key, then stores a
// serialized Mutation with operation DELETE in mutationMap under that V4 key.
// Nothing is sent to the server here.
121 void deferDelete(Key key
) {
123 EntityV4
.Key v4Key
= DataTypeTranslator
.toV4Key(key
).build();
124 mutationMap
.put(v4Key
,
125 serializeMutation(Mutation
.newBuilder()
126 .setOp(DatastoreV4
.Mutation
.Operation
.DELETE
)
// Hands every pending mutation (mutationMap.values()) to sendCommit and
// adapts the returned future to Future<Void> with VoidFutureWrapper.
132 public Future
<Void
> doCommitAsync() {
134 Future
<Void
> result
= new VoidFutureWrapper
<>(sendCommit(mutationMap
.values()));
// Issues a rollback through dsApiProxy for this transaction's handle and
// adapts the returned future to Future<Void>. getHandle() is evaluated while
// building the request, so the BeginTransaction result is needed before the
// rollback is sent.
140 public Future
<Void
> doRollbackAsync() {
143 return new VoidFutureWrapper
<>(dsApiProxy
.rollback(
144 RollbackRequest
.newBuilder().setTransaction(getHandle()).build()));
// Reports the client-side ID of this transaction.
// NOTE(review): per the clientId field documentation this is clientId — confirm.
148 public String
getId() {
// Raises IllegalStateException("Transaction is not writable.").
// NOTE(review): the guarding condition is presumably !isWritable — confirm.
152 private void checkWritable() {
154 throw new IllegalStateException("Transaction is not writable.");
159 * Locates the {@link BaseInternalTransactionV4} object associated with a
160 * {@link Transaction} by looking up the ID in a static, thread-safe map.
161 * @throws IllegalArgumentException If a txn object is not found.
162 * @return Internal transaction object associated with the given ID.
// Looks txnClientId up in the static register. A miss (null) raises
// IllegalArgumentException whose message includes the requested ID.
164 static BaseInternalTransactionV4
<?
> getById(String txnClientId
) {
165 BaseInternalTransactionV4
<?
> txnImpl
= internalTransactionRegister
.get(txnClientId
);
166 if (txnImpl
== null) {
167 throw new IllegalArgumentException("Transaction not found with ID: " + txnClientId
);
172 private static class VoidFutureWrapper
<T
> extends FutureWrapper
<T
, Void
> {
173 public VoidFutureWrapper(Future
<T
> parent
) {
178 protected Void
wrap(T ignore
) throws Exception
{
183 protected Throwable
convertException(Throwable cause
) {