1.9.30 sync.
[gae.git] / java / src / main / com / google / appengine / api / datastore / InternalTransactionCloudDatastoreV1.java
blobe4ff95f0516ba02655607835bc0941697205ee64
1 package com.google.appengine.api.datastore;
3 import com.google.appengine.api.utils.FutureWrapper;
4 import com.google.common.collect.MapMaker;
5 import com.google.common.collect.Maps;
6 import com.google.common.primitives.Bytes;
7 import com.google.datastore.v1beta3.BeginTransactionResponse;
8 import com.google.datastore.v1beta3.CommitRequest;
9 import com.google.datastore.v1beta3.Mutation;
10 import com.google.datastore.v1beta3.RollbackRequest;
11 import com.google.protobuf.ByteString;
12 import com.google.protobuf.InvalidProtocolBufferException;
14 import java.util.Collection;
15 import java.util.Map;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.atomic.AtomicLong;
19 /**
20 * Implementation of the Cloud Datastore v1 specific logic to handle a {@link Transaction}.
22 * <p>In Cloud Datastore, puts and deletes are buffered on the client until commit. This class
23 * serializes mutations as they are received to avoid the memory overhead of retaining the full
24 * proto objects.
26 class InternalTransactionCloudDatastoreV1 implements TransactionImpl.InternalTransaction {
27 /**
28 * Prefix for transaction IDs ({@link #getId()}). Enables this class to reliably distinguish
29 * transactions it created from v3 transactions, whose IDs are always entirely numerical.
31 private static final String TXN_ID_PREFIX = "v1-";
33 private final CommitRequest.Builder commitReqBuilder = CommitRequest.newBuilder();
35 /**
36 * Generates a unique identifier (for a given runtime) which can be used for later
37 * lookup of the instance.
39 private static final AtomicLong clientIdGenerator = new AtomicLong();
41 /**
42 * Used to store {@link InternalTransactionCloudDatastoreV1} objects for reidentification when a
43 * potentially wrapped Transaction object is passed back to the SDK in a future call.
44 * Each {@link InternalTransactionCloudDatastoreV1} instance is wrapped in a
45 * {@link TransactionImpl}. We use weak references in this static map because this object's
46 * purpose is tied to the lifetime of the wrapper.
48 private static final Map<String, InternalTransactionCloudDatastoreV1>
49 internalTransactionRegister = new MapMaker().weakValues().makeMap();
51 /**
52 * The ID reported through {@link #getId()}. This ID is also used for instance lookup, see
53 * {@link #get(Transaction)}.
55 private final String clientId = TXN_ID_PREFIX
56 + Long.toString(clientIdGenerator.getAndIncrement());
58 /**
59 * The list of mutations (deferred Put/Delete operations) that will be sent to the server as part
60 * of the Commit RPC. A linked map is used to generate consistent results for unit tests;
61 * however iteration order shouldn't affect correctness.
63 private final Map<com.google.datastore.v1beta3.Key, byte[]> mutationMap = Maps.newLinkedHashMap();
65 /**
66 * The {@link Future} associated with the BeginTransaction RPC we sent to the
67 * datastore server.
69 private final Future<BeginTransactionResponse> beginTxnFuture;
71 protected final CloudDatastoreV1Proxy dsApiProxy;
73 private boolean isWritable = true;
/**
 * Stores the collaborators of this transaction. Not meant to be called directly: use
 * {@link #create(CloudDatastoreV1Proxy, Future)}, which also performs the required
 * post-construction registration.
 *
 * @param dsApiProxy proxy used to issue the commit and rollback RPCs
 * @param beginTxnFuture future of the BeginTransaction RPC for this transaction
 */
private InternalTransactionCloudDatastoreV1(CloudDatastoreV1Proxy dsApiProxy,
    Future<BeginTransactionResponse> beginTxnFuture) {
  this.beginTxnFuture = beginTxnFuture;
  this.dsApiProxy = dsApiProxy;
}
85 static TransactionImpl.InternalTransaction create(CloudDatastoreV1Proxy dsApiProxy,
86 Future<BeginTransactionResponse> future) {
87 return registerTxn(new InternalTransactionCloudDatastoreV1(dsApiProxy, future));
90 /**
91 * Convert a mutation to a format suitable for committing later.
93 byte[] serializeMutation(Mutation mutation) {
94 byte[] bytes = commitReqBuilder.addMutations(mutation).buildPartial().toByteArray();
95 commitReqBuilder.clearMutations();
96 return bytes;
99 /**
100 * Convert the partial proto segments into a serialized {@link CommitRequest}.
102 Future<?> sendCommit(Collection<byte[]> mutations) {
103 byte[][] protoSegmentsArray = new byte[mutations.size() + 1][];
104 protoSegmentsArray[0] = CommitRequest.newBuilder()
105 .setTransaction(getTransactionBytes())
106 .build().toByteArray();
107 int arrayIndex = 1;
108 for (byte[] mutData : mutations) {
109 protoSegmentsArray[arrayIndex++] = mutData;
111 try {
112 return dsApiProxy.rawCommit(Bytes.concat(protoSegmentsArray));
113 } catch (InvalidProtocolBufferException e) {
114 throw new RuntimeException("Unexpected error.", e);
119 * Register a new transaction on the internal roaster.
120 * @return The txn, for chaining.
122 static InternalTransactionCloudDatastoreV1 registerTxn(InternalTransactionCloudDatastoreV1 txn) {
123 internalTransactionRegister.put(txn.clientId, txn);
124 return txn;
128 * Returns the transaction bytes for this transaction.
129 * Blocks on the future since the bytes are returned by the datastore server.
131 ByteString getTransactionBytes() {
132 return FutureHelper.quietGet(beginTxnFuture).getTransaction();
136 * Schedules a put operation for when this transaction is committed.
138 void deferPut(Entity entity) {
139 deferPut(DataTypeTranslator.toV1Entity(entity));
142 void deferPut(com.google.datastore.v1beta3.Entity.Builder entityProto) {
143 checkWritable();
144 mutationMap.put(entityProto.getKey(), serializeMutation(Mutation.newBuilder()
145 .setUpsert(entityProto)
146 .build()));
149 void deferDelete(Key key) {
150 checkWritable();
151 com.google.datastore.v1beta3.Key keyV1 = DataTypeTranslator.toV1Key(key).build();
152 mutationMap.put(keyV1, serializeMutation(Mutation.newBuilder()
153 .setDelete(keyV1)
154 .build()));
157 @Override
158 public Future<Void> doCommitAsync() {
159 isWritable = false;
160 Future<Void> result = new VoidFutureWrapper<>(sendCommit(mutationMap.values()));
161 mutationMap.clear();
162 return result;
165 @Override
166 public Future<Void> doRollbackAsync() {
167 isWritable = false;
168 mutationMap.clear();
169 return new VoidFutureWrapper<>(dsApiProxy.rollback(
170 RollbackRequest.newBuilder().setTransaction(getTransactionBytes()).build()));
173 @Override
174 public String getId() {
175 return clientId;
178 private void checkWritable() {
179 if (!isWritable) {
180 throw new IllegalStateException("Transaction is not writable.");
185 * Locates the {@link InternalTransactionCloudDatastoreV1} object associated with a
186 * {@link Transaction} by looking up the ID in an static, threadsafe map.
188 * @throws IllegalArgumentException If a txn object is not found.
189 * @return Internal transaction object associated with the given ID.
191 static InternalTransactionCloudDatastoreV1 get(Transaction txn) {
192 String txnId = txn.getId();
193 InternalTransactionCloudDatastoreV1 txnImpl = internalTransactionRegister.get(txnId);
194 if (txnImpl == null) {
195 throw new IllegalArgumentException("Transaction not found with ID: " + txnId);
197 return txnImpl;
200 static boolean isV1Transaction(Transaction txn) {
201 return internalTransactionRegister.containsKey(txn.getId());
204 private static class VoidFutureWrapper<T> extends FutureWrapper<T, Void> {
205 public VoidFutureWrapper(Future<T> parent) {
206 super(parent);
209 @Override
210 protected Void wrap(T ignore) throws Exception {
211 return null;
214 @Override
215 protected Throwable convertException(Throwable cause) {
216 return cause;