Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / datastore / Batcher.java
blob195fb863df796b0aed4d2c680b2392c3796a9209
1 package com.google.appengine.api.datastore;
3 import com.google.appengine.api.datastore.FutureHelper.MultiFuture;
4 import com.google.common.collect.Iterators;
5 import com.google.common.collect.Lists;
6 import com.google.common.collect.Maps;
7 import com.google.io.protocol.Protocol;
8 import com.google.protobuf.MessageLite;
9 import com.google.protobuf.MessageLiteOrBuilder;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.NoSuchElementException;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.TimeoutException;
/**
 * A class that splits groups of values into multiple batches based on size, count and group
 * affiliation.
 *
 * <p>This class purposefully delays conversion to protocol message format to reduce memory usage.
 *
 * @param <R> the batch message type, usually the request
 * @param <F> the Java native value type to batch
 * @param <T> the proto value type to batch
 */
32 abstract class Batcher<R extends MessageLiteOrBuilder, F, T extends MessageLite> {
33 /**
34 * @return the group affiliation for the given value.
36 abstract Object getGroup(F value);
38 /**
39 * Adds the given proto value to the given batch.
41 * @param pb the proto to add
42 * @param batch the batch to add to
44 abstract void addToBatch(T pb, R batch);
46 /**
47 * @return a new empty batch based on the base batch template.
49 abstract R newBatch(R baseBatch);
51 /**
52 * @return the maximum message size in bytes for a single batch
54 abstract int getMaxSize();
56 /**
57 * @return the maximum number of values to add to a single batch
59 abstract int getMaxCount();
61 /**
62 * @return the maximum number of groups to include in a single batch (if grouping is enabled)
64 abstract int getMaxGroups();
66 /**
67 * @return the protocol message version of the value
69 abstract T toPb(F value);
71 /**
72 * Models an item and its associated index in some ordered collection.
74 * @param <F> The type of the item.
76 static class IndexedItem<F> {
77 final int index;
78 final F item;
80 IndexedItem(int index, F item) {
81 this.index = index;
82 this.item = item;
85 @Override
86 public String toString() {
87 return String.format("IndexedItem(%d, %s)", index, item);
91 /***
92 * A future that re-orders the results of a batch operation given the order returned by {@link
93 * Batcher#getBatches(Collection, MessageLiteOrBuilder, int, boolean, List)}.
95 * @param <K> batch type
96 * @param <V> aggregated result type
98 abstract static class ReorderingMultiFuture<K, V> extends MultiFuture<K, V> {
99 private final Collection<Integer> order;
102 * @param futures the batched futures
103 * @param order a collection containing the index at which the associated value should appear.
105 public ReorderingMultiFuture(Iterable<Future<K>> futures, Collection<Integer> order) {
106 super(futures);
107 this.order = order;
112 * @param batch a batch result
113 * @param indexItr an iterator that produces the associated index for each batch result. {@code
114 * next()} will be called exactly once for each value in batch.
115 * @param result the aggregated result instance to populate.
116 * @return the populated aggregate result
118 protected abstract V aggregate(K batch, Iterator<Integer> indexItr, V result);
121 * @param size the number of results to expect
122 * @return the object that should be populated with the re-orded results
124 protected abstract V initResult(int size);
126 @Override
127 public final V get() throws InterruptedException, ExecutionException {
128 Iterator<Integer> indexItr = order.iterator();
129 V result = initResult(order.size());
130 for (Future<K> future : futures) {
131 result = aggregate(future.get(), indexItr, result);
133 return result;
136 @Override
137 public final V get(long timeout, TimeUnit unit)
138 throws InterruptedException, ExecutionException, TimeoutException {
139 Iterator<Integer> indexItr = order.iterator();
140 V result = initResult(order.size());
141 for (Future<K> future : futures) {
142 result = aggregate(future.get(timeout, unit), indexItr, result);
144 return result;
149 * An iterator that builds batches lazily.
151 * @param <V> the intermediate value type
153 private abstract class BatchIterator<V> implements Iterator<R> {
155 * Must be called only once per value and in the order in which the values are added to batches.
157 * @return the original value
159 protected abstract F getValue(V value);
161 final R baseBatch;
162 final int baseSize;
163 final int maxSize = getMaxSize();
164 final int maxCount = getMaxCount();
165 final int maxGroups;
166 final Iterator<? extends Iterable<V>> groupItr;
167 Iterator<V> valueItr;
168 T nextValue;
171 * @param baseBatch the base batch template
172 * @param groupedValues an iterator the returns groups of values, must not be empty or
173 * contain any empty group.
175 BatchIterator(R baseBatch, int baseBatchSize, Iterator<? extends Iterable<V>> groupedValues) {
176 this.baseBatch = baseBatch;
177 this.baseSize = baseBatchSize;
178 this.groupItr = groupedValues;
179 this.valueItr = groupItr.next().iterator();
180 this.nextValue = toPb(getValue(valueItr.next()));
181 this.maxGroups = getMaxGroups();
184 @Override
185 public boolean hasNext() {
186 return nextValue != null;
189 @Override
190 public R next() {
191 if (!hasNext()) {
192 throw new NoSuchElementException();
194 R batch = newBatch(baseBatch);
195 int size = baseSize;
196 int numGroups = 1;
197 for (int i = 0; i < maxCount && numGroups <= maxGroups; ++i) {
198 int valueSize = getEmbeddedSize(nextValue);
199 if (i > 0 &&
200 size + valueSize > maxSize) {
201 break;
203 size += valueSize;
204 addToBatch(nextValue, batch);
206 if (!valueItr.hasNext()) {
207 if (!groupItr.hasNext()) {
208 nextValue = null;
209 break;
211 valueItr = groupItr.next().iterator();
212 ++numGroups;
214 nextValue = toPb(getValue(valueItr.next()));
216 return batch;
219 @Override
220 public void remove() {
221 throw new UnsupportedOperationException();
226 * @return the embedded size of the given message
228 private static int getEmbeddedSize(MessageLite pb) {
229 return Protocol.stringSize(pb.getSerializedSize()) + 1;
233 * Gets or create the collection for the given key and adds the given value.
235 * Creates collections that allow duplicates and preserves order (ArrayList).
237 * @param map the map from which get or create the collection
238 * @param key the key of the collection to add value to
239 * @param value the value to add
241 private <T> void put(Map<Object, Collection<T>> map, Object key, T value) {
242 Collection<T> col = map.get(key);
243 if (col == null) {
244 col = Lists.newArrayList();
245 map.put(key, col);
247 col.add(value);
251 * Lazily compute batches.
253 * @param values the values to batch
254 * @param baseBatch the batch template to use
255 * @param baseBatchSize serialized size of baseBatch
256 * @param group if the values should be grouped using {@link #getGroup}
257 * @return an iterator that lazily computes batches.
259 Iterator<R> getBatches(Collection<F> values, R baseBatch, int baseBatchSize, boolean group) {
260 if (values.isEmpty()) {
261 return Collections.emptyIterator();
264 Iterator<? extends Iterable<F>> groupItr;
265 if (group) {
266 Map<Object, Collection<F>> groupedValues = Maps.newLinkedHashMap();
267 for (F value : values) {
268 put(groupedValues, getGroup(value), value);
270 groupItr = groupedValues.values().iterator();
271 } else {
272 groupItr = Iterators.singletonIterator(values);
275 return new BatchIterator<F>(baseBatch, baseBatchSize, groupItr) {
276 @Override
277 protected F getValue(F value) {
278 return value;
284 * Lazily compute batches and populate order with indexes of the value as they are added
285 * to each batch.
287 * @param values the values to batch
288 * @param baseBatch the batch template to use
289 * @param baseBatchSize size of baseBatch
290 * @param group if the values should be grouped using {@link #getGroup}
291 * @param order the list to populate with the indexes of the values as they are added to batches
292 * @return an iterator that lazily computes batches.
294 Iterator<R> getBatches(Collection<F> values, R baseBatch, int baseBatchSize, boolean group,
295 final List<Integer> order) {
296 if (values.isEmpty()) {
297 return Collections.emptyIterator();
300 Iterator<? extends Iterable<IndexedItem<F>>> groupItr;
301 if (group) {
302 Map<Object, Collection<IndexedItem<F>>> groupedValues = Maps.newLinkedHashMap();
303 int index = 0;
304 for (F value : values) {
305 put(groupedValues, getGroup(value), new IndexedItem<F>(index++, value));
307 groupItr = groupedValues.values().iterator();
308 } else {
309 List<IndexedItem<F>> indexedValue = Lists.newArrayList();
310 int index = 0;
311 for (F value : values) {
312 indexedValue.add(new IndexedItem<F>(index++, value));
314 groupItr = Iterators.<Iterable<IndexedItem<F>>>singletonIterator(indexedValue);
317 return new BatchIterator<IndexedItem<F>>(baseBatch, baseBatchSize, groupItr) {
318 @Override
319 protected F getValue(IndexedItem<F> value) {
320 order.add(value.index);
321 return value.item;