Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / datastore / InternalTransactionCloudDatastoreV1.java
blob4e1ac110700bade7c1a356cdd73228437f2e8108
1 package com.google.appengine.api.datastore;
3 import com.google.appengine.api.utils.FutureWrapper;
4 import com.google.common.collect.MapMaker;
5 import com.google.common.collect.Maps;
6 import com.google.common.primitives.Bytes;
7 import com.google.datastore.v1beta3.BeginTransactionResponse;
8 import com.google.datastore.v1beta3.CommitRequest;
9 import com.google.datastore.v1beta3.Mutation;
10 import com.google.datastore.v1beta3.RollbackRequest;
11 import com.google.protobuf.ByteString;
12 import com.google.protobuf.InvalidProtocolBufferException;
14 import java.util.Collection;
15 import java.util.Map;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.atomic.AtomicLong;
19 /**
20 * Implementation of the Cloud Datastore v1 specific logic to handle a {@link Transaction}.
22 * <p>In Cloud Datastore, puts and gets are stored on the client until commit. This class serializes
23 * mutations as they are received to avoid memory penalties associated with the full
24 * proto objects.
26 class InternalTransactionCloudDatastoreV1 implements TransactionImpl.InternalTransaction {
27 private final CommitRequest.Builder commitReqBuilder = CommitRequest.newBuilder();
29 /**
30 * Generates a unique identifier (for a given runtime) which can be used for later
31 * lookup of the instance.
33 private static final AtomicLong clientIdGenerator = new AtomicLong();
35 /**
36 * Used to store {@link InternalTransactionCloudDatastoreV1} objects for reidentification when a
37 * potentially wrapped Transaction object is passed back to the SDK in a future call.
38 * Each {@link InternalTransactionCloudDatastoreV1} instance is wrapped in a
39 * {@link TransactionImpl}. We use weak references in this static map because this object's
40 * purpose is tied to the lifetime of the wrapper.
42 private static final Map<String, InternalTransactionCloudDatastoreV1>
43 internalTransactionRegister = new MapMaker().weakValues().makeMap();
45 /**
46 * The ID reported through {@link #getId()}. This ID is also used for instance lookup, see
47 * {@link #getById(String)}.
49 private final String clientId = Long.toString(clientIdGenerator.getAndIncrement());
51 /**
52 * The list of mutations (deferred Put/Delete operations) that will be sent to the server as part
53 * of the Commit RPC. A linked map is used to generate consistent results for unit tests;
54 * however iteration order shouldn't affect correctness.
56 private final Map<com.google.datastore.v1beta3.Key, byte[]> mutationMap = Maps.newLinkedHashMap();
58 /**
59 * The {@link Future} associated with the BeginTransaction RPC we sent to the
60 * datastore server.
62 private final Future<BeginTransactionResponse> beginTxnFuture;
64 protected final CloudDatastoreV1Proxy dsApiProxy;
66 private boolean isWritable = true;
68 /**
69 * Objects should be created with {@link #create(CloudDatastoreV1Proxy, Future)} due
70 * to post-construction manipulation.
72 private InternalTransactionCloudDatastoreV1(CloudDatastoreV1Proxy dsApiProxy,
73 Future<BeginTransactionResponse> beginTxnFuture) {
74 this.dsApiProxy = dsApiProxy;
75 this.beginTxnFuture = beginTxnFuture;
78 static TransactionImpl.InternalTransaction create(CloudDatastoreV1Proxy dsApiProxy,
79 Future<BeginTransactionResponse> future) {
80 return registerTxn(new InternalTransactionCloudDatastoreV1(dsApiProxy, future));
83 /**
84 * Convert a mutation to a format suitable for committing later.
86 byte[] serializeMutation(Mutation mutation) {
87 byte[] bytes = commitReqBuilder.addMutations(mutation).buildPartial().toByteArray();
88 commitReqBuilder.clearMutations();
89 return bytes;
92 /**
93 * Convert the partial proto segments into a serialized {@link CommitRequest}.
95 Future<?> sendCommit(Collection<byte[]> mutations) {
96 byte[][] protoSegmentsArray = new byte[mutations.size() + 1][];
97 protoSegmentsArray[0] = CommitRequest.newBuilder()
98 .setTransaction(getHandle())
99 .build().toByteArray();
100 int arrayIndex = 1;
101 for (byte[] mutData : mutations) {
102 protoSegmentsArray[arrayIndex++] = mutData;
104 try {
105 return dsApiProxy.rawCommit(Bytes.concat(protoSegmentsArray));
106 } catch (InvalidProtocolBufferException e) {
107 throw new RuntimeException("Unexpected error.", e);
112 * Register a new transaction on the internal roaster.
113 * @return The txn, for chaining.
115 static InternalTransactionCloudDatastoreV1 registerTxn(InternalTransactionCloudDatastoreV1 txn) {
116 internalTransactionRegister.put(txn.clientId, txn);
117 return txn;
121 * Provides the unique identifier for the txn.
122 * Blocks on the future since the handle comes back from the datastore
123 * server.
125 ByteString getHandle() {
126 return FutureHelper.quietGet(beginTxnFuture).getTransaction();
130 * Schedules a put operation for when this transaction is committed.
132 void deferPut(Entity entity) {
133 deferPut(DataTypeTranslator.toV1Entity(entity));
136 void deferPut(com.google.datastore.v1beta3.Entity.Builder entityProto) {
137 checkWritable();
138 mutationMap.put(entityProto.getKey(), serializeMutation(Mutation.newBuilder()
139 .setOp(Mutation.Operation.UPSERT)
140 .setEntity(entityProto)
141 .build()));
144 void deferDelete(Key key) {
145 checkWritable();
146 com.google.datastore.v1beta3.Key keyV1 = DataTypeTranslator.toV1Key(key).build();
147 mutationMap.put(keyV1, serializeMutation(Mutation.newBuilder()
148 .setOp(Mutation.Operation.DELETE)
149 .setKey(keyV1)
150 .build()));
153 @Override
154 public Future<Void> doCommitAsync() {
155 isWritable = false;
156 Future<Void> result = new VoidFutureWrapper<>(sendCommit(mutationMap.values()));
157 mutationMap.clear();
158 return result;
161 @Override
162 public Future<Void> doRollbackAsync() {
163 isWritable = false;
164 mutationMap.clear();
165 return new VoidFutureWrapper<>(dsApiProxy.rollback(
166 RollbackRequest.newBuilder().setTransaction(getHandle()).build()));
169 @Override
170 public String getId() {
171 return clientId;
174 private void checkWritable() {
175 if (!isWritable) {
176 throw new IllegalStateException("Transaction is not writable.");
181 * Locates the {@link InternalTransactionCloudDatastoreV1} object associated with a
182 * {@link Transaction} by looking up the ID in an static, threadsafe map.
183 * @throws IllegalArgumentException If a txn object is not found.
184 * @return Internal transaction object associated with the given ID.
186 static InternalTransactionCloudDatastoreV1 getById(String txnClientId) {
187 InternalTransactionCloudDatastoreV1 txnImpl = internalTransactionRegister.get(txnClientId);
188 if (txnImpl == null) {
189 throw new IllegalArgumentException("Transaction not found with ID: " + txnClientId);
191 return txnImpl;
194 private static class VoidFutureWrapper<T> extends FutureWrapper<T, Void> {
195 public VoidFutureWrapper(Future<T> parent) {
196 super(parent);
199 @Override
200 protected Void wrap(T ignore) throws Exception {
201 return null;
204 @Override
205 protected Throwable convertException(Throwable cause) {
206 return cause;