App Engine Java SDK version 1.9.1
[gae.git] / java / src / main / com / google / appengine / api / datastore / Batcher.java
blob4e208f701eb5e8fd5812324623109e73bd2d1b16
1 package com.google.appengine.api.datastore;
3 import com.google.appengine.api.datastore.FutureHelper.MultiFuture;
4 import com.google.common.collect.Iterators;
5 import com.google.common.collect.Lists;
6 import com.google.common.collect.Maps;
7 import com.google.io.protocol.Protocol;
8 import com.google.protobuf.Message;
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.Iterator;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.NoSuchElementException;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.TimeoutException;
/**
 * A class that splits groups of values into multiple batches based on size, count and group
 * affiliation.
 *
 * <p>This class purposefully delays conversion to protocol message format to reduce memory usage.
 *
 * @param <R> the batch message type, usually the request
 * @param <F> the java native value type to batch
 * @param <T> the proto value type to batch
 */
31 abstract class Batcher<R extends Message, F, T extends Message> {
32 /**
33 * @return the group affiliation for the given value.
35 abstract Object getGroup(F value);
37 /**
38 * Adds the given proto value to the given batch.
40 * @param pb the proto to add
41 * @param batch the batch to add to
43 abstract void addToBatch(T pb, R batch);
45 /**
46 * @return a new empty batch based on the base batch template.
48 abstract R newBatch(R baseBatch);
50 /**
51 * @return the maximum message size in bytes for a single batch
53 abstract int getMaxSize();
55 /**
56 * @return the maximum number of values to add to a single batch
58 abstract int getMaxCount();
60 /**
61 * @return the maximum number of groups to include in a single batch (if grouping is enabled)
63 abstract int getMaxGroups();
65 /**
66 * @return the protocol message version of the value
68 abstract T toPb(F value);
70 /**
71 * Models an item and its associated index in some ordered collection.
73 * @param <F> The type of the item.
75 static class IndexedItem<F> {
76 final int index;
77 final F item;
79 IndexedItem(int index, F item) {
80 this.index = index;
81 this.item = item;
84 @Override
85 public String toString() {
86 return String.format("IndexedItem(%d, %s)", index, item);
90 /***
91 * A future that re-orders the results of a batch operation given the order returned by {@link
92 * Batcher#getBatches(Collection, Message, boolean, List)}.
94 * @param <K> batch type
95 * @param <V> aggregated result type
97 abstract static class ReorderingMultiFuture<K, V> extends MultiFuture<K, V> {
98 private final Collection<Integer> order;
101 * @param futures the batched futures
102 * @param order a collection containing the index at which the associated value should appear.
104 public ReorderingMultiFuture(Iterable<Future<K>> futures, Collection<Integer> order) {
105 super(futures);
106 this.order = order;
111 * @param batch a batch result
112 * @param indexItr an iterator that produces the associated index for each batch result. {@code
113 * next()} will be called exactly once for each value in batch.
114 * @param result the aggregated result instance to populate.
115 * @return the populated aggregate result
117 protected abstract V aggregate(K batch, Iterator<Integer> indexItr, V result);
120 * @param size the number of results to expect
121 * @return the object that should be populated with the re-orded results
123 protected abstract V initResult(int size);
125 @Override
126 public final V get() throws InterruptedException, ExecutionException {
127 Iterator<Integer> indexItr = order.iterator();
128 V result = initResult(order.size());
129 for (Future<K> future : futures) {
130 result = aggregate(future.get(), indexItr, result);
132 return result;
135 @Override
136 public final V get(long timeout, TimeUnit unit)
137 throws InterruptedException, ExecutionException, TimeoutException {
138 Iterator<Integer> indexItr = order.iterator();
139 V result = initResult(order.size());
140 for (Future<K> future : futures) {
141 result = aggregate(future.get(timeout, unit), indexItr, result);
143 return result;
148 * An iterator that builds batches lazily.
150 * @param <V> the intermediate value type
152 private abstract class BatchIterator<V> implements Iterator<R> {
154 * Must be called only once per value and in the order in which the values are added to batches.
156 * @return the original value
158 protected abstract F getValue(V value);
160 final R baseBatch;
161 final int maxSize = getMaxSize();
162 final int maxCount = getMaxCount();
163 final int maxGroups;
164 final Iterator<? extends Iterable<V>> groupItr;
165 Iterator<V> valueItr;
166 T nextValue;
169 * @param baseBatch the base batch template
170 * @param groupedValues an iterator the returns groups of values, must not be empty or
171 * contain any empty group.
173 BatchIterator(R baseBatch, Iterator<? extends Iterable<V>> groupedValues) {
174 this.baseBatch = baseBatch;
175 this.groupItr = groupedValues;
176 this.valueItr = groupItr.next().iterator();
177 this.nextValue = toPb(getValue(valueItr.next()));
178 this.maxGroups = getMaxGroups();
181 @Override
182 public boolean hasNext() {
183 return nextValue != null;
186 @Override
187 public R next() {
188 if (!hasNext()) {
189 throw new NoSuchElementException();
191 R batch = newBatch(baseBatch);
192 int size = batch.getSerializedSize();
193 int numGroups = 1;
194 for (int i = 0; i < maxCount && numGroups <= maxGroups; ++i) {
195 int valueSize = getEmbeddedSize(nextValue);
196 if (i > 0 &&
197 size + valueSize > maxSize) {
198 break;
200 size += valueSize;
201 addToBatch(nextValue, batch);
203 if (!valueItr.hasNext()) {
204 if (!groupItr.hasNext()) {
205 nextValue = null;
206 break;
208 valueItr = groupItr.next().iterator();
209 ++numGroups;
211 nextValue = toPb(getValue(valueItr.next()));
213 return batch;
216 @Override
217 public void remove() {
218 throw new UnsupportedOperationException();
223 * @return the embedded size of the given message
225 private static int getEmbeddedSize(Message pb) {
226 return Protocol.stringSize(pb.getSerializedSize()) + 1;
230 * Gets or create the collection for the given key and adds the given value.
232 * Creates collections that allow duplicates and preserves order (ArrayList).
234 * @param map the map from which get or create the collection
235 * @param key the key of the collection to add value to
236 * @param value the value to add
238 private <T> void put(Map<Object, Collection<T>> map, Object key, T value) {
239 Collection<T> col = map.get(key);
240 if (col == null) {
241 col = Lists.newArrayList();
242 map.put(key, col);
244 col.add(value);
248 * Lazily compute batches.
250 * @param values the values to batch
251 * @param baseBatch the batch template to use
252 * @param group if the values should be grouped using {@link #getGroup}
253 * @return an iterator that lazily computes batches.
255 Iterator<R> getBatches(Collection<F> values, R baseBatch, boolean group) {
256 if (values.isEmpty()) {
257 return Collections.emptyIterator();
260 Iterator<? extends Iterable<F>> groupItr;
261 if (group) {
262 Map<Object, Collection<F>> groupedValues = Maps.newLinkedHashMap();
263 for (F value : values) {
264 put(groupedValues, getGroup(value), value);
266 groupItr = groupedValues.values().iterator();
267 } else {
268 groupItr = Iterators.singletonIterator(values);
271 return new BatchIterator<F>(baseBatch, groupItr) {
272 @Override
273 protected F getValue(F value) {
274 return value;
280 * Lazily compute batches and populate order with indexes of the value as they are added
281 * to each batch.
283 * @param values the values to batch
284 * @param baseBatch the batch template to use
285 * @param group if the values should be grouped using {@link #getGroup}
286 * @param order the list to populate with the indexes of the values as they are added to batches
287 * @return an iterator that lazily computes batches.
289 Iterator<R> getBatches(Collection<F> values, R baseBatch, boolean group,
290 final List<Integer> order) {
291 if (values.isEmpty()) {
292 return Collections.emptyIterator();
295 Iterator<? extends Iterable<IndexedItem<F>>> groupItr;
296 if (group) {
297 Map<Object, Collection<IndexedItem<F>>> groupedValues = Maps.newLinkedHashMap();
298 int index = 0;
299 for (F value : values) {
300 put(groupedValues, getGroup(value), new IndexedItem<F>(index++, value));
302 groupItr = groupedValues.values().iterator();
303 } else {
304 List<IndexedItem<F>> indexedValue = Lists.newArrayList();
305 int index = 0;
306 for (F value : values) {
307 indexedValue.add(new IndexedItem<F>(index++, value));
309 groupItr = Iterators.<Iterable<IndexedItem<F>>>singletonIterator(indexedValue);
312 return new BatchIterator<IndexedItem<F>>(baseBatch, groupItr) {
313 @Override
314 protected F getValue(IndexedItem<F> value) {
315 order.add(value.index);
316 return value.item;