1 package com
.google
.appengine
.api
.datastore
;
3 import com
.google
.appengine
.api
.datastore
.FutureHelper
.MultiFuture
;
4 import com
.google
.common
.collect
.Iterators
;
5 import com
.google
.common
.collect
.Lists
;
6 import com
.google
.common
.collect
.Maps
;
7 import com
.google
.io
.protocol
.Protocol
;
8 import com
.google
.protobuf
.Message
;
10 import java
.util
.Collection
;
11 import java
.util
.Collections
;
12 import java
.util
.Iterator
;
13 import java
.util
.List
;
15 import java
.util
.NoSuchElementException
;
16 import java
.util
.concurrent
.ExecutionException
;
17 import java
.util
.concurrent
.Future
;
18 import java
.util
.concurrent
.TimeUnit
;
19 import java
.util
.concurrent
.TimeoutException
;
22 * A class that splits groups of values into multiple batches based on size, count and group
25 * This class purposefully delays conversion to protocol message format to reduce memory usage.
27 * @param <R> the batch message type, usually the request
28 * @param <F> the java native value type to batch
29 * @param <T> the proto value type to batch
31 abstract class Batcher
<R
extends Message
, F
, T
extends Message
> {
33 * @return the group affiliation for the given value.
// When grouping is requested, getBatches passes the returned object as the key of a
// LinkedHashMap that buckets the values, so two values end up in the same group only
// when their group objects are equal according to equals()/hashCode().
35 abstract Object
getGroup(F value
);
38 * Adds the given proto value to the given batch.
40 * @param pb the proto to add
41 * @param batch the batch to add to
// Called by the batch iterator for each converted value, with the batch it obtained
// from newBatch(baseBatch).
43 abstract void addToBatch(T pb
, R batch
);
46 * @return a new empty batch based on the base batch template.
// The batch iterator calls this with the baseBatch it was constructed with, and starts
// its size accounting from the returned batch's getSerializedSize().
48 abstract R
newBatch(R baseBatch
);
51 * @return the maximum message size in bytes for a single batch
// Read once per batch iterator (stored in a final field). The batching loop compares
// size + valueSize against it, where size starts at the new batch's getSerializedSize()
// and valueSize comes from getEmbeddedSize(nextValue).
53 abstract int getMaxSize();
56 * @return the maximum number of values to add to a single batch
// Read once per batch iterator (stored in a final field); it is the upper bound of the
// per-batch loop counter (i < maxCount).
58 abstract int getMaxCount();
61 * @return the maximum number of groups to include in a single batch (if grouping is enabled)
// Read in the batch iterator's constructor; the per-batch loop keeps going only while
// numGroups <= maxGroups.
63 abstract int getMaxGroups();
66 * @return the protocol message version of the value
// Called by the batch iterator one value at a time: once in its constructor for the
// first value, then again each time the loop advances to the next value, rather than
// converting all values up front.
68 abstract T
toPb(F value
);
71 * Models an item and its associated index in some ordered collection.
73 * @param <F> The type of the item.
75 static class IndexedItem
<F
> {
79 IndexedItem(int index
, F item
) {
85 public String
toString() {
86 return String
.format("IndexedItem(%d, %s)", index
, item
);
91 * A future that re-orders the results of a batch operation given the order returned by {@link
92 * Batcher#getBatches(Collection, Message, boolean, List)}.
94 * @param <K> batch type
95 * @param <V> aggregated result type
97 abstract static class ReorderingMultiFuture
<K
, V
> extends MultiFuture
<K
, V
> {
98 private final Collection
<Integer
> order
;
101 * @param futures the batched futures
102 * @param order a collection containing the index at which the associated value should appear.
104 public ReorderingMultiFuture(Iterable
<Future
<K
>> futures
, Collection
<Integer
> order
) {
111 * @param batch a batch result
112 * @param indexItr an iterator that produces the associated index for each batch result. {@code
113 * next()} will be called exactly once for each value in batch.
114 * @param result the aggregated result instance to populate.
115 * @return the populated aggregate result
// Both get() overloads call this once per future, in the iteration order of the
// futures, passing the same indexItr on every call and feeding the previous return
// value back in as result.
117 protected abstract V
aggregate(K batch
, Iterator
<Integer
> indexItr
, V result
);
120 * @param size the number of results to expect
121 * @return the object that should be populated with the re-ordered results
// Called at the start of each get() overload with order.size() as the argument.
123 protected abstract V
initResult(int size
);
126 public final V
get() throws InterruptedException
, ExecutionException
{
127 Iterator
<Integer
> indexItr
= order
.iterator();
128 V result
= initResult(order
.size());
129 for (Future
<K
> future
: futures
) {
130 result
= aggregate(future
.get(), indexItr
, result
);
// Timed variant of get(): walks the futures in order and folds each batch result into
// the aggregate via aggregate(), sharing one index iterator across all batches.
// NOTE(review): every underlying future is waited on with the full (timeout, unit)
// supplied by the caller, so with several futures the total wait can exceed the
// requested timeout. Consider tracking the remaining time against a deadline.
136 public final V
get(long timeout
, TimeUnit unit
)
137 throws InterruptedException
, ExecutionException
, TimeoutException
{
138 Iterator
<Integer
> indexItr
= order
.iterator();
139 V result
= initResult(order
.size());
140 for (Future
<K
> future
: futures
) {
141 result
= aggregate(future
.get(timeout
, unit
), indexItr
, result
);
148 * An iterator that builds batches lazily.
150 * @param <V> the intermediate value type
152 private abstract class BatchIterator
<V
> implements Iterator
<R
> {
154 * Must be called only once per value and in the order in which the values are added to batches.
156 * @return the original value
// The order-tracking getBatches overload implements this with a side effect: it appends
// value.index to the caller-supplied order list. That is why the call-once, in-order
// contract matters.
158 protected abstract F
getValue(V value
);
161 final int maxSize
= getMaxSize();
162 final int maxCount
= getMaxCount();
164 final Iterator
<?
extends Iterable
<V
>> groupItr
;
165 Iterator
<V
> valueItr
;
169 * @param baseBatch the base batch template
170 * @param groupedValues an iterator that returns groups of values, must not be empty or
171 * contain any empty group.
173 BatchIterator(R baseBatch
, Iterator
<?
extends Iterable
<V
>> groupedValues
) {
174 this.baseBatch
= baseBatch
;
175 this.groupItr
= groupedValues
;
176 this.valueItr
= groupItr
.next().iterator();
177 this.nextValue
= toPb(getValue(valueItr
.next()));
178 this.maxGroups
= getMaxGroups();
182 public boolean hasNext() {
183 return nextValue
!= null;
189 throw new NoSuchElementException();
191 R batch
= newBatch(baseBatch
);
192 int size
= batch
.getSerializedSize();
194 for (int i
= 0; i
< maxCount
&& numGroups
<= maxGroups
; ++i
) {
195 int valueSize
= getEmbeddedSize(nextValue
);
197 size
+ valueSize
> maxSize
) {
201 addToBatch(nextValue
, batch
);
203 if (!valueItr
.hasNext()) {
204 if (!groupItr
.hasNext()) {
208 valueItr
= groupItr
.next().iterator();
211 nextValue
= toPb(getValue(valueItr
.next()));
217 public void remove() {
218 throw new UnsupportedOperationException();
223 * @return the embedded size of the given message
225 private static int getEmbeddedSize(Message pb
) {
226 return Protocol
.stringSize(pb
.getSerializedSize()) + 1;
230 * Gets or creates the collection for the given key and adds the given value.
232 * Creates collections that allow duplicates and preserves order (ArrayList).
234 * @param map the map from which to get or create the collection
235 * @param key the key of the collection to add value to
236 * @param value the value to add
238 private <T
> void put(Map
<Object
, Collection
<T
>> map
, Object key
, T value
) {
239 Collection
<T
> col
= map
.get(key
);
241 col
= Lists
.newArrayList();
248 * Lazily compute batches.
250 * @param values the values to batch
251 * @param baseBatch the batch template to use
252 * @param group if the values should be grouped using {@link #getGroup}
253 * @return an iterator that lazily computes batches.
255 Iterator
<R
> getBatches(Collection
<F
> values
, R baseBatch
, boolean group
) {
256 if (values
.isEmpty()) {
257 return Collections
.emptyIterator();
260 Iterator
<?
extends Iterable
<F
>> groupItr
;
262 Map
<Object
, Collection
<F
>> groupedValues
= Maps
.newLinkedHashMap();
263 for (F value
: values
) {
264 put(groupedValues
, getGroup(value
), value
);
266 groupItr
= groupedValues
.values().iterator();
268 groupItr
= Iterators
.singletonIterator(values
);
271 return new BatchIterator
<F
>(baseBatch
, groupItr
) {
273 protected F
getValue(F value
) {
280 * Lazily compute batches and populate order with indexes of the values as they are added
283 * @param values the values to batch
284 * @param baseBatch the batch template to use
285 * @param group if the values should be grouped using {@link #getGroup}
286 * @param order the list to populate with the indexes of the values as they are added to batches
287 * @return an iterator that lazily computes batches.
289 Iterator
<R
> getBatches(Collection
<F
> values
, R baseBatch
, boolean group
,
290 final List
<Integer
> order
) {
291 if (values
.isEmpty()) {
292 return Collections
.emptyIterator();
295 Iterator
<?
extends Iterable
<IndexedItem
<F
>>> groupItr
;
297 Map
<Object
, Collection
<IndexedItem
<F
>>> groupedValues
= Maps
.newLinkedHashMap();
299 for (F value
: values
) {
300 put(groupedValues
, getGroup(value
), new IndexedItem
<F
>(index
++, value
));
302 groupItr
= groupedValues
.values().iterator();
304 List
<IndexedItem
<F
>> indexedValue
= Lists
.newArrayList();
306 for (F value
: values
) {
307 indexedValue
.add(new IndexedItem
<F
>(index
++, value
));
309 groupItr
= Iterators
.<Iterable
<IndexedItem
<F
>>>singletonIterator(indexedValue
);
312 return new BatchIterator
<IndexedItem
<F
>>(baseBatch
, groupItr
) {
314 protected F
getValue(IndexedItem
<F
> value
) {
315 order
.add(value
.index
);