#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
21 """A thin wrapper around datastore query RPC calls.
23 This provides wrappers around the internal only datastore_pb library and is
24 designed to be the lowest-level API to be used by all Python datastore client
25 libraries for executing queries. It provides a layer of protection so the actual
26 RPC syntax can change without affecting client libraries.
28 Any class, function, field or argument starting with an '_' is for INTERNAL use
29 only and should not be used by developers!
30 """

__all__ = ['Batch',
           'Batcher',
           'CompositeFilter',
           'CompositeOrder',
           'CorrelationFilter',
           'Cursor',
           'FetchOptions',
           'FilterPredicate',
           'Order',
           'PropertyFilter',
           'PropertyOrder',
           'Query',
           'QueryOptions',
           'ResultsIterator',
           'make_filter',
           'apply_query',
           'inject_results',
          ]

import base64
import collections
import pickle

from google.appengine.datastore import entity_pb

from google.appengine.api import datastore_errors
from google.appengine.api import datastore_types
from google.appengine.api.search import geo_util
from google.appengine.datastore import datastore_index
from google.appengine.datastore import datastore_pb
from google.appengine.datastore import datastore_rpc


class _BaseComponent(object):
  """A base class for query components.

  Currently just implements basic == and != functions.
  """

  def __eq__(self, other):
    if self.__class__ is not other.__class__:
      return NotImplemented
    return self is other or self.__dict__ == other.__dict__

  def __ne__(self, other):
    equal = self.__eq__(other)
    if equal is NotImplemented:
      return equal
    return not equal


def make_filter(name, op, values):
  """Constructs a FilterPredicate from the given name, op and values.

  Args:
    name: A non-empty string, the name of the property to filter.
    op: One of PropertyFilter._OPERATORS.keys(), the operator to use.
    values: A supported value, the value to compare against.

  Returns:
    if values is a list, a CompositeFilter that uses AND to combine all
    values, otherwise a PropertyFilter for the single value.

  Raises:
    datastore_errors.BadPropertyError: if the property name is invalid.
    datastore_errors.BadValueError: if the property did not validate correctly
      or the value was an empty list.
    Other exception types (like OverflowError): if the property value does not
      meet type-specific criteria.
  """
  datastore_types.ValidateProperty(name, values)
  properties = datastore_types.ToPropertyPb(name, values)
  if isinstance(properties, list):
    filters = [PropertyFilter(op, prop) for prop in properties]
    return CompositeFilter(CompositeFilter.AND, filters)
  else:
    return PropertyFilter(op, properties)
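
# Illustrative usage of make_filter (a sketch; `entity` stands in for a
# populated entity_pb.EntityProto and is not defined in this module):
#
#   pred = make_filter('age', '>=', 18)          # single value -> PropertyFilter
#   both = make_filter('tag', '=', ['a', 'b'])   # list -> CompositeFilter(AND)
#   matched = pred(entity)                       # FilterPredicates are callable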


def _make_key_value_map(entity, property_names):
  """Extracts key values from the given entity.

  Args:
    entity: The entity_pb.EntityProto to extract values from.
    property_names: The names of the properties from which to extract values.

  Returns:
    A dict mapping property names to lists of key values.
  """
  value_map = dict((name, []) for name in property_names)

  for prop in entity.property_list():
    if prop.name() in value_map:
      value_map[prop.name()].append(
          datastore_types.PropertyValueToKeyValue(prop.value()))

  if datastore_types.KEY_SPECIAL_PROPERTY in value_map:
    value_map[datastore_types.KEY_SPECIAL_PROPERTY] = [
        datastore_types.ReferenceToKeyValue(entity.key())]

  return value_map
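
# Shape of the returned map (illustrative sketch): for an entity with
# properties a=[1, 2] and b='x', and property_names={'a', 'b', '__key__'},
# the result resembles:
#
#   {'a': [<key value for 1>, <key value for 2>],
#    'b': [<key value for 'x'>],
#    '__key__': [<key value of the entity's key>]}
#
# where each key value is the comparable tuple produced by
# datastore_types.PropertyValueToKeyValue().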


class _PropertyComponent(_BaseComponent):
  """A component that operates on a specific set of properties."""

  def _get_prop_names(self):
    """Returns a set of property names used by the filter."""
    raise NotImplementedError


class FilterPredicate(_PropertyComponent):
  """An abstract base class for all query filters.

  All sub-classes must be immutable as these are often stored without creating
  a defensive copy.
  """

  def __call__(self, entity):
    """Applies the filter predicate to the given entity.

    Args:
      entity: the entity_pb.EntityProto to test.

    Returns:
      True if the given entity matches the filter, False otherwise.
    """
    return self._apply(_make_key_value_map(entity, self._get_prop_names()))

  def _apply(self, key_value_map):
    """Applies the given component to the comparable value map.

    A filter matches a list of values if at least one value in the list
    matches the filter, for example:
      'prop: [1, 2]' matches both 'prop = 1' and 'prop = 2' but not 'prop = 3'

    Note: the values are actually represented as tuples whose first item
    encodes the type; see datastore_types.PropertyValueToKeyValue().

    Args:
      key_value_map: A dict mapping property names to a list of
        comparable values.

    Returns:
      A boolean indicating if the given map matches the filter.
    """
    raise NotImplementedError

  def _prune(self, key_value_map):
    """Removes values from the given map that do not match the filter.

    When doing a scan in the datastore, only index values that match the
    filters are seen. When multiple values that point to the same entity are
    seen, the entity only appears where the first value is found. This
    function removes all values that don't match the query so that the first
    value in the map is the same one the datastore would see first.

    Args:
      key_value_map: the comparable value map from which to remove
        values. Does not need to contain values for all filtered properties.

    Returns:
      A value that evaluates to False if every value in a single list was
      completely removed. This effectively applies the filter but is less
      efficient than _apply().
    """
    raise NotImplementedError

  def _to_pb(self):
    """Internal only function to generate a pb."""
    raise NotImplementedError(
        'This filter only supports in memory operations (%r)' % self)

  def _to_pbs(self):
    """Internal only function to generate a list of pbs."""
    return [self._to_pb()]
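
# Multi-valued matching sketch (illustrative; `entity` is a hypothetical
# entity_pb.EntityProto whose property 'prop' holds the values [1, 2]):
#
#   make_filter('prop', '=', 1)(entity)  # True: at least one value matches
#   make_filter('prop', '=', 3)(entity)  # False: no value matches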


class _SinglePropertyFilter(FilterPredicate):
  """Base class for a filter that operates on a single property."""

  def _get_prop_name(self):
    """Returns the name of the property being filtered."""
    raise NotImplementedError

  def _apply_to_value(self, value):
    """Applies the filter to the given value.

    Args:
      value: The comparable value to check.

    Returns:
      A boolean indicating if the given value matches the filter.
    """
    raise NotImplementedError

  def _get_prop_names(self):
    return set([self._get_prop_name()])

  def _apply(self, value_map):
    for other_value in value_map[self._get_prop_name()]:
      if self._apply_to_value(other_value):
        return True
    return False

  def _prune(self, value_map):
    if self._get_prop_name() not in value_map:
      return True
    values = [value for value in value_map[self._get_prop_name()]
              if self._apply_to_value(value)]
    value_map[self._get_prop_name()] = values
    return bool(values)


class PropertyFilter(_SinglePropertyFilter):
  """An immutable filter predicate that constrains a single property."""

  _OPERATORS = {
      '<': datastore_pb.Query_Filter.LESS_THAN,
      '<=': datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL,
      '>': datastore_pb.Query_Filter.GREATER_THAN,
      '>=': datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL,
      '=': datastore_pb.Query_Filter.EQUAL,
      }

  _OPERATORS_INVERSE = dict((value, key)
                            for key, value in _OPERATORS.iteritems())

  _OPERATORS_TO_PYTHON_OPERATOR = {
      datastore_pb.Query_Filter.LESS_THAN: '<',
      datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: '<=',
      datastore_pb.Query_Filter.GREATER_THAN: '>',
      datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=',
      datastore_pb.Query_Filter.EQUAL: '==',
      }

  _INEQUALITY_OPERATORS = frozenset(['<', '<=', '>', '>='])

  _INEQUALITY_OPERATORS_ENUM = frozenset([
      datastore_pb.Query_Filter.LESS_THAN,
      datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL,
      datastore_pb.Query_Filter.GREATER_THAN,
      datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL,
      ])

  _UPPERBOUND_INEQUALITY_OPERATORS = frozenset(['<', '<='])

  def __init__(self, op, value):
    """Constructor.

    Args:
      op: A string representing the operator to use.
      value: An entity_pb.Property, the property and value to compare against.

    Raises:
      datastore_errors.BadArgumentError if op has an unsupported value or
        value is not an entity_pb.Property.
    """
    if op not in self._OPERATORS:
      raise datastore_errors.BadArgumentError('unknown operator: %r' % (op,))
    if not isinstance(value, entity_pb.Property):
      raise datastore_errors.BadArgumentError(
          'value argument should be entity_pb.Property (%r)' % (value,))

    super(PropertyFilter, self).__init__()
    self._filter = datastore_pb.Query_Filter()
    self._filter.set_op(self._OPERATORS[op])
    self._filter.add_property().CopyFrom(value)

  @property
  def op(self):
    raw_op = self._filter.op()
    return self._OPERATORS_INVERSE.get(raw_op, str(raw_op))

  @property
  def value(self):
    return self._filter.property(0)

  def __repr__(self):
    prop = self.value
    name = prop.name()
    value = datastore_types.FromPropertyPb(prop)
    return '%s(%r, <%r, %r>)' % (self.__class__.__name__, self.op, name, value)

  def _get_prop_name(self):
    return self._filter.property(0).name()

  def _apply_to_value(self, value):
    if not hasattr(self, '_cmp_value'):
      if self._filter.op() == datastore_pb.Query_Filter.EXISTS:
        return True
      self._cmp_value = datastore_types.PropertyValueToKeyValue(
          self._filter.property(0).value())
      self._condition = ('value %s self._cmp_value' %
                         self._OPERATORS_TO_PYTHON_OPERATOR[self._filter.op()])
    return eval(self._condition)

  def _has_inequality(self):
    """Returns True if the filter predicate contains inequality filters."""
    return self._filter.op() in self._INEQUALITY_OPERATORS_ENUM

  @classmethod
  def _from_pb(cls, filter_pb):
    self = cls.__new__(cls)
    self._filter = filter_pb
    return self

  def _to_pb(self):
    """Returns the internal only pb representation."""
    return self._filter

  def __getstate__(self):
    raise pickle.PicklingError(
        'Pickling of datastore_query.PropertyFilter is unsupported.')

  def __eq__(self, other):
    if self.__class__ is not other.__class__:
      if other.__class__ is _PropertyRangeFilter:
        return [self._filter] == other._to_pbs()
      return NotImplemented
    return self._filter == other._filter


class _PropertyRangeFilter(_SinglePropertyFilter):
  """A filter predicate that represents a range of values.

  Since we allow multi-valued properties there is a large difference between
  "x > 0 AND x < 1" and "0 < x < 1." An entity with x = [-1, 2] will match the
  first but not the second.

  Since the datastore only allows a single inequality filter, multiple
  inequality filters are merged into a single range filter in the
  datastore (unlike equality filters). This class is used by
  datastore_query.CompositeFilter to implement the same logic.
  """

  _start_key_value = None
  _end_key_value = None

  @datastore_rpc._positional(1)
  def __init__(self, start=None, start_incl=True, end=None, end_incl=True):
    """Constructs a range filter using start and end properties.

    Args:
      start: An entity_pb.Property to use as a lower bound or None to indicate
        no lower bound.
      start_incl: A boolean that indicates if the lower bound is inclusive.
      end: An entity_pb.Property to use as an upper bound or None to indicate
        no upper bound.
      end_incl: A boolean that indicates if the upper bound is inclusive.
    """
    if start is not None and not isinstance(start, entity_pb.Property):
      raise datastore_errors.BadArgumentError(
          'start argument should be entity_pb.Property (%r)' % (start,))
    if end is not None and not isinstance(end, entity_pb.Property):
      raise datastore_errors.BadArgumentError(
          'end argument should be entity_pb.Property (%r)' % (end,))
    if start and end and start.name() != end.name():
      raise datastore_errors.BadArgumentError(
          'start and end arguments must be on the same property (%s != %s)' %
          (start.name(), end.name()))
    if not start and not end:
      raise datastore_errors.BadArgumentError(
          'Unbounded ranges are not supported.')

    super(_PropertyRangeFilter, self).__init__()
    self._start = start
    self._start_incl = start_incl
    self._end = end
    self._end_incl = end_incl

  @classmethod
  def from_property_filter(cls, prop_filter):
    op = prop_filter._filter.op()
    if op == datastore_pb.Query_Filter.GREATER_THAN:
      return cls(start=prop_filter._filter.property(0), start_incl=False)
    elif op == datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL:
      return cls(start=prop_filter._filter.property(0))
    elif op == datastore_pb.Query_Filter.LESS_THAN:
      return cls(end=prop_filter._filter.property(0), end_incl=False)
    elif op == datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL:
      return cls(end=prop_filter._filter.property(0))
    else:
      raise datastore_errors.BadArgumentError(
          'Unsupported operator (%s)' % (op,))

  def intersect(self, other):
    """Returns a filter representing the intersection of self and other."""
    if isinstance(other, PropertyFilter):
      other = self.from_property_filter(other)
    elif not isinstance(other, _PropertyRangeFilter):
      raise datastore_errors.BadArgumentError(
          'other argument should be a _PropertyRangeFilter (%r)' % (other,))

    if other._get_prop_name() != self._get_prop_name():
      raise datastore_errors.BadArgumentError(
          'other argument must be on the same property (%s != %s)' %
          (other._get_prop_name(), self._get_prop_name()))

    start_source = None
    if other._start:
      if self._start:
        result = cmp(self._get_start_key_value(), other._get_start_key_value())
        if result == 0:
          result = cmp(other._start_incl, self._start_incl)
        if result > 0:
          start_source = self
        elif result < 0:
          start_source = other
      else:
        start_source = other
    elif self._start:
      start_source = self

    end_source = None
    if other._end:
      if self._end:
        result = cmp(self._get_end_key_value(), other._get_end_key_value())
        if result == 0:
          result = cmp(self._end_incl, other._end_incl)
        if result < 0:
          end_source = self
        elif result > 0:
          end_source = other
      else:
        end_source = other
    elif self._end:
      end_source = self

    if start_source:
      if end_source in (start_source, None):
        return start_source

      result = _PropertyRangeFilter(start=start_source._start,
                                    start_incl=start_source._start_incl,
                                    end=end_source._end,
                                    end_incl=end_source._end_incl)

      result._start_key_value = start_source._start_key_value
      result._end_key_value = end_source._end_key_value
      return result
    else:
      return end_source or self

  def _get_start_key_value(self):
    if self._start_key_value is None:
      self._start_key_value = datastore_types.PropertyValueToKeyValue(
          self._start.value())
    return self._start_key_value

  def _get_end_key_value(self):
    if self._end_key_value is None:
      self._end_key_value = datastore_types.PropertyValueToKeyValue(
          self._end.value())
    return self._end_key_value

  def _apply_to_value(self, value):
    """Applies the filter to the given value.

    Args:
      value: The comparable value to check.

    Returns:
      A boolean indicating if the given value matches the filter.
    """
    if self._start:
      result = cmp(self._get_start_key_value(), value)
      if result > 0 or (result == 0 and not self._start_incl):
        return False

    if self._end:
      result = cmp(self._get_end_key_value(), value)
      if result < 0 or (result == 0 and not self._end_incl):
        return False

    return True

  def _get_prop_name(self):
    if self._start:
      return self._start.name()
    if self._end:
      return self._end.name()
    assert False

  def _to_pbs(self):
    pbs = []
    if self._start:
      if self._start_incl:
        op = datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL
      else:
        op = datastore_pb.Query_Filter.GREATER_THAN
      pb = datastore_pb.Query_Filter()
      pb.set_op(op)
      pb.add_property().CopyFrom(self._start)
      pbs.append(pb)

    if self._end:
      if self._end_incl:
        op = datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL
      else:
        op = datastore_pb.Query_Filter.LESS_THAN
      pb = datastore_pb.Query_Filter()
      pb.set_op(op)
      pb.add_property().CopyFrom(self._end)
      pbs.append(pb)

    return pbs

  def __getstate__(self):
    raise pickle.PicklingError(
        'Pickling of %r is unsupported.' % self)

  def __eq__(self, other):
    if self.__class__ is not other.__class__:
      return NotImplemented
    return (self._start == other._start and
            self._end == other._end and
            (self._start_incl == other._start_incl or self._start is None) and
            (self._end_incl == other._end_incl or self._end is None))
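
# Range-merging sketch (illustrative; these are internal-only helpers, shown
# only to illustrate what CompositeFilter does automatically when AND-ing
# inequality filters on the same property):
#
#   lower = _PropertyRangeFilter.from_property_filter(make_filter('x', '>', 0))
#   upper = _PropertyRangeFilter.from_property_filter(make_filter('x', '<', 9))
#   merged = lower.intersect(upper)  # represents 0 < x < 9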


class _PropertyExistsFilter(FilterPredicate):
  """A FilterPredicate that matches entities containing specific properties.

  Only works as an in-memory filter. Used internally to filter out entities
  that don't have all properties in a given Order.
  """

  def __init__(self, names):
    super(_PropertyExistsFilter, self).__init__()
    self._names = frozenset(names)

  def _apply(self, value_map):
    for name in self._names:
      if not value_map.get(name):
        return False
    return True

  def _get_prop_names(self):
    return self._names

  def _prune(self, _):
    raise NotImplementedError

  def __getstate__(self):
    raise pickle.PicklingError(
        'Pickling of %r is unsupported.' % self)


class CorrelationFilter(FilterPredicate):
  """A filter that isolates correlated values and applies a sub-filter on them.

  This filter assumes that every property used by the sub-filter should be
  grouped before being passed to the sub-filter. The default grouping puts
  each value in its own group. Consider:
    e = {a: [1, 2], b: [2, 1, 3], c: 4}

  A correlation filter with a sub-filter that operates on (a, b) will be
  tested against the following 3 sets of values:
    {a: 1, b: 2}
    {a: 2, b: 1}
    {b: 3}

  In this case CorrelationFilter('a = 2 AND b = 2') won't match this entity
  but CorrelationFilter('a = 2 AND b = 1') will. To apply an uncorrelated
  filter on c, the filter must be applied in parallel to the correlation
  filter. For example:
    CompositeFilter(AND, [CorrelationFilter('a = 2 AND b = 1'), 'c = 3'])

  If 'c = 3' was included in the correlation filter, c would be grouped as
  well. This would result in the following values:
    {a: 1, b: 2, c: 4}
    {a: 2, b: 1}
    {b: 3}

  If any set of correlated values matches the sub-filter then the entity
  matches the correlation filter.
  """

  def __init__(self, subfilter):
    """Constructor.

    Args:
      subfilter: A FilterPredicate to apply to the correlated values.
    """
    self._subfilter = subfilter

  @property
  def subfilter(self):
    return self._subfilter

  def __repr__(self):
    return '%s(%r)' % (self.__class__.__name__, self.subfilter)

  def _apply(self, value_map):
    base_map = dict((prop, []) for prop in self._get_prop_names())

    value_maps = []
    for prop in base_map:
      grouped = self._group_values(prop, value_map[prop])

      while len(value_maps) < len(grouped):
        value_maps.append(base_map.copy())

      for value, map in zip(grouped, value_maps):
        map[prop] = value

    return self._apply_correlated(value_maps)

  def _apply_correlated(self, value_maps):
    """Applies the sub-filter to the correlated value maps.

    The default implementation matches when any value_map in value_maps
    matches the sub-filter.

    Args:
      value_maps: A list of correlated value_maps.

    Returns:
      True if any of the correlated value maps matches the sub-filter.
    """
    for map in value_maps:
      if self._subfilter._apply(map):
        return True
    return False

  def _group_values(self, prop, values):
    """A function that groups the given values.

    Override this function to introduce custom grouping logic. The default
    implementation assumes each value belongs in its own group.

    Args:
      prop: The name of the property whose values are being grouped.
      values: A list of opaque values.

    Returns:
      A list of lists of grouped values.
    """
    return [[value] for value in values]

  def _get_prop_names(self):
    return self._subfilter._get_prop_names()
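
# Illustrative sketch using the docstring's e = {a: [1, 2], b: [2, 1, 3],
# c: 4} (`entity` is a hypothetical entity_pb.EntityProto with those values):
#
#   sub = CompositeFilter(CompositeFilter.AND,
#                         [make_filter('a', '=', 2),
#                          make_filter('b', '=', 1)])
#   CorrelationFilter(sub)(entity)  # True: the group {a: 2, b: 1} matches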


class CompositeFilter(FilterPredicate):
  """An immutable filter predicate that combines other predicates.

  This class proactively merges sub-filters that are combined using the same
  operator. For example:
    CompositeFilter(AND, [f1, f2, CompositeFilter(AND, [f3, f4]), f5, f6])
  is equivalent to:
    CompositeFilter(AND, [f1, f2, f3, f4, f5, f6])

  Currently filters can only be combined using an AND operator.
  """

  AND = 'and'
  _OPERATORS = frozenset([AND])

  def __init__(self, op, filters):
    """Constructor.

    Args:
      op: The operator to use to combine the given filters.
      filters: A list of one or more filters to combine.

    Raises:
      datastore_errors.BadArgumentError if op is not in
        CompositeFilter._OPERATORS or filters is not a non-empty list
        containing only FilterPredicates.
    """
    if not op in self._OPERATORS:
      raise datastore_errors.BadArgumentError('unknown operator (%s)' % (op,))
    if not filters or not isinstance(filters, (list, tuple)):
      raise datastore_errors.BadArgumentError(
          'filters argument should be a non-empty list (%r)' % (filters,))

    super(CompositeFilter, self).__init__()
    self._op = op
    flattened = []

    for f in filters:
      if isinstance(f, CompositeFilter) and f._op == self._op:
        flattened.extend(f._filters)
      elif isinstance(f, FilterPredicate):
        flattened.append(f)
      else:
        raise datastore_errors.BadArgumentError(
            'filters argument must be a list of FilterPredicates, found (%r)' %
            (f,))

    if op == self.AND:
      filters = flattened
      flattened = []
      ineq_map = {}

      for f in filters:
        if (isinstance(f, _PropertyRangeFilter) or
            (isinstance(f, PropertyFilter) and f._has_inequality())):
          name = f._get_prop_name()
          index = ineq_map.get(name)
          if index is not None:
            range_filter = flattened[index]
            flattened[index] = range_filter.intersect(f)
          else:
            if isinstance(f, PropertyFilter):
              range_filter = _PropertyRangeFilter.from_property_filter(f)
            else:
              range_filter = f
            ineq_map[name] = len(flattened)
            flattened.append(range_filter)
        else:
          flattened.append(f)

    self._filters = tuple(flattened)

  @property
  def op(self):
    return self._op

  @property
  def filters(self):
    return self._filters

  def __repr__(self):
    op = self.op
    if op == self.AND:
      op = 'AND'
    else:
      op = str(op)
    return '%s(%s, %r)' % (self.__class__.__name__, op, list(self.filters))

  def _get_prop_names(self):
    names = set()
    for f in self._filters:
      names |= f._get_prop_names()
    return names

  def _apply(self, value_map):
    if self._op == self.AND:
      for f in self._filters:
        if not f._apply(value_map):
          return False
      return True
    raise NotImplementedError

  def _prune(self, value_map):
    if self._op == self.AND:
      matches = collections.defaultdict(set)
      for f in self._filters:
        props = f._get_prop_names()
        local_value_map = dict((k, v) for k, v in value_map.iteritems()
                               if k in props)

        if not f._prune(local_value_map):
          return False

        for (prop, values) in local_value_map.iteritems():
          matches[prop].update(values)

      for prop, value_set in matches.iteritems():
        value_map[prop] = sorted(value_set)
      return True
    raise NotImplementedError

  def _to_pbs(self):
    """Returns the internal only pb representation."""
    pbs = []
    for f in self._filters:
      pbs.extend(f._to_pbs())
    return pbs

  def __eq__(self, other):
    if self.__class__ is other.__class__:
      return super(CompositeFilter, self).__eq__(other)

    if len(self._filters) == 1:
      result = self._filters[0].__eq__(other)
      if result is NotImplemented and hasattr(other, '__eq__'):
        return other.__eq__(self._filters[0])
      return result
    return NotImplemented
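
# Flattening and merging sketch (illustrative): nested AND filters are
# flattened, and inequality filters on the same property are merged into a
# single internal range filter:
#
#   f = CompositeFilter(CompositeFilter.AND,
#                       [make_filter('x', '>', 0),
#                        CompositeFilter(CompositeFilter.AND,
#                                        [make_filter('x', '<', 9),
#                                         make_filter('y', '=', 1)])])
#   # f.filters holds two entries: one range filter for 0 < x < 9 and the
#   # equality filter on y.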


class _IgnoreFilter(_SinglePropertyFilter):
  """A filter that removes all entities with the given keys."""

  def __init__(self, key_value_set):
    super(_IgnoreFilter, self).__init__()
    self._keys = key_value_set

  def _get_prop_name(self):
    return datastore_types.KEY_SPECIAL_PROPERTY

  def _apply_to_value(self, value):
    return value not in self._keys


class _DedupingFilter(_IgnoreFilter):
  """A filter that removes duplicate keys."""

  def __init__(self, key_value_set=None):
    super(_DedupingFilter, self).__init__(key_value_set or set())

  def _apply_to_value(self, value):
    if super(_DedupingFilter, self)._apply_to_value(value):
      self._keys.add(value)
      return True
    return False


class _BoundingCircleFilter(_SinglePropertyFilter):
  """An immutable bounding circle filter for geo locations.

  An immutable filter predicate that constrains a geo location property to a
  bounding circle region. The filter is inclusive at the border. The property
  has to be of type V3 PointValue; V4 GeoPoints are converted to this type.
  """

  def __init__(self, property_name, latitude, longitude, radius_meters):
    self._property_name = property_name
    self._lat_lng = geo_util.LatLng(latitude, longitude)
    self._radius_meters = radius_meters

    if not radius_meters >= 0:
      raise datastore_errors.BadArgumentError(
          'invalid radius: %r' % radius_meters)

  @classmethod
  def _from_v4_pb(cls, bounding_circle_v4_pb):
    return _BoundingCircleFilter(bounding_circle_v4_pb.property().name(),
                                 bounding_circle_v4_pb.center().latitude(),
                                 bounding_circle_v4_pb.center().longitude(),
                                 bounding_circle_v4_pb.radius_meters())

  def _get_prop_name(self):
    return self._property_name

  def _apply_to_value(self, value):
    if value[0] != entity_pb.PropertyValue.kPointValueGroup:
      return False

    _, latitude, longitude = value

    lat_lng = geo_util.LatLng(latitude, longitude)

    return self._lat_lng - lat_lng <= self._radius_meters


class _BoundingBoxFilter(_SinglePropertyFilter):
  """An immutable bounding box filter for geo locations.

  An immutable filter predicate that constrains a geo location property to a
  bounding box region. The filter is inclusive at the border. The property
  has to be of type V3 PointValue; V4 GeoPoints are converted to this type.
  """

  def __init__(self, property_name, southwest, northeast):
    """Initializes a _BoundingBoxFilter.

    Args:
      property_name: the name of the property to filter on.
      southwest: The south-west corner of the bounding box. The type is
        datastore_types.GeoPt.
      northeast: The north-east corner of the bounding box. The type is
        datastore_types.GeoPt.

    Raises:
      datastore_errors.BadArgumentError if the south-west coordinate is on top
        of the north-east coordinate.
    """
    if southwest.lat > northeast.lat:
      raise datastore_errors.BadArgumentError(
          'the south-west coordinate is on top of the north-east coordinate')

    self._property_name = property_name
    self._southwest = southwest
    self._northeast = northeast

  @classmethod
  def _from_v4_pb(cls, bounding_box_v4_pb):
    sw = datastore_types.GeoPt(bounding_box_v4_pb.southwest().latitude(),
                               bounding_box_v4_pb.southwest().longitude())
    ne = datastore_types.GeoPt(bounding_box_v4_pb.northeast().latitude(),
                               bounding_box_v4_pb.northeast().longitude())
    return _BoundingBoxFilter(bounding_box_v4_pb.property().name(), sw, ne)

  def _get_prop_name(self):
    return self._property_name

  def _apply_to_value(self, value):
    if value[0] != entity_pb.PropertyValue.kPointValueGroup:
      return False

    _, latitude, longitude = value

    if not self._southwest.lat <= latitude <= self._northeast.lat:
      return False

    if self._southwest.lon > self._northeast.lon:
      return (longitude <= self._northeast.lon
              or longitude >= self._southwest.lon)
    else:
      return self._southwest.lon <= longitude <= self._northeast.lon
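
# Antimeridian sketch (illustrative; _BoundingBoxFilter is internal-only):
# when southwest.lon > northeast.lon the box is treated as crossing the
# 180-degree meridian, so the longitude test inverts:
#
#   sw = datastore_types.GeoPt(0, 170)
#   ne = datastore_types.GeoPt(10, -170)
#   box = _BoundingBoxFilter('loc', sw, ne)  # matches lon >= 170 or lon <= -170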


class Order(_PropertyComponent):
  """A base class that represents a sort order on a query.

  All sub-classes must be immutable as these are often stored without creating
  a defensive copy.

  This class can be used as either the cmp or key arg in sorted() or
  list.sort(). To provide a stable ordering a trailing key ascending order is
  always used.
  """

  @datastore_rpc._positional(1)
  def reversed(self, group_by=None):
    """Constructs an order representing the reverse of the current order.

    This function takes into account the effects of orders on properties not
    in the group_by clause of a query. For example, consider:
      SELECT A, First(B) ... GROUP BY A ORDER BY A, B
    Changing the order of B would affect which value is listed in the
    'First(B)' column which would actually change the results instead of just
    reversing them.

    Args:
      group_by: If specified, only orders on properties in group_by will be
        reversed.

    Returns:
      A new order representing the reverse direction.
    """
    raise NotImplementedError

  def _key(self, lhs_value_map):
    """Creates a key for the given value map."""
    raise NotImplementedError

  def _cmp(self, lhs_value_map, rhs_value_map):
    """Compares the given value maps."""
    raise NotImplementedError

  def _to_pb(self):
    """Internal only function to generate an order pb."""
    raise NotImplementedError

  def key_for_filter(self, filter_predicate):
    if filter_predicate:
      return lambda x: self.key(x, filter_predicate)
    return self.key

  def cmp_for_filter(self, filter_predicate):
    if filter_predicate:
      return lambda x, y: self.cmp(x, y, filter_predicate)
    return self.cmp

  def key(self, entity, filter_predicate=None):
    """Constructs a "key" value for the given entity based on the current order.

    This function can be used as the key argument for list.sort() and sorted().

    Args:
      entity: The entity_pb.EntityProto to convert.
      filter_predicate: A FilterPredicate used to prune values before comparing
        entities or None.

    Returns:
      A key value that identifies the position of the entity when sorted by
      the current order.
    """
    names = self._get_prop_names()
    names.add(datastore_types.KEY_SPECIAL_PROPERTY)
    if filter_predicate is not None:
      names |= filter_predicate._get_prop_names()

    value_map = _make_key_value_map(entity, names)
    if filter_predicate is not None:
      filter_predicate._prune(value_map)
    return (self._key(value_map),
            value_map[datastore_types.KEY_SPECIAL_PROPERTY])

  def cmp(self, lhs, rhs, filter_predicate=None):
    """Compares the given entities taking into account any filters.

    This function can be used as the cmp argument for list.sort() and sorted().

    This function is slightly more efficient than Order.key when comparing two
    entities, however it is much less efficient when sorting a list of
    entities.

    Args:
      lhs: An entity_pb.EntityProto.
      rhs: An entity_pb.EntityProto.
      filter_predicate: A FilterPredicate used to prune values before comparing
        entities or None.

    Returns:
      An integer <, =, or > 0 representing the operator that goes between lhs
      and rhs to create a true statement.
    """
    names = self._get_prop_names()
    if filter_predicate is not None:
      names |= filter_predicate._get_prop_names()

    lhs_value_map = _make_key_value_map(lhs, names)
    rhs_value_map = _make_key_value_map(rhs, names)
    if filter_predicate is not None:
      filter_predicate._prune(lhs_value_map)
      filter_predicate._prune(rhs_value_map)
    result = self._cmp(lhs_value_map, rhs_value_map)
    if result:
      return result

    if not lhs.has_key() and not rhs.has_key():
      return 0

    lhs_key = (lhs_value_map.get(datastore_types.KEY_SPECIAL_PROPERTY) or
               datastore_types.ReferenceToKeyValue(lhs.key()))
    rhs_key = (rhs_value_map.get(datastore_types.KEY_SPECIAL_PROPERTY) or
               datastore_types.ReferenceToKeyValue(rhs.key()))

    return cmp(lhs_key, rhs_key)


class _ReverseOrder(_BaseComponent):
  """Reverses the comparison for the given object."""

  def __init__(self, obj):
    """Constructor for _ReverseOrder.

    Args:
      obj: Any comparable and hashable object.
    """
    super(_ReverseOrder, self).__init__()
    self._obj = obj

  def __hash__(self):
    return hash(self._obj)

  def __cmp__(self, other):
    assert self.__class__ == other.__class__, (
        'A datastore_query._ReverseOrder object can only be compared to '
        'an object of the same type.')
    return -cmp(self._obj, other._obj)


class PropertyOrder(Order):
  """An immutable class that represents a sort order for a single property."""

  ASCENDING = datastore_pb.Query_Order.ASCENDING
  DESCENDING = datastore_pb.Query_Order.DESCENDING
  _DIRECTIONS = frozenset([ASCENDING, DESCENDING])

  def __init__(self, prop, direction=ASCENDING):
    """Constructor.

    Args:
      prop: the name of the prop by which to sort.
      direction: the direction in which to sort the given prop.

    Raises:
      datastore_errors.BadArgumentError if the prop name or direction is
        invalid.
    """
    datastore_types.ValidateString(prop,
                                   'prop',
                                   datastore_errors.BadArgumentError)
    if not direction in self._DIRECTIONS:
      raise datastore_errors.BadArgumentError('unknown direction: %r' %
                                              (direction,))
    super(PropertyOrder, self).__init__()
    self.__order = datastore_pb.Query_Order()
    self.__order.set_property(prop.encode('utf-8'))
    self.__order.set_direction(direction)

  @property
  def prop(self):
    return self.__order.property()

  @property
  def direction(self):
    return self.__order.direction()

  def __repr__(self):
    name = self.prop
    direction = self.direction
    extra = ''
    if direction == self.DESCENDING:
      extra = ', DESCENDING'
    name = repr(name).encode('utf-8')[1:-1]
    return '%s(<%s>%s)' % (self.__class__.__name__, name, extra)

  @datastore_rpc._positional(1)
  def reversed(self, group_by=None):
    if group_by and self.__order.property() not in group_by:
      return self

    if self.__order.direction() == self.ASCENDING:
      return PropertyOrder(self.__order.property().decode('utf-8'),
                           self.DESCENDING)
    else:
      return PropertyOrder(self.__order.property().decode('utf-8'),
                           self.ASCENDING)

  def _get_prop_names(self):
    return set([self.__order.property()])

  def _key(self, lhs_value_map):
    lhs_values = lhs_value_map[self.__order.property()]
    if not lhs_values:
      raise datastore_errors.BadArgumentError(
          'Missing value for property (%s)' % self.__order.property())

    if self.__order.direction() == self.ASCENDING:
      return min(lhs_values)
    else:
      return _ReverseOrder(max(lhs_values))

  def _cmp(self, lhs_value_map, rhs_value_map):
    lhs_values = lhs_value_map[self.__order.property()]
    rhs_values = rhs_value_map[self.__order.property()]

    if not lhs_values and not rhs_values:
      return 0

    if not lhs_values:
      raise datastore_errors.BadArgumentError(
          'LHS missing value for property (%s)' % self.__order.property())

    if not rhs_values:
      raise datastore_errors.BadArgumentError(
          'RHS missing value for property (%s)' % self.__order.property())

    if self.__order.direction() == self.ASCENDING:
      return cmp(min(lhs_values), min(rhs_values))
    else:
      return cmp(max(rhs_values), max(lhs_values))

  @classmethod
  def _from_pb(cls, order_pb):
    self = cls.__new__(cls)
    self.__order = order_pb
    return self

  def _to_pb(self):
    """Returns the internal only pb representation."""
    return self.__order

  def __getstate__(self):
    raise pickle.PicklingError(
        'Pickling of datastore_query.PropertyOrder is unsupported.')
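
# Sorting sketch (illustrative; `entities` is a hypothetical list of
# entity_pb.EntityProto objects that all have a 'height' property):
#
#   order = PropertyOrder('height', PropertyOrder.DESCENDING)
#   entities.sort(key=order.key)  # tallest first, ties broken by entity key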


class CompositeOrder(Order):
  """An immutable class that represents a sequence of Orders.

  This class proactively flattens sub-orders that are of type CompositeOrder.
  For example:
    CompositeOrder([O1, CompositeOrder([O2, O3]), O4])
  is equivalent to:
    CompositeOrder([O1, O2, O3, O4])
  """

  def __init__(self, orders):
    """Constructor.

    Args:
      orders: A list of Orders which are applied in order.
    """
    if not isinstance(orders, (list, tuple)):
      raise datastore_errors.BadArgumentError(
          'orders argument should be list or tuple (%r)' % (orders,))

    super(CompositeOrder, self).__init__()
    flattened = []
    for order in orders:
      if isinstance(order, CompositeOrder):
        flattened.extend(order._orders)
      elif isinstance(order, Order):
        flattened.append(order)
      else:
        raise datastore_errors.BadArgumentError(
            'orders argument should only contain Order (%r)' % (order,))
    self._orders = tuple(flattened)

  @property
  def orders(self):
    return self._orders

  def __repr__(self):
    return '%s(%r)' % (self.__class__.__name__, list(self.orders))

  @datastore_rpc._positional(1)
  def reversed(self, group_by=None):
    return CompositeOrder([order.reversed(group_by=group_by)
                           for order in self._orders])

  def _get_prop_names(self):
    names = set()
    for order in self._orders:
      names |= order._get_prop_names()
    return names

  def _key(self, lhs_value_map):
    result = []
    for order in self._orders:
      result.append(order._key(lhs_value_map))
    return tuple(result)

  def _cmp(self, lhs_value_map, rhs_value_map):
    for order in self._orders:
      result = order._cmp(lhs_value_map, rhs_value_map)
      if result != 0:
        return result
    return 0

  def size(self):
    """Returns the number of sub-orders the instance contains."""
    return len(self._orders)

  def _to_pbs(self):
    """Returns an ordered list of internal only pb representations."""
    return [order._to_pb() for order in self._orders]

  def __eq__(self, other):
    if self.__class__ is other.__class__:
      return super(CompositeOrder, self).__eq__(other)

    if len(self._orders) == 1:
      result = self._orders[0].__eq__(other)
      if result is NotImplemented and hasattr(other, '__eq__'):
        return other.__eq__(self._orders[0])
      return result

    return NotImplemented
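
# Composite sorting sketch (illustrative): orders are applied in sequence, so
# the following sorts by 'a' ascending, then 'b' descending:
#
#   order = CompositeOrder([PropertyOrder('a'),
#                           PropertyOrder('b', PropertyOrder.DESCENDING)])
#   entities.sort(key=order.key)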


class FetchOptions(datastore_rpc.Configuration):
  """An immutable class that contains all options for fetching results.

  These options apply to any request that pulls results from a query.

  This class reserves the right to define configuration options of any name
  except those that start with 'user_'. External subclasses should only define
  functions or variables with names that start with 'user_'.

  Options are set by passing keyword arguments to the constructor
  corresponding to the configuration options defined below and in
  datastore_rpc.Configuration.

  This object can be used as the default config for a datastore_rpc.Connection
  but in that case some options will be ignored, see option documentation
  below for details.
  """

  @datastore_rpc.ConfigOption
  def produce_cursors(value):
    """If a Cursor should be returned with the fetched results.

    Raises:
      datastore_errors.BadArgumentError if value is not a bool.
    """
    if not isinstance(value, bool):
      raise datastore_errors.BadArgumentError(
          'produce_cursors argument should be bool (%r)' % (value,))
    return value

  @datastore_rpc.ConfigOption
  def offset(value):
    """The number of results to skip before returning the first result.

    Only applies to the first request it is used with and is ignored if
    present on datastore_rpc.Connection.config.

    Raises:
      datastore_errors.BadArgumentError if value is not an integer or is less
        than zero.
    """
    datastore_types.ValidateInteger(value,
                                    'offset',
                                    datastore_errors.BadArgumentError,
                                    zero_ok=True)
    return value

  @datastore_rpc.ConfigOption
  def batch_size(value):
    """The number of results to attempt to retrieve in a batch.

    Raises:
      datastore_errors.BadArgumentError if value is not an integer or is not
        greater than zero.
    """
    datastore_types.ValidateInteger(value,
                                    'batch_size',
                                    datastore_errors.BadArgumentError)
    return value
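
# Illustrative FetchOptions (a sketch): skip the first 20 results, then pull
# batches of 100 and ask for cursors:
#
#   options = FetchOptions(offset=20, batch_size=100, produce_cursors=True)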


class QueryOptions(FetchOptions):
  """An immutable class that contains all options for running a query.

  This class contains options that control the execution process (deadline,
  batch_size, read_policy, etc.) and what part of the query results are
  returned (keys_only, projection, offset, limit, etc.). Options that control
  the contents of the query results are specified on the datastore_query.Query
  directly.

  This class reserves the right to define configuration options of any name
  except those that start with 'user_'. External subclasses should only define
  functions or variables with names that start with 'user_'.

  Options are set by passing keyword arguments to the constructor
  corresponding to the configuration options defined below and in FetchOptions
  and datastore_rpc.Configuration.

  This object can be used as the default config for a datastore_rpc.Connection
  but in that case some options will be ignored, see below for details.
  """

  ORDER_FIRST = datastore_pb.Query.ORDER_FIRST
  ANCESTOR_FIRST = datastore_pb.Query.ANCESTOR_FIRST
  FILTER_FIRST = datastore_pb.Query.FILTER_FIRST
  _HINTS = frozenset([ORDER_FIRST, ANCESTOR_FIRST, FILTER_FIRST])

  @datastore_rpc.ConfigOption
  def keys_only(value):
    """If the query should only return keys.

    Raises:
      datastore_errors.BadArgumentError if value is not a bool.
    """
    if not isinstance(value, bool):
      raise datastore_errors.BadArgumentError(
          'keys_only argument should be bool (%r)' % (value,))
    return value

  @datastore_rpc.ConfigOption
  def projection(value):
    """A list or tuple of property names to project.

    If None, the entire entity is returned.

    Specifying a projection:
    - may change the index requirements for the given query;
    - will cause a partial entity to be returned;
    - will cause only entities that contain those properties to be returned.

    A partial entity only contains the property name and value for properties
    in the projection (meaning and multiple will not be set). It will also
    only contain a single value for any multi-valued property. However, if a
    multi-valued property is specified in the order, an inequality property,
    or the projected properties, the entity will be returned multiple times,
    once for each unique combination of values.

    However, projection queries are significantly faster than normal queries.

    Raises:
      datastore_errors.BadArgumentError if value is empty or not a list or
        tuple of strings.
    """
    if isinstance(value, list):
      value = tuple(value)
    elif not isinstance(value, tuple):
      raise datastore_errors.BadArgumentError(
          'projection argument should be a list or tuple (%r)' % (value,))
    if not value:
      raise datastore_errors.BadArgumentError(
          'projection argument cannot be empty')
    for prop in value:
      if not isinstance(prop, basestring):
        raise datastore_errors.BadArgumentError(
            'projection argument should contain only strings (%r)' % (prop,))
    return value

  @datastore_rpc.ConfigOption
  def limit(value):
    """Limit on the number of results to return.

    Raises:
      datastore_errors.BadArgumentError if value is not an integer or is less
        than zero.
    """
    datastore_types.ValidateInteger(value,
                                    'limit',
                                    datastore_errors.BadArgumentError,
                                    zero_ok=True)
    return value

  @datastore_rpc.ConfigOption
  def prefetch_size(value):
    """Number of results to attempt to return on the initial request.

    Raises:
      datastore_errors.BadArgumentError if value is not an integer or is not
        greater than zero.
    """
    datastore_types.ValidateInteger(value,
                                    'prefetch_size',
                                    datastore_errors.BadArgumentError,
                                    zero_ok=True)
    return value

  @datastore_rpc.ConfigOption
  def start_cursor(value):
    """Cursor to use as a start position.

    Ignored if present on datastore_rpc.Connection.config.

    Raises:
      datastore_errors.BadArgumentError if value is not a Cursor.
    """
    if not isinstance(value, Cursor):
      raise datastore_errors.BadArgumentError(
          'start_cursor argument should be datastore_query.Cursor (%r)' %
          (value,))
    return value

  @datastore_rpc.ConfigOption
  def end_cursor(value):
    """Cursor to use as an end position.

    Ignored if present on datastore_rpc.Connection.config.

    Raises:
      datastore_errors.BadArgumentError if value is not a Cursor.
    """
    if not isinstance(value, Cursor):
      raise datastore_errors.BadArgumentError(
          'end_cursor argument should be datastore_query.Cursor (%r)' %
          (value,))
    return value

  @datastore_rpc.ConfigOption
  def hint(value):
    """Hint on how the datastore should plan the query.

    Raises:
      datastore_errors.BadArgumentError if value is not a known hint.
    """
    if value not in QueryOptions._HINTS:
      raise datastore_errors.BadArgumentError('Unknown query hint (%r)' %
                                              (value,))
    return value
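
# Illustrative QueryOptions (a sketch): a keys-only query limited to 10
# results, with the planner hinted to apply filters first:
#
#   options = QueryOptions(keys_only=True, limit=10,
#                          hint=QueryOptions.FILTER_FIRST)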


class Cursor(_BaseComponent):
  """An immutable class that represents a relative position in a query.

  The position denoted by a Cursor is relative to a result in a query even
  if the result has been removed from the given query. Usually the position
  is immediately after the last result returned by a batch.

  A cursor should only be used on a query with an identical signature to the
  one that produced it.
  """

  @datastore_rpc._positional(1)
  def __init__(self, _cursor_pb=None, urlsafe=None, _cursor_bytes=None):
    """Constructor.

    A Cursor constructed with no arguments points to the first result of any
    query. If such a Cursor is used as an end_cursor no results will ever be
    returned.
    """
    super(Cursor, self).__init__()
    if ((urlsafe is not None) + (_cursor_pb is not None)
        + (_cursor_bytes is not None) > 1):
      raise datastore_errors.BadArgumentError(
          'Can only specify one of _cursor_pb, urlsafe, and _cursor_bytes')
    if urlsafe is not None:
      _cursor_bytes = self._urlsafe_to_bytes(urlsafe)
    if _cursor_pb is not None:
      if not isinstance(_cursor_pb, datastore_pb.CompiledCursor):
        raise datastore_errors.BadArgumentError(
            '_cursor_pb argument should be datastore_pb.CompiledCursor (%r)' %
            (_cursor_pb,))
      _cursor_bytes = _cursor_pb.Encode()
    if _cursor_bytes is not None:
      if _cursor_pb is None and urlsafe is None:
        Cursor._bytes_to_cursor_pb(_cursor_bytes)
      self.__cursor_bytes = _cursor_bytes
    else:
      self.__cursor_bytes = ''

  def __repr__(self):
    arg = self.to_websafe_string()
    if arg:
      arg = '<%s>' % arg
    return '%s(%s)' % (self.__class__.__name__, arg)

  def reversed(self):
    """Creates a cursor for use in a query with a reversed sort order."""
    compiled_cursor = self._to_pb()
    if compiled_cursor.has_position():
      pos = compiled_cursor.position()
      if pos.has_start_key():
        raise datastore_errors.BadRequestError('Cursor cannot be reversed.')
      pos.set_start_inclusive(not pos.start_inclusive())
    return Cursor(_cursor_pb=compiled_cursor)

  def to_bytes(self):
    """Serializes the cursor as a byte string."""
    return self.__cursor_bytes

  @staticmethod
  def from_bytes(cursor):
    """Gets a Cursor given its byte string serialized form.

    The serialized form of a cursor may change in a non-backwards compatible
    way. In this case cursors must be regenerated from a new Query request.

    Args:
      cursor: A serialized cursor as returned by .to_bytes.

    Returns:
      A Cursor.

    Raises:
      datastore_errors.BadValueError if the cursor argument does not represent
        a serialized cursor.
    """
    return Cursor(_cursor_bytes=cursor)

  @staticmethod
  def _bytes_to_cursor_pb(cursor):
    try:
      cursor_pb = datastore_pb.CompiledCursor(cursor)
    except (ValueError, TypeError), e:
      raise datastore_errors.BadValueError(
          'Invalid cursor (%r). Details: %s' % (cursor, e))
    except Exception, e:
      if e.__class__.__name__ == 'ProtocolBufferDecodeError':
        raise datastore_errors.BadValueError(
            'Invalid cursor %s. Details: %s' % (cursor, e))
      else:
        raise
    return cursor_pb

  def urlsafe(self):
    """Serializes the cursor as a websafe string.

    Returns:
      A base64-encoded serialized cursor.
    """
    return base64.urlsafe_b64encode(self.to_bytes())
  to_websafe_string = urlsafe

  @staticmethod
  def from_websafe_string(cursor):
    """Gets a Cursor given its websafe serialized form.

    The serialized form of a cursor may change in a non-backwards compatible
    way. In this case cursors must be regenerated from a new Query request.

    Args:
      cursor: A serialized cursor as returned by .to_websafe_string.

    Returns:
      A Cursor.

    Raises:
      datastore_errors.BadValueError if the cursor argument is not a string
        type or does not represent a serialized cursor.
    """
    decoded_bytes = Cursor._urlsafe_to_bytes(cursor)
    return Cursor.from_bytes(decoded_bytes)
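
  # Websafe round-trip sketch (illustrative; `batch` is a hypothetical Batch
  # obtained from running a query with produce_cursors=True):
  #
  #   token = batch.cursor(0).to_websafe_string()  # store, e.g. in a URL
  #   cursor = Cursor.from_websafe_string(token)   # later: resume the query
  #   options = QueryOptions(start_cursor=cursor)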

  @staticmethod
  def _urlsafe_to_bytes(cursor):
    if not isinstance(cursor, basestring):
      raise datastore_errors.BadValueError(
          'cursor argument should be str or unicode (%r)' % (cursor,))

    try:
      decoded_bytes = base64.b64decode(
          str(cursor).replace('-', '+').replace('_', '/'))
    except (ValueError, TypeError), e:
      raise datastore_errors.BadValueError(
          'Invalid cursor %s. Details: %s' % (cursor, e))
    return decoded_bytes

  @staticmethod
  def _from_query_result(query_result):
    if query_result.has_compiled_cursor():
      return Cursor(_cursor_pb=query_result.compiled_cursor())
    return None

  def advance(self, offset, query, conn):
    """Advances a Cursor by the given offset.

    Args:
      offset: The amount to advance the current query.
      query: A Query identical to the one this cursor was created from.
      conn: The datastore_rpc.Connection to use.

    Returns:
      A new cursor that is advanced by offset using the given query.
    """
    datastore_types.ValidateInteger(offset,
                                    'offset',
                                    datastore_errors.BadArgumentError)
    if not isinstance(query, Query):
      raise datastore_errors.BadArgumentError(
          'query argument should be datastore_query.Query (%r)' % (query,))

    query_options = QueryOptions(
        start_cursor=self, offset=offset, limit=0, produce_cursors=True)
    return query.run(conn, query_options).next_batch(
        Batcher.AT_LEAST_OFFSET).cursor(0)

  def _to_pb(self):
    """Returns the internal only pb representation."""
    return Cursor._bytes_to_cursor_pb(self.__cursor_bytes)

  def __setstate__(self, state):
    if '_Cursor__compiled_cursor' in state:
      self.__cursor_bytes = state['_Cursor__compiled_cursor'].Encode()
    else:
      self.__dict__ = state


class _QueryKeyFilter(_BaseComponent):
  """A class that implements the key filters available on a Query."""

  @datastore_rpc._positional(1)
  def __init__(self, app=None, namespace=None, kind=None, ancestor=None):
    """Constructs a _QueryKeyFilter.

    If app/namespace and ancestor are not defined, the app/namespace set in
    the environment is used.

    Args:
      app: a string representing the required app id or None.
      namespace: a string representing the required namespace or None.
      kind: a string representing the required kind or None.
      ancestor: an entity_pb.Reference representing the required ancestor or
        None.

    Raises:
      datastore_errors.BadArgumentError if app and ancestor.app() do not match
        or an unexpected type is passed in for any argument.
    """
    if kind is not None:
      datastore_types.ValidateString(
          kind, 'kind', datastore_errors.BadArgumentError)

    if ancestor is not None:
      if not isinstance(ancestor, entity_pb.Reference):
        raise datastore_errors.BadArgumentError(
            'ancestor argument should be entity_pb.Reference (%r)' %
            (ancestor,))
      if app is None:
        app = ancestor.app()
      elif app != ancestor.app():
        raise datastore_errors.BadArgumentError(
            'ancestor argument should match app ("%r" != "%r")' %
            (ancestor.app(), app))

      if namespace is None:
        namespace = ancestor.name_space()
      elif namespace != ancestor.name_space():
        raise datastore_errors.BadArgumentError(
            'ancestor argument should match namespace ("%r" != "%r")' %
            (ancestor.name_space(), namespace))

      pb = entity_pb.Reference()
      pb.CopyFrom(ancestor)
      ancestor = pb
      self.__ancestor = ancestor
      self.__path = ancestor.path().element_list()
    else:
      self.__ancestor = None
      self.__path = None

    super(_QueryKeyFilter, self).__init__()
    self.__app = datastore_types.ResolveAppId(app).encode('utf-8')
    self.__namespace = (
        datastore_types.ResolveNamespace(namespace).encode('utf-8'))
    self.__kind = kind and kind.encode('utf-8')

  @property
  def app(self):
    return self.__app

  @property
  def namespace(self):
    return self.__namespace

  @property
  def kind(self):
    return self.__kind

  @property
  def ancestor(self):
    return self.__ancestor

  def __call__(self, entity_or_reference):
    """Applies the filter.

    Accepts either an entity or a reference to avoid the need to extract keys
    from entities when we have a list of entities (which is a common case).

    Args:
      entity_or_reference: Either an entity_pb.EntityProto or
        entity_pb.Reference.
    """
    if isinstance(entity_or_reference, entity_pb.Reference):
      key = entity_or_reference
    elif isinstance(entity_or_reference, entity_pb.EntityProto):
      key = entity_or_reference.key()
    else:
      raise datastore_errors.BadArgumentError(
          'entity_or_reference argument must be an entity_pb.EntityProto ' +
          'or entity_pb.Reference (%r)' % (entity_or_reference,))
    return (key.app() == self.__app and
            key.name_space() == self.__namespace and
            (not self.__kind or
             key.path().element_list()[-1].type() == self.__kind) and
            (not self.__path or
             key.path().element_list()[0:len(self.__path)] == self.__path))

  def _to_pb(self):
    pb = datastore_pb.Query()

    pb.set_app(self.__app)
    datastore_types.SetNamespace(pb, self.__namespace)
    if self.__kind is not None:
      pb.set_kind(self.__kind)
    if self.__ancestor:
      ancestor = pb.mutable_ancestor()
      ancestor.CopyFrom(self.__ancestor)

    return pb
1861 class _BaseQuery(_BaseComponent):
1862 """A base class for query implementations."""
1864 def run(self, conn, query_options=None):
1865 """Runs the query using provided datastore_rpc.Connection.
1867 Args:
1868 conn: The datastore_rpc.Connection to use
1869 query_options: Optional query options to use
1871 Returns:
1872 A Batcher that implicitly fetches query results asynchronously.
1874 Raises:
1875 datastore_errors.BadArgumentError if any of the arguments are invalid.
1877 return Batcher(query_options, self.run_async(conn, query_options))
1879 def run_async(self, conn, query_options=None):
1880 """Runs the query using the provided datastore_rpc.Connection.
1882 Args:
1883 conn: the datastore_rpc.Connection on which to run the query.
1884 query_options: Optional QueryOptions with which to run the query.
1886 Returns:
1887 An async object that can be used to grab the first Batch. Additional
1888 batches can be retrieved by calling Batch.next_batch/next_batch_async.
1890 Raises:
1891 datastore_errors.BadArgumentError if any of the arguments are invalid.
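Example (illustrative; same assumptions as in run() above):
> rpc = query.run_async(conn)
> first_batch = rpc.get_result()
"""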
1893 raise NotImplementedError
1895 def __getstate__(self):
1896 raise pickle.PicklingError(
1897 'Pickling of %r is unsupported.' % self)
1900 class Query(_BaseQuery):
1901 """An immutable class that represents a query signature.
1903 A query signature consists of a source of entities (specified as app,
1904 namespace and optionally kind and ancestor) as well as a FilterPredicate,
1905 grouping and a desired ordering.
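Example (an illustrative sketch; the kind and property name are
hypothetical, everything else is defined in this module):
> q = Query(kind='Person',
>           filter_predicate=make_filter('age', '>=', 18),
>           order=PropertyOrder('age'))
"""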
1908 @datastore_rpc._positional(1)
1909 def __init__(self, app=None, namespace=None, kind=None, ancestor=None,
1910 filter_predicate=None, group_by=None, order=None):
1911 """Constructor.
1913 Args:
1914 app: Optional app to query, derived from the environment if not specified.
1915 namespace: Optional namespace to query, derived from the environment if
1916 not specified.
1917 kind: Optional kind to query.
1918 ancestor: Optional ancestor to query, an entity_pb.Reference.
1919 filter_predicate: Optional FilterPredicate by which to restrict the query.
1920 order: Optional Order in which to return results.
1921 group_by: Optional list of properties to group the results by.
1923 Raises:
1924 datastore_errors.BadArgumentError if any argument is invalid.
"""
1926 super(Query, self).__init__()
1929 if filter_predicate is not None and not isinstance(filter_predicate,
1930 FilterPredicate):
1931 raise datastore_errors.BadArgumentError(
1932 'filter_predicate should be datastore_query.FilterPredicate (%r)' %
1933 (filter_predicate,))
1936 if isinstance(order, CompositeOrder):
1937 if order.size() == 0:
1938 order = None
1939 elif isinstance(order, Order):
1940 order = CompositeOrder([order])
1941 elif order is not None:
1942 raise datastore_errors.BadArgumentError(
1943 'order should be Order (%r)' % (order,))
1946 if group_by is not None:
1947 if isinstance(group_by, list):
1948 group_by = tuple(group_by)
1949 elif not isinstance(group_by, tuple):
1950 raise datastore_errors.BadArgumentError(
1951 'group_by argument should be a list or tuple (%r)' % (group_by,))
1952 if not group_by:
1953 raise datastore_errors.BadArgumentError(
1954 'group_by argument cannot be empty')
1955 for prop in group_by:
1956 if not isinstance(prop, basestring):
1957 raise datastore_errors.BadArgumentError(
1958 'group_by argument should contain only strings (%r)' % (prop,))
1960 self._key_filter = _QueryKeyFilter(app=app, namespace=namespace, kind=kind,
1961 ancestor=ancestor)
1962 self._order = order
1963 self._filter_predicate = filter_predicate
1964 self._group_by = group_by
1966 @property
1967 def app(self):
1968 return self._key_filter.app
1970 @property
1971 def namespace(self):
1972 return self._key_filter.namespace
1974 @property
1975 def kind(self):
1976 return self._key_filter.kind
1978 @property
1979 def ancestor(self):
1980 return self._key_filter.ancestor
1982 @property
1983 def filter_predicate(self):
1984 return self._filter_predicate
1986 @property
1987 def order(self):
1988 return self._order
1990 @property
1991 def group_by(self):
1992 return self._group_by
1994 def __repr__(self):
1995 args = []
1996 args.append('app=%r' % self.app)
1997 ns = self.namespace
1998 if ns:
1999 args.append('namespace=%r' % ns)
2000 kind = self.kind
2001 if kind is not None:
2002 args.append('kind=%r' % kind)
2003 ancestor = self.ancestor
2004 if ancestor is not None:
2005 websafe = base64.urlsafe_b64encode(ancestor.Encode())
2006 args.append('ancestor=<%s>' % websafe)
2007 filter_predicate = self.filter_predicate
2008 if filter_predicate is not None:
2009 args.append('filter_predicate=%r' % filter_predicate)
2010 order = self.order
2011 if order is not None:
2012 args.append('order=%r' % order)
2013 group_by = self.group_by
2014 if group_by is not None:
2015 args.append('group_by=%r' % (group_by,))
2016 return '%s(%s)' % (self.__class__.__name__, ', '.join(args))
2018 def run_async(self, conn, query_options=None):
2019 if not isinstance(conn, datastore_rpc.BaseConnection):
2020 raise datastore_errors.BadArgumentError(
2021 'conn should be a datastore_rpc.BaseConnection (%r)' % (conn,))
2023 if not QueryOptions.is_configuration(query_options):
2026 query_options = QueryOptions(config=query_options)
2028 start_cursor = query_options.start_cursor
2029 if not start_cursor and query_options.produce_cursors:
2030 start_cursor = Cursor()
2032 req = self._to_pb(conn, query_options)
2033 return Batch.create_async(self, query_options, conn, req,
2034 start_cursor=start_cursor)
2036 @classmethod
2037 def _from_pb(cls, query_pb):
2038 kind = query_pb.has_kind() and query_pb.kind().decode('utf-8') or None
2039 ancestor = query_pb.has_ancestor() and query_pb.ancestor() or None
2041 filter_predicate = None
2042 if query_pb.filter_size() > 0:
2043 filter_predicate = CompositeFilter(
2044 CompositeFilter.AND,
2045 [PropertyFilter._from_pb(filter_pb)
2046 for filter_pb in query_pb.filter_list()])
2048 order = None
2049 if query_pb.order_size() > 0:
2050 order = CompositeOrder([PropertyOrder._from_pb(order_pb)
2051 for order_pb in query_pb.order_list()])
2053 group_by = None
2054 if query_pb.group_by_property_name_size() > 0:
2055 group_by = tuple(name.decode('utf-8')
2056 for name in query_pb.group_by_property_name_list())
2058 return Query(app=query_pb.app().decode('utf-8'),
2059 namespace=query_pb.name_space().decode('utf-8'),
2060 kind=kind,
2061 ancestor=ancestor,
2062 filter_predicate=filter_predicate,
2063 order=order,
2064 group_by=group_by)
2066 def _to_pb(self, conn, query_options):
2067 """Returns the internal only pb representation."""
2068 pb = self._key_filter._to_pb()
2071 if self._filter_predicate:
2072 for f in self._filter_predicate._to_pbs():
2073 pb.add_filter().CopyFrom(f)
2076 if self._order:
2077 for order in self._order._to_pbs():
2078 pb.add_order().CopyFrom(order)
2081 if self._group_by:
2082 pb.group_by_property_name_list().extend(self._group_by)
2085 if QueryOptions.keys_only(query_options, conn.config):
2086 pb.set_keys_only(True)
2088 projection = QueryOptions.projection(query_options, conn.config)
2089 if projection:
2090 if self._group_by:
2091 extra = set(projection) - set(self._group_by)
2092 if extra:
2093 raise datastore_errors.BadRequestError(
2094 'projection includes properties not in the group_by argument: %s'
2095 % extra)
2096 pb.property_name_list().extend(projection)
2097 elif self._group_by:
2098 raise datastore_errors.BadRequestError(
2099 'cannot specify group_by without a projection')
2101 if QueryOptions.produce_cursors(query_options, conn.config):
2102 pb.set_compile(True)
2104 limit = QueryOptions.limit(query_options, conn.config)
2105 if limit is not None:
2106 pb.set_limit(limit)
2108 count = QueryOptions.prefetch_size(query_options, conn.config)
2109 if count is None:
2110 count = QueryOptions.batch_size(query_options, conn.config)
2111 if count is not None:
2112 pb.set_count(count)
2115 if query_options.offset:
2116 pb.set_offset(query_options.offset)
2119 if query_options.start_cursor is not None:
2120 pb.mutable_compiled_cursor().CopyFrom(query_options.start_cursor._to_pb())
2123 if query_options.end_cursor is not None:
2124 pb.mutable_end_compiled_cursor().CopyFrom(
2125 query_options.end_cursor._to_pb())
2128 if ((query_options.hint == QueryOptions.ORDER_FIRST and pb.order_size()) or
2129 (query_options.hint == QueryOptions.ANCESTOR_FIRST and
2130 pb.has_ancestor()) or
2131 (query_options.hint == QueryOptions.FILTER_FIRST and
2132 pb.filter_size() > 0)):
2133 pb.set_hint(query_options.hint)
2136 conn._set_request_read_policy(pb, query_options)
2137 conn._set_request_transaction(pb)
2139 return pb
2142 def apply_query(query, entities):
2143 """Performs the given query on a set of in-memory entities.
2145 This function can perform queries impossible in the datastore (e.g. a query
2146 with multiple inequality filters on different properties) because all
2147 operations are done in memory. For queries that can also be executed on
2148 the datastore, the results produced by this function may not use the same
2149 implicit ordering as the datastore. To ensure compatibility, explicit
2150 ordering must be used (e.g. 'ORDER BY ineq_prop, ..., __key__').
2152 Order by __key__ should always be used when a consistent result is desired
2153 (unless there is a sort order on another globally unique property).
2155 Args:
2156 query: a datastore_query.Query to apply
2157 entities: a list of entity_pb.EntityProto on which to apply the query.
2159 Returns:
2160 A list of entity_pb.EntityProto containing the results of the query.
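Example (an illustrative sketch; 'entities' is a hypothetical in-memory
list of entity_pb.EntityProto):
> q = Query(kind='Person', filter_predicate=make_filter('age', '>=', 18))
> adults = apply_query(q, entities)
"""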
2162 if not isinstance(query, Query):
2163 raise datastore_errors.BadArgumentError(
2164 "query argument must be a datastore_query.Query (%r)" % (query,))
2166 if not isinstance(entities, list):
2167 raise datastore_errors.BadArgumentError(
2168 "entities argument must be a list (%r)" % (entities,))
2170 filtered_entities = filter(query._key_filter, entities)
2172 if not query._order:
2177 if query._filter_predicate:
2178 return filter(query._filter_predicate, filtered_entities)
2179 return filtered_entities
2185 names = query._order._get_prop_names()
2186 if query._filter_predicate:
2187 names |= query._filter_predicate._get_prop_names()
2190 exists_filter = _PropertyExistsFilter(names)
2192 value_maps = []
2193 for entity in filtered_entities:
2194 value_map = _make_key_value_map(entity, names)
2198 if exists_filter._apply(value_map) and (
2199 not query._filter_predicate or
2200 query._filter_predicate._prune(value_map)):
2201 value_map['__entity__'] = entity
2202 value_maps.append(value_map)
2204 value_maps.sort(query._order._cmp)
2205 return [value_map['__entity__'] for value_map in value_maps]
2208 class _AugmentedQuery(_BaseQuery):
2209 """A query that combines a datastore query with in-memory filters/results."""
2211 @datastore_rpc._positional(2)
2212 def __init__(self, query, in_memory_results=None, in_memory_filter=None,
2213 max_filtered_count=None):
2214 """Constructor for _AugmentedQuery.
2216 Do not call directly. Use the utility functions instead (e.g.
2217 datastore_query.inject_results)
2219 Args:
2220 query: A datastore_query.Query object to augment.
2221 in_memory_results: a list of pre-sorted and filtered results to add to
2222 the stream of datastore results, or None.
2223 in_memory_filter: a set of in-memory filters to apply to the datastore
2224 results or None.
2225 max_filtered_count: the maximum number of datastore entities that will be
2226 filtered out by in_memory_filter, if known.
"""
2228 if not isinstance(query, Query):
2229 raise datastore_errors.BadArgumentError(
2230 'query argument should be datastore_query.Query (%r)' % (query,))
2231 if (in_memory_filter is not None and
2232 not isinstance(in_memory_filter, FilterPredicate)):
2233 raise datastore_errors.BadArgumentError(
2234 'in_memory_filter argument should be ' +
2235 'datastore_query.FilterPredicate (%r)' % (in_memory_filter,))
2236 if (in_memory_results is not None and
2237 not isinstance(in_memory_results, list)):
2238 raise datastore_errors.BadArgumentError(
2239 'in_memory_results argument should be a list of ' +
2240 'entity_pb.EntityProto (%r)' % (in_memory_results,))
2241 datastore_types.ValidateInteger(max_filtered_count,
2242 'max_filtered_count',
2243 empty_ok=True,
2244 zero_ok=True)
2245 self._query = query
2246 self._max_filtered_count = max_filtered_count
2247 self._in_memory_filter = in_memory_filter
2248 self._in_memory_results = in_memory_results
2250 @property
2251 def app(self):
2252 return self._query._key_filter.app
2254 @property
2255 def namespace(self):
2256 return self._query._key_filter.namespace
2258 @property
2259 def kind(self):
2260 return self._query._key_filter.kind
2262 @property
2263 def ancestor(self):
2264 return self._query._key_filter.ancestor
2266 @property
2267 def filter_predicate(self):
2268 return self._query._filter_predicate
2270 @property
2271 def order(self):
2272 return self._query._order
2274 @property
2275 def group_by(self):
2276 return self._query._group_by
2278 def run_async(self, conn, query_options=None):
2279 if not isinstance(conn, datastore_rpc.BaseConnection):
2280 raise datastore_errors.BadArgumentError(
2281 'conn should be a datastore_rpc.BaseConnection (%r)' % (conn,))
2283 if not QueryOptions.is_configuration(query_options):
2286 query_options = QueryOptions(config=query_options)
2288 if self._query._order:
2291 changes = {'keys_only': False}
2292 else:
2293 changes = {}
2295 if self._in_memory_filter or self._in_memory_results:
2299 in_memory_offset = query_options.offset
2300 in_memory_limit = query_options.limit
2302 if in_memory_limit is not None:
2303 if self._in_memory_filter is None:
2305 changes['limit'] = in_memory_limit
2306 elif self._max_filtered_count is not None:
2309 changes['limit'] = in_memory_limit + self._max_filtered_count
2310 else:
2312 changes['limit'] = None
2314 if in_memory_offset:
2316 changes['offset'] = None
2317 if changes.get('limit', None) is not None:
2318 changes['limit'] += in_memory_offset
2319 else:
2320 in_memory_offset = None
2321 else:
2322 in_memory_offset = None
2323 in_memory_limit = None
2325 req = self._query._to_pb(
2326 conn, QueryOptions(config=query_options, **changes))
2328 start_cursor = query_options.start_cursor
2329 if not start_cursor and query_options.produce_cursors:
2330 start_cursor = Cursor()
2332 return _AugmentedBatch.create_async(self, query_options, conn, req,
2333 in_memory_offset=in_memory_offset,
2334 in_memory_limit=in_memory_limit,
2335 start_cursor=start_cursor)
2338 @datastore_rpc._positional(1)
2339 def inject_results(query, updated_entities=None, deleted_keys=None):
2340 """Creates a query object that will inject changes into results.
2342 Args:
2343 query: The datastore_query.Query to augment
2344 updated_entities: A list of entity_pb.EntityProto's that have been updated
2345 and should take priority over any values returned by query.
2346 deleted_keys: A list of entity_pb.Reference's for entities that have been
2347 deleted and should be removed from query results.
2349 Returns:
2350 A datastore_query._AugmentedQuery if in-memory filtering is required,
2351 query otherwise.
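Example (an illustrative sketch; 'new_entity' and 'old_key' stand in for
uncommitted changes known to the caller):
> q = inject_results(query, updated_entities=[new_entity],
>                    deleted_keys=[old_key])
> batcher = q.run(conn)
"""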
2353 if not isinstance(query, Query):
2354 raise datastore_errors.BadArgumentError(
2355 'query argument should be datastore_query.Query (%r)' % (query,))
2357 overriden_keys = set()
2359 if deleted_keys is not None:
2360 if not isinstance(deleted_keys, list):
2361 raise datastore_errors.BadArgumentError(
2362 'deleted_keys argument must be a list (%r)' % (deleted_keys,))
2363 deleted_keys = filter(query._key_filter, deleted_keys)
2364 for key in deleted_keys:
2365 overriden_keys.add(datastore_types.ReferenceToKeyValue(key))
2367 if updated_entities is not None:
2368 if not isinstance(updated_entities, list):
2369 raise datastore_errors.BadArgumentError(
2370 'updated_entities argument must be a list (%r)' % (updated_entities,))
2373 updated_entities = filter(query._key_filter, updated_entities)
2374 for entity in updated_entities:
2375 overriden_keys.add(datastore_types.ReferenceToKeyValue(entity.key()))
2377 updated_entities = apply_query(query, updated_entities)
2378 else:
2379 updated_entities = []
2381 if not overriden_keys:
2382 return query
2384 return _AugmentedQuery(query,
2385 in_memory_filter=_IgnoreFilter(overriden_keys),
2386 in_memory_results=updated_entities,
2387 max_filtered_count=len(overriden_keys))
2390 class _BatchShared(object):
2391 """Data shared among the batches of a query."""
2393 def __init__(self, query, query_options, conn, augmented_query=None):
2394 self.__query = query
2395 self.__query_options = query_options
2396 self.__conn = conn
2397 self.__augmented_query = augmented_query
2398 self.__was_first_result_processed = False
2400 @property
2401 def query(self):
2402 return self.__query
2404 @property
2405 def query_options(self):
2406 return self.__query_options
2408 @property
2409 def conn(self):
2410 return self.__conn
2412 @property
2413 def augmented_query(self):
2414 return self.__augmented_query
2416 @property
2417 def keys_only(self):
2418 return self.__keys_only
2420 @property
2421 def compiled_query(self):
2422 return self.__compiled_query
2424 @property
2425 def index_list(self):
2426 """Returns the list of indexes used by the query.
2427 Possibly None when the adapter does not implement pb_to_index.
"""
2429 return self.__index_list
2431 def process_query_result_if_first(self, query_result):
2432 if not self.__was_first_result_processed:
2433 self.__was_first_result_processed = True
2434 self.__keys_only = query_result.keys_only()
2435 if query_result.has_compiled_query():
2436 self.__compiled_query = query_result.compiled_query()
2437 else:
2438 self.__compiled_query = None
2439 try:
2440 self.__index_list = [self.__conn.adapter.pb_to_index(index_pb)
2441 for index_pb in query_result.index_list()]
2442 except NotImplementedError:
2444 self.__index_list = None
2447 class Batch(object):
2448 """A batch of results returned by a query.
2450 This class contains a batch of results returned from the datastore and
2451 relevant metadata. This metadata includes:
2452 query: The query that produced this batch
2453 query_options: The QueryOptions used to run the query. This does not
2454 contain any options passed to the .next_batch() call that created the
2455 current batch.
2456 start_cursor, end_cursor: These are the cursors that can be used
2457 with a query to re-fetch this batch. They can also be used to
2458 find all entities before or after the given batch (by using start_cursor as
2459 an end cursor or vice versa). start_cursor can also be advanced to
2460 point to a position within the batch using Cursor.advance().
2461 skipped_results: the number of results skipped because of the offset
2462 given to the request that generated it. This can be set either on
2463 the original Query.run() request or in subsequent .next_batch() calls.
2464 more_results: If this is true there are more results that can be retrieved
2465 either by .next_batch() or Batcher.next().
2467 This class is also able to fetch the next batch of the query using
2468 .next_batch(). As batches of results must be fetched serially, .next_batch()
2469 can only be called once. Additional calls to .next_batch() will return None.
2470 When there are no more batches .next_batch() will return None as well. Note
2471 that batches returned by iterating over Batcher will always return None for
2472 .next_batch(), as the Batcher handles fetching the next batch automatically.
2474 A Batch typically represents the result of a single RPC request. The datastore
2475 operates on a "best effort" basis so the batch returned by .next_batch()
2476 or Query.run_async().get_result() may not have satisfied the requested offset
2477 or number of results (specified through FetchOptions.offset and
2478 FetchOptions.batch_size respectively). To satisfy these restrictions
2479 additional batches may be needed (with FetchOptions that specify the remaining
2480 offset or results needed). The Batcher class hides these limitations.
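Example (an illustrative sketch of consuming batches serially without a
Batcher; 'conn' and 'process' are hypothetical caller objects):
> batch = query.run_async(conn).get_result()
> while batch is not None:
>   process(batch.results)
>   batch = batch.next_batch()
"""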
2483 __skipped_cursor = None
2485 @classmethod
2486 @datastore_rpc._positional(5)
2487 def create_async(cls, query, query_options, conn, req,
2488 start_cursor):
2489 batch_shared = _BatchShared(query, query_options, conn)
2490 batch0 = cls(batch_shared, start_cursor=start_cursor)
2491 return batch0._make_query_result_rpc_call('RunQuery', query_options, req)
2493 @datastore_rpc._positional(2)
2494 def __init__(self, batch_shared, start_cursor=Cursor()):
2495 """Constructor.
2497 This class is constructed in stages (one stage when an RPC is sent and
2498 another when the RPC completes) and should not be constructed directly!
2499 Use Query.run_async().get_result() to create a Batch or Query.run()
2500 to use a batcher.
2502 This constructor does not perform verification.
2504 Args:
2505 batch_shared: Data shared between batches for a single query run.
2506 start_cursor: Optional cursor pointing before this batch.
"""
2510 self._batch_shared = batch_shared
2511 self.__start_cursor = start_cursor
2513 @property
2514 def query_options(self):
2515 """The QueryOptions used to retrieve the first batch."""
2516 return self._batch_shared.query_options
2518 @property
2519 def query(self):
2520 """The query the current batch came from."""
2521 return self._batch_shared.query
2523 @property
2524 def results(self):
2525 """A list of entities in this batch."""
2526 return self.__results
2528 @property
2529 def keys_only(self):
2530 """Whether the entities in this batch only contain keys."""
2531 return self._batch_shared.keys_only
2533 @property
2534 def index_list(self):
2535 """Returns the list of indexes used to peform this batch's query.
2536 Possibly None when the adapter does not implement pb_to_index.
2538 return self._batch_shared.index_list
2540 @property
2541 def start_cursor(self):
2542 """A cursor that points to the position just before the current batch."""
2543 return self.__start_cursor
2545 @property
2546 def end_cursor(self):
2547 """A cursor that points to the position just after the current batch."""
2548 return self.__end_cursor
2550 @property
2551 def skipped_results(self):
2552 """The number of results skipped because of an offset in the request.
2554 An offset is satisfied before any results are returned. The start_cursor
2555 points to the position in the query before the skipped results.
"""
2557 return self._skipped_results
2559 @property
2560 def more_results(self):
2561 """Whether more results can be retrieved from the query."""
2562 return self.__more_results
2564 def next_batch(self, fetch_options=None):
2565 """Synchronously get the next batch or None if there are no more batches.
2567 Args:
2568 fetch_options: Optional fetch options to use when fetching the next batch.
2569 Merged with both the fetch options on the original call and the
2570 connection.
2572 Returns:
2573 A new Batch of results or None if either the next batch has already been
2574 fetched or there are no more results.
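Example (illustrative; FetchOptions is defined in this module):
> batch2 = batch.next_batch(FetchOptions(batch_size=100))
"""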
2576 async = self.next_batch_async(fetch_options)
2577 if async is None:
2578 return None
2579 return async.get_result()
2581 def _compiled_query(self):
2582 return self._batch_shared.compiled_query
2584 def cursor(self, index):
2585 """Gets the cursor that points just after the result at index - 1.
2587 The index is relative to first result in .results. Since start_cursor
2588 points to the position before the first skipped result, the range of
2589 indexes this function supports is limited to
2590 [-skipped_results, len(results)].
2592 For example, using start_cursor=batch.cursor(i) and
2593 end_cursor=batch.cursor(j) will return the results found in
2594 batch.results[i:j]. Note that any result added in the range (i-1, j]
2595 will appear in the new query's results.
2597 Warning: Any index in the range (-skipped_results, 0) may cause
2598 continuation to miss or duplicate results if outside a transaction.
2600 Args:
2601 index: An int, the index relative to the first result before which the
2602 cursor should point.
2604 Returns:
2605 A Cursor that points to a position just after the result index - 1,
2606 which if used as a start_cursor will cause the first result to be
2607 batch.results[index].
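Example (an illustrative sketch; re-running the query with these cursors
returns batch.results[i:j]):
> opts = QueryOptions(start_cursor=batch.cursor(i),
>                     end_cursor=batch.cursor(j))
> sub_batcher = batch.query.run(conn, opts)
"""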
2609 if not isinstance(index, (int, long)):
2610 raise datastore_errors.BadArgumentError(
2611 'index argument should be an integer (%r)' % (index,))
2612 if not -self._skipped_results <= index <= len(self.__results):
2613 raise datastore_errors.BadArgumentError(
2614 'index argument must be in the inclusive range [%d, %d]' %
2615 (-self._skipped_results, len(self.__results)))
2617 if index == -self._skipped_results:
2618 return self.__start_cursor
2619 elif (index == 0 and
2620 self.__skipped_cursor):
2621 return Cursor(_cursor_pb=self.__skipped_cursor)
2622 elif index > 0 and self.__result_cursors:
2623 return Cursor(_cursor_pb=self.__result_cursors[index - 1])
2625 elif index == len(self.__results):
2626 return self.__end_cursor
2627 else:
2632 return self.__start_cursor.advance(index + self._skipped_results,
2633 self._batch_shared.query,
2634 self._batch_shared.conn)
2636 def next_batch_async(self, fetch_options=None):
2637 """Asynchronously get the next batch or None if there are no more batches.
2639 Args:
2640 fetch_options: Optional fetch options to use when fetching the next batch.
2641 Merged with both the fetch options on the original call and the
2642 connection.
2644 Returns:
2645 An async object that can be used to get the next Batch or None if either
2646 the next batch has already been fetched or there are no more results.
"""
2648 if not self.__datastore_cursor:
2649 return None
2651 fetch_options, next_batch = self._make_next_batch(fetch_options)
2652 req = self._to_pb(fetch_options)
2654 config = self._batch_shared.query_options.merge(fetch_options)
2655 return next_batch._make_query_result_rpc_call('Next', config, req)
2657 def _to_pb(self, fetch_options=None):
2658 req = datastore_pb.NextRequest()
2660 if FetchOptions.produce_cursors(fetch_options,
2661 self._batch_shared.query_options,
2662 self._batch_shared.conn.config):
2663 req.set_compile(True)
2665 count = FetchOptions.batch_size(fetch_options,
2666 self._batch_shared.query_options,
2667 self._batch_shared.conn.config)
2668 if count is not None:
2669 req.set_count(count)
2671 if fetch_options is not None and fetch_options.offset:
2672 req.set_offset(fetch_options.offset)
2674 req.mutable_cursor().CopyFrom(self.__datastore_cursor)
2675 self.__datastore_cursor = None
2676 return req
2678 def _extend(self, next_batch):
2679 """Combines the current batch with the next one. Called by batcher."""
2680 self.__datastore_cursor = next_batch.__datastore_cursor
2681 next_batch.__datastore_cursor = None
2682 self.__more_results = next_batch.__more_results
2683 if not self.__results:
2684 self.__skipped_cursor = next_batch.__skipped_cursor
2685 self.__results.extend(next_batch.__results)
2686 self.__result_cursors.extend(next_batch.__result_cursors)
2687 self.__end_cursor = next_batch.__end_cursor
2688 self._skipped_results += next_batch._skipped_results
2690 def _make_query_result_rpc_call(self, name, config, req):
2691 """Makes either a RunQuery or Next call that will modify the instance.
2693 Args:
2694 name: A string, the name of the call to invoke.
2695 config: The datastore_rpc.Configuration to use for the call.
2696 req: The request to send with the call.
2698 Returns:
2699 A UserRPC object that can be used to fetch the result of the RPC.
"""
2701 return self._batch_shared.conn._make_rpc_call(config, name, req,
2702 datastore_pb.QueryResult(),
2703 self.__query_result_hook)
2705 _need_index_header = 'The suggested index for this query is:'
2707 def __query_result_hook(self, rpc):
2708 """Internal method used as get_result_hook for RunQuery/Next operation."""
2709 try:
2710 self._batch_shared.conn.check_rpc_success(rpc)
2711 except datastore_errors.NeedIndexError, exc:
2713 if isinstance(rpc.request, datastore_pb.Query):
2714 _, kind, ancestor, props = datastore_index.CompositeIndexForQuery(
2715 rpc.request)
2717 props = datastore_index.GetRecommendedIndexProperties(props)
2718 yaml = datastore_index.IndexYamlForQuery(kind, ancestor, props)
2719 xml = datastore_index.IndexXmlForQuery(kind, ancestor, props)
2722 raise datastore_errors.NeedIndexError(
2723 '\n'.join([str(exc), self._need_index_header, yaml]),
2724 original_message=str(exc), header=self._need_index_header,
2725 yaml_index=yaml, xml_index=xml)
2726 raise
2727 query_result = rpc.response
2729 self._batch_shared.process_query_result_if_first(query_result)
2731 if query_result.has_skipped_results_compiled_cursor():
2732 self.__skipped_cursor = query_result.skipped_results_compiled_cursor()
2734 self.__result_cursors = list(query_result.result_compiled_cursor_list())
2735 self.__end_cursor = Cursor._from_query_result(query_result)
2736 self._skipped_results = query_result.skipped_results()
2738 if query_result.more_results():
2739 self.__datastore_cursor = query_result.cursor()
2740 self.__more_results = True
2741 else:
2742 self._end()
2744 self.__results = self._process_results(query_result.result_list())
2745 return self
2747 def _end(self):
2748 """Changes the internal state so that no more batches can be produced."""
2749 self.__datastore_cursor = None
2750 self.__more_results = False
2752 def _make_next_batch(self, fetch_options):
2753 """Creates the object to store the next batch.
2755 Args:
2756 fetch_options: The datastore_query.FetchOptions passed in by the user or
2757 None.
2759 Returns:
2760 A tuple containing the fetch options that should be used internally and
2761 the object that should be used to contain the next batch.
"""
2763 return fetch_options, Batch(self._batch_shared,
2764 start_cursor=self.__end_cursor)
2766 def _process_results(self, results):
2767 """Converts the datastore results into results returned to the user.
2769 Args:
2770 results: A list of entity_pb.EntityProto's returned by the datastore.
2772 Returns:
2773 A list of results that should be returned to the user.
"""
2775 pb_to_query_result = self._batch_shared.conn.adapter.pb_to_query_result
2776 return [pb_to_query_result(result, self._batch_shared.query_options)
2777 for result in results]
2779 def __getstate__(self):
2780 raise pickle.PicklingError(
2781 'Pickling of datastore_query.Batch is unsupported.')
2784 class _AugmentedBatch(Batch):
2785 """A batch produced by a datastore_query._AugmentedQuery."""
2787 @classmethod
2788 @datastore_rpc._positional(5)
2789 def create_async(cls, augmented_query, query_options, conn, req,
2790 in_memory_offset, in_memory_limit, start_cursor):
2791 batch_shared = _BatchShared(augmented_query._query,
2792 query_options,
2793 conn,
2794 augmented_query)
2795 batch0 = cls(batch_shared,
2796 in_memory_offset=in_memory_offset,
2797 in_memory_limit=in_memory_limit,
2798 start_cursor=start_cursor)
2799 return batch0._make_query_result_rpc_call('RunQuery', query_options, req)
2801 @datastore_rpc._positional(2)
2802 def __init__(self, batch_shared,
2803 in_memory_offset=None,
2804 in_memory_limit=None,
2805 next_index=0,
2806 start_cursor=Cursor()):
2807 """A Constructor for datastore_query._AugmentedBatch.
2809 Constructed by datastore_query._AugmentedQuery. Should not be called
2810 directly.
2812 super(_AugmentedBatch, self).__init__(batch_shared,
2813 start_cursor=start_cursor)
2814 self.__in_memory_offset = in_memory_offset
2815 self.__in_memory_limit = in_memory_limit
2816 self.__next_index = next_index
2818 @property
2819 def query(self):
2820 """The query the current batch came from."""
2821 return self._batch_shared.augmented_query
2823 def cursor(self, index):
2824 raise NotImplementedError
2826 def _extend(self, next_batch):
2827 super(_AugmentedBatch, self)._extend(next_batch)
2828 self.__in_memory_limit = next_batch.__in_memory_limit
2829 self.__in_memory_offset = next_batch.__in_memory_offset
2830 self.__next_index = next_batch.__next_index
2832 def _process_results(self, results):
2834 in_memory_filter = self._batch_shared.augmented_query._in_memory_filter
2835 if in_memory_filter:
2836 results = filter(in_memory_filter, results)
2839 in_memory_results = self._batch_shared.augmented_query._in_memory_results
2840 if in_memory_results and self.__next_index < len(in_memory_results):
2842 original_query = super(_AugmentedBatch, self).query
2843 if original_query._order:
2845 if results:
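# Merge the pre-sorted in-memory results into the (already ordered)
# datastore results, walking both lists in query order. The bounds
# check on self.__next_index below keeps an exhausted in-memory list
# from being re-inserted on later passes.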
2846 next_result = in_memory_results[self.__next_index]
2847 next_key = original_query._order.key(next_result)
2848 i = 0
2849 while i < len(results):
2850 result = results[i]
2851 result_key = original_query._order.key(result)
2852 while self.__next_index < len(in_memory_results) and next_key <= result_key:
2853 results.insert(i, next_result)
2854 i += 1
2855 self.__next_index += 1
2856 if self.__next_index >= len(in_memory_results):
2857 break
2858 next_result = in_memory_results[self.__next_index]
2859 next_key = original_query._order.key(next_result)
2860 i += 1
2861 elif results or not super(_AugmentedBatch, self).more_results:
2863 results = in_memory_results + results
2864 self.__next_index = len(in_memory_results)
2867 if self.__in_memory_offset:
2868 assert not self._skipped_results
2869 offset = min(self.__in_memory_offset, len(results))
2870 if offset:
2871 self._skipped_results += offset
2872 self.__in_memory_offset -= offset
2873 results = results[offset:]
2875 if self.__in_memory_limit is not None:
2876 results = results[:self.__in_memory_limit]
2877 self.__in_memory_limit -= len(results)
2878 if self.__in_memory_limit <= 0:
2879 self._end()
2881 return super(_AugmentedBatch, self)._process_results(results)
2883 def _make_next_batch(self, fetch_options):
2884 in_memory_offset = FetchOptions.offset(fetch_options)
2885 augmented_query = self._batch_shared.augmented_query
2886 if in_memory_offset and (augmented_query._in_memory_filter or
2887 augmented_query._in_memory_results):
2888 fetch_options = FetchOptions(offset=0)
2889 else:
2890 in_memory_offset = None
2891 return (fetch_options,
2892 _AugmentedBatch(self._batch_shared,
2893 in_memory_offset=in_memory_offset,
2894 in_memory_limit=self.__in_memory_limit,
2895 start_cursor=self.end_cursor,
2896 next_index=self.__next_index))
2899 class Batcher(object):
2900 """A class that implements the Iterator interface for Batches.
2902 Typically constructed by a call to Query.run().
2904 The class hides the "best effort" nature of the datastore by potentially
2905 making multiple requests to the datastore and merging the resulting batches.
2906 This is accomplished efficiently by prefetching results and mixing both
2907 non-blocking and blocking calls to the datastore as needed.
2909 Iterating through batches is almost always more efficient than pulling all
2910 results at once as RPC latency is hidden by asynchronously prefetching
2911 results.
2913 The batches produced by this class cannot be used to fetch the next batch
2914 (through Batch.next_batch()): by the time the current batch is returned,
2915 the request for the next batch has already been sent.
"""
2919 ASYNC_ONLY = None
2920 AT_LEAST_OFFSET = 0
2921 AT_LEAST_ONE = object()
2923 def __init__(self, query_options, first_async_batch):
2924 """Constructor.
2926 Although this class can be manually constructed, it is preferable to use
2927 Query.run(query_options).
2929 Args:
2930 query_options: The QueryOptions used to create the first batch.
2931 first_async_batch: The first batch produced by
2932 Query.run_async(query_options).
"""
2934 self.__next_batch = first_async_batch
2935 self.__initial_offset = QueryOptions.offset(query_options) or 0
2936 self.__skipped_results = 0
2938 def next(self):
2939 """Get the next batch. See .next_batch()."""
2940 return self.next_batch(self.AT_LEAST_ONE)
2942 def next_batch(self, min_batch_size):
2943 """Get the next batch.
2945 The batch returned by this function cannot be used to fetch the next batch
2946 (its Batch.next_batch() will always return None). To retrieve the next
2947 batch, use .next() or .next_batch(N) on this Batcher instead.
2949 This function may return a batch larger than min_batch_size, but will never
2950 return a smaller one unless there are no more results.
2952 Special values can be used for min_batch_size:
2953 ASYNC_ONLY - Do not perform any synchronous fetches from the datastore
2954 even if this produces a batch with no results.
2955 AT_LEAST_OFFSET - Only pull enough results to satisfy the offset.
2956 AT_LEAST_ONE - Pull batches until at least one result is returned.
2958 Args:
2959 min_batch_size: The minimum number of results to retrieve or one of
2960 (ASYNC_ONLY, AT_LEAST_OFFSET, AT_LEAST_ONE)
2962 Returns:
2963 The next Batch of results.
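Example (illustrative; 'conn' is a hypothetical datastore_rpc.Connection):
> batcher = Query(kind='Person').run(conn)
> batch = batcher.next_batch(Batcher.AT_LEAST_ONE)
"""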
2965 if min_batch_size in (Batcher.ASYNC_ONLY, Batcher.AT_LEAST_OFFSET,
2966 Batcher.AT_LEAST_ONE):
2967 exact = False
2968 else:
2969 exact = True
2970 datastore_types.ValidateInteger(min_batch_size,
2971 'min_batch_size',
2972 datastore_errors.BadArgumentError)
2973 if not self.__next_batch:
2974 raise StopIteration
2977 batch = self.__next_batch.get_result()
2978 self.__next_batch = None
2979 self.__skipped_results += batch.skipped_results
2981 if min_batch_size is not Batcher.ASYNC_ONLY:
2982 if min_batch_size is Batcher.AT_LEAST_ONE:
2983 min_batch_size = 1
2985 needed_results = min_batch_size - len(batch.results)
2986 while (batch.more_results and
2987 (self.__skipped_results < self.__initial_offset or
2988 needed_results > 0)):
2989 if batch.query_options.batch_size:
2991 batch_size = max(batch.query_options.batch_size, needed_results)
2992 elif exact:
2993 batch_size = needed_results
2994 else:
2995 batch_size = None
2997 self.__next_batch = batch.next_batch_async(FetchOptions(
2998 offset=max(0, self.__initial_offset - self.__skipped_results),
2999 batch_size=batch_size))
3000 next_batch = self.__next_batch.get_result()
3001 self.__next_batch = None
3002 self.__skipped_results += next_batch.skipped_results
3003 needed_results = max(0, needed_results - len(next_batch.results))
3004 batch._extend(next_batch)
3011 self.__next_batch = batch.next_batch_async()
3012 return batch
3014 def __getstate__(self):
3015 raise pickle.PicklingError(
3016 'Pickling of datastore_query.Batcher is unsupported.')
3018 def __iter__(self):
3019 return self
3022 class ResultsIterator(object):
3023 """An iterator over the results from Batches obtained from a Batcher.
3025 ResultsIterator implements Python's iterator protocol, so results can be
3026 accessed with the for-statement:
3028 > it = ResultsIterator(Query(kind='Person').run())
3029 > for person in it:
3030 > print 'Hi, %s!' % person['name']
3032 At any time ResultsIterator.cursor() can be used to grab the Cursor that
3033 points just after the last result returned by the iterator.
"""
3036 __current_batch = None
3037 __current_pos = 0
3038 __last_cursor = None
3040 def __init__(self, batcher):
3041 """Constructor.
3043 Args:
3044 batcher: A datastore_query.Batcher.
"""
3046 if not isinstance(batcher, Batcher):
3047 raise datastore_errors.BadArgumentError(
3048 'batcher argument should be datastore_query.Batcher (%r)' %
3049 (batcher,))
3050 self.__batcher = batcher
3052 def index_list(self):
3053 """Returns the list of indexes used to perform the query.
3054 Possibly None when the adapter does not implement pb_to_index.
"""
3056 return self._ensure_current_batch().index_list
3058 def cursor(self):
3059 """Returns a cursor that points just after the last result returned.
3061 If next() throws an exception, this function returns the end_cursor from
3062 the last successful batch or throws the same exception if no batch was
3063 successful.
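Example (an illustrative sketch of saving a cursor to resume later; 'conn'
is a hypothetical datastore_rpc.Connection):
> it = ResultsIterator(query.run(conn))
> first = it.next()
> resume_cursor = it.cursor()
> later = query.run(conn, QueryOptions(start_cursor=resume_cursor))
"""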
3065 return (self.__last_cursor or
3066 self._ensure_current_batch().cursor(self.__current_pos))
3068 def _ensure_current_batch(self):
3069 if not self.__current_batch:
3070 self.__current_batch = self.__batcher.next_batch(Batcher.AT_LEAST_OFFSET)
3071 self.__current_pos = 0
3072 return self.__current_batch
3074 def _compiled_query(self):
3075 """Returns the compiled query associated with the iterator.
3077 For internal use only; do not use.
"""
3079 return self._ensure_current_batch()._compiled_query()
3082 def next(self):
3083 """Returns the next query result."""
3084 while (not self.__current_batch or
3085 self.__current_pos >= len(self.__current_batch.results)):
3087 try:
3091 next_batch = self.__batcher.next_batch(Batcher.AT_LEAST_OFFSET)
3092 except:
3094 if self.__current_batch:
3095 self.__last_cursor = self.__current_batch.end_cursor
3096 raise
3097 self.__current_pos = 0
3098 self.__current_batch = next_batch
3100 result = self.__current_batch.results[self.__current_pos]
3101 self.__current_pos += 1
3102 return result
3104 def __iter__(self):
3105 return self