1 package com
.google
.appengine
.api
.datastore
;
3 import com
.google
.appengine
.api
.datastore
.FutureHelper
.MultiFuture
;
4 import com
.google
.common
.collect
.Iterators
;
5 import com
.google
.common
.collect
.Lists
;
6 import com
.google
.common
.collect
.Maps
;
7 import com
.google
.io
.protocol
.Protocol
;
8 import com
.google
.protobuf
.MessageLite
;
9 import com
.google
.protobuf
.MessageLiteOrBuilder
;
11 import java
.util
.Collection
;
12 import java
.util
.Collections
;
13 import java
.util
.Iterator
;
14 import java
.util
.List
;
16 import java
.util
.NoSuchElementException
;
17 import java
.util
.concurrent
.ExecutionException
;
18 import java
.util
.concurrent
.Future
;
19 import java
.util
.concurrent
.TimeUnit
;
20 import java
.util
.concurrent
.TimeoutException
;
23 * A class that splits groups of values into multiple batches based on size, count and group
26 * This class purposefully delays conversion to protocol message format to reduce memory usage.
28 * @param <R> the batch message type, usually the request
29 * @param <F> the java native value type to batch
30 * @param <T> the proto value type to batch
32 abstract class Batcher
<R
extends MessageLiteOrBuilder
, F
, T
extends MessageLite
> {
34 * @return the group affiliation for the given value.
36 abstract Object
getGroup(F value
);
39 * Adds the given proto value to the given batch.
41 * @param pb the proto to add
42 * @param batch the batch to add to
44 abstract void addToBatch(T pb
, R batch
);
47 * @return a new empty batch based on the base batch template.
49 abstract R
newBatch(R baseBatch
);
52 * @return the maximum message size in bytes for a single batch
54 abstract int getMaxSize();
57 * @return the maximum number of values to add to a single batch
59 abstract int getMaxCount();
62 * @return the maximum number of groups to include in a single batch (if grouping is enabled)
64 abstract int getMaxGroups();
67 * @return the protocol message version of the value
69 abstract T
toPb(F value
);
72 * Models an item and its associated index in some ordered collection.
74 * @param <F> The type of the item.
76 static class IndexedItem
<F
> {
80 IndexedItem(int index
, F item
) {
86 public String
toString() {
87 return String
.format("IndexedItem(%d, %s)", index
, item
);
92 * A future that re-orders the results of a batch operation given the order returned by {@link
93 * Batcher#getBatches(Collection, MessageLiteOrBuilder, int, boolean, List)}.
95 * @param <K> batch type
96 * @param <V> aggregated result type
98 abstract static class ReorderingMultiFuture
<K
, V
> extends MultiFuture
<K
, V
> {
99 private final Collection
<Integer
> order
;
102 * @param futures the batched futures
103 * @param order a collection containing the index at which the associated value should appear.
105 public ReorderingMultiFuture(Iterable
<Future
<K
>> futures
, Collection
<Integer
> order
) {
112 * @param batch a batch result
113 * @param indexItr an iterator that produces the associated index for each batch result. {@code
114 * next()} will be called exactly once for each value in batch.
115 * @param result the aggregated result instance to populate.
116 * @return the populated aggregate result
118 protected abstract V
aggregate(K batch
, Iterator
<Integer
> indexItr
, V result
);
121 * @param size the number of results to expect
122 * @return the object that should be populated with the re-orded results
124 protected abstract V
initResult(int size
);
127 public final V
get() throws InterruptedException
, ExecutionException
{
128 Iterator
<Integer
> indexItr
= order
.iterator();
129 V result
= initResult(order
.size());
130 for (Future
<K
> future
: futures
) {
131 result
= aggregate(future
.get(), indexItr
, result
);
137 public final V
get(long timeout
, TimeUnit unit
)
138 throws InterruptedException
, ExecutionException
, TimeoutException
{
139 Iterator
<Integer
> indexItr
= order
.iterator();
140 V result
= initResult(order
.size());
141 for (Future
<K
> future
: futures
) {
142 result
= aggregate(future
.get(timeout
, unit
), indexItr
, result
);
149 * An iterator that builds batches lazily.
151 * @param <V> the intermediate value type
153 private abstract class BatchIterator
<V
> implements Iterator
<R
> {
155 * Must be called only once per value and in the order in which the values are added to batches.
157 * @return the original value
159 protected abstract F
getValue(V value
);
163 final int maxSize
= getMaxSize();
164 final int maxCount
= getMaxCount();
166 final Iterator
<?
extends Iterable
<V
>> groupItr
;
167 Iterator
<V
> valueItr
;
171 * @param baseBatch the base batch template
172 * @param groupedValues an iterator the returns groups of values, must not be empty or
173 * contain any empty group.
175 BatchIterator(R baseBatch
, int baseBatchSize
, Iterator
<?
extends Iterable
<V
>> groupedValues
) {
176 this.baseBatch
= baseBatch
;
177 this.baseSize
= baseBatchSize
;
178 this.groupItr
= groupedValues
;
179 this.valueItr
= groupItr
.next().iterator();
180 this.nextValue
= toPb(getValue(valueItr
.next()));
181 this.maxGroups
= getMaxGroups();
185 public boolean hasNext() {
186 return nextValue
!= null;
192 throw new NoSuchElementException();
194 R batch
= newBatch(baseBatch
);
197 for (int i
= 0; i
< maxCount
&& numGroups
<= maxGroups
; ++i
) {
198 int valueSize
= getEmbeddedSize(nextValue
);
200 size
+ valueSize
> maxSize
) {
204 addToBatch(nextValue
, batch
);
206 if (!valueItr
.hasNext()) {
207 if (!groupItr
.hasNext()) {
211 valueItr
= groupItr
.next().iterator();
214 nextValue
= toPb(getValue(valueItr
.next()));
220 public void remove() {
221 throw new UnsupportedOperationException();
226 * @return the embedded size of the given message
228 private static int getEmbeddedSize(MessageLite pb
) {
229 return Protocol
.stringSize(pb
.getSerializedSize()) + 1;
233 * Gets or create the collection for the given key and adds the given value.
235 * Creates collections that allow duplicates and preserves order (ArrayList).
237 * @param map the map from which get or create the collection
238 * @param key the key of the collection to add value to
239 * @param value the value to add
241 private <T
> void put(Map
<Object
, Collection
<T
>> map
, Object key
, T value
) {
242 Collection
<T
> col
= map
.get(key
);
244 col
= Lists
.newArrayList();
251 * Lazily compute batches.
253 * @param values the values to batch
254 * @param baseBatch the batch template to use
255 * @param baseBatchSize serialized size of baseBatch
256 * @param group if the values should be grouped using {@link #getGroup}
257 * @return an iterator that lazily computes batches.
259 Iterator
<R
> getBatches(Collection
<F
> values
, R baseBatch
, int baseBatchSize
, boolean group
) {
260 if (values
.isEmpty()) {
261 return Collections
.emptyIterator();
264 Iterator
<?
extends Iterable
<F
>> groupItr
;
266 Map
<Object
, Collection
<F
>> groupedValues
= Maps
.newLinkedHashMap();
267 for (F value
: values
) {
268 put(groupedValues
, getGroup(value
), value
);
270 groupItr
= groupedValues
.values().iterator();
272 groupItr
= Iterators
.singletonIterator(values
);
275 return new BatchIterator
<F
>(baseBatch
, baseBatchSize
, groupItr
) {
277 protected F
getValue(F value
) {
284 * Lazily compute batches and populate order with indexes of the value as they are added
287 * @param values the values to batch
288 * @param baseBatch the batch template to use
289 * @param baseBatchSize size of baseBatch
290 * @param group if the values should be grouped using {@link #getGroup}
291 * @param order the list to populate with the indexes of the values as they are added to batches
292 * @return an iterator that lazily computes batches.
294 Iterator
<R
> getBatches(Collection
<F
> values
, R baseBatch
, int baseBatchSize
, boolean group
,
295 final List
<Integer
> order
) {
296 if (values
.isEmpty()) {
297 return Collections
.emptyIterator();
300 Iterator
<?
extends Iterable
<IndexedItem
<F
>>> groupItr
;
302 Map
<Object
, Collection
<IndexedItem
<F
>>> groupedValues
= Maps
.newLinkedHashMap();
304 for (F value
: values
) {
305 put(groupedValues
, getGroup(value
), new IndexedItem
<F
>(index
++, value
));
307 groupItr
= groupedValues
.values().iterator();
309 List
<IndexedItem
<F
>> indexedValue
= Lists
.newArrayList();
311 for (F value
: values
) {
312 indexedValue
.add(new IndexedItem
<F
>(index
++, value
));
314 groupItr
= Iterators
.<Iterable
<IndexedItem
<F
>>>singletonIterator(indexedValue
);
317 return new BatchIterator
<IndexedItem
<F
>>(baseBatch
, baseBatchSize
, groupItr
) {
319 protected F
getValue(IndexedItem
<F
> value
) {
320 order
.add(value
.index
);