3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 """A thin wrapper around datastore query RPC calls.
23 This provides wrappers around the internal only datastore_pb library and is
24 designed to be the lowest-level API to be used by all Python datastore client
25 libraries for executing queries. It provides a layer of protection so the actual
26 RPC syntax can change without affecting client libraries.
28 Any class, function, field or argument starting with an '_' is for INTERNAL use
29 only and should not be used by developers!
import collections
import pickle

from google.appengine.api import datastore_errors
from google.appengine.api import datastore_types
from google.appengine.api.search import geo_util
from google.appengine.datastore import datastore_index
from google.appengine.datastore import datastore_pb
from google.appengine.datastore import datastore_rpc
from google.appengine.datastore import entity_pb
71 class _BaseComponent(object):
72 """A base class for query components.
74 Currently just implements basic == and != functions.
77 def __eq__(self
, other
):
78 if self
.__class
__ is not other
.__class
__:
80 return self
is other
or self
.__dict
__ == other
.__dict
__
82 def __ne__(self
, other
):
83 equal
= self
.__eq
__(other
)
84 if equal
is NotImplemented:
def make_filter(name, op, values):
  """Constructs a FilterPredicate from the given name, op and values.

  Args:
    name: A non-empty string, the name of the property to filter.
    op: One of PropertyFilter._OPERATORS.keys(), the operator to use.
    values: A supported value, the value to compare against.

  Returns:
    if values is a list, a CompositeFilter that uses AND to combine all
    values, otherwise a PropertyFilter for the single value.

  Raises:
    datastore_errors.BadPropertyError: if the property name is invalid.
    datastore_errors.BadValueError: if the property did not validate correctly
      or the value was an empty list.
    Other exception types (like OverflowError): if the property value does not
      meet type-specific criteria.
  """
  datastore_types.ValidateProperty(name, values)
  properties = datastore_types.ToPropertyPb(name, values)
  if isinstance(properties, list):
    # A multi-valued property produces one filter per value, ANDed together.
    filters = [PropertyFilter(op, prop) for prop in properties]
    return CompositeFilter(CompositeFilter.AND, filters)
  else:
    return PropertyFilter(op, properties)
def _make_key_value_map(entity, property_names):
  """Extracts key values from the given entity.

  Args:
    entity: The entity_pb.EntityProto to extract values from.
    property_names: The names of the properties from which to extract values.

  Returns:
    A dict mapping property names to a lists of key values.
  """
  value_map = dict((name, []) for name in property_names)

  # Collect the comparable key values of every requested property.
  for prop in entity.property_list():
    if prop.name() in value_map:
      value_map[prop.name()].append(
          datastore_types.PropertyValueToKeyValue(prop.value()))

  # The special __key__ property is derived from the entity key itself.
  if datastore_types.KEY_SPECIAL_PROPERTY in value_map:
    value_map[datastore_types.KEY_SPECIAL_PROPERTY] = [
        datastore_types.ReferenceToKeyValue(entity.key())]

  return value_map
class _PropertyComponent(_BaseComponent):
  """A component that operates on a specific set of properties."""

  def _get_prop_names(self):
    """Returns a set of property names used by the filter."""
    raise NotImplementedError
class FilterPredicate(_PropertyComponent):
  """An abstract base class for all query filters.

  All sub-classes must be immutable as these are often stored without creating a
  defensive copy.
  """

  def __call__(self, entity):
    """Applies the filter predicate to the given entity.

    Args:
      entity: the datastore_pb.EntityProto to test.

    Returns:
      True if the given entity matches the filter, False otherwise.
    """
    return self._apply(_make_key_value_map(entity, self._get_prop_names()))

  def _apply(self, key_value_map):
    """Apply the given component to the comparable value map.

    A filter matches a list of values if at least one value in the list
    matches the filter, for example:
      'prop: [1, 2]' matches both 'prop = 1' and 'prop = 2' but not 'prop = 3'

    Note: the values are actually represented as tuples whose first item
    encodes the type; see datastore_types.PropertyValueToKeyValue().

    Args:
      key_value_map: A dict mapping property names to a list of
        comparable values.

    Returns:
      A boolean indicating if the given map matches the filter.
    """
    raise NotImplementedError

  def _prune(self, key_value_map):
    """Removes values from the given map that do not match the filter.

    When doing a scan in the datastore, only index values that match the filters
    are seen. When multiple values that point to the same entity are seen, the
    entity only appears where the first value is found. This function removes
    all values that don't match the query so that the first value in the map
    is the same one the datastore would see first.

    Args:
      key_value_map: the comparable value map from which to remove
        values. Does not need to contain values for all filtered properties.

    Returns:
      A value that evaluates to False if every value in a single list was
      completely removed. This effectively applies the filter but is less
      efficient than _apply().
    """
    raise NotImplementedError

  def _to_pb(self):
    """Internal only function to generate a pb."""
    raise NotImplementedError(
        'This filter only supports in memory operations (%r)' % self)

  def _to_pbs(self):
    """Internal only function to generate a list of pbs."""
    return [self._to_pb()]
class _SinglePropertyFilter(FilterPredicate):
  """Base class for a filter that operates on a single property."""

  def _get_prop_name(self):
    """Returns the name of the property being filtered."""
    raise NotImplementedError

  def _apply_to_value(self, value):
    """Apply the filter to the given value.

    Args:
      value: The comparable value to check.

    Returns:
      A boolean indicating if the given value matches the filter.
    """
    raise NotImplementedError

  def _get_prop_names(self):
    return set([self._get_prop_name()])

  def _apply(self, value_map):
    # A multi-valued property matches if any single value matches.
    for other_value in value_map[self._get_prop_name()]:
      if self._apply_to_value(other_value):
        return True
    return False

  def _prune(self, value_map):
    # A missing property list means there is nothing to prune.
    if self._get_prop_name() not in value_map:
      return True
    values = [value for value in value_map[self._get_prop_name()]
              if self._apply_to_value(value)]
    value_map[self._get_prop_name()] = values
    # Falsy (empty) result signals that every value was removed.
    return bool(values)
class PropertyFilter(_SinglePropertyFilter):
  """An immutable filter predicate that constrains a single property."""

  # Maps user-facing operator strings to the wire-format enum values.
  _OPERATORS = {
      '<': datastore_pb.Query_Filter.LESS_THAN,
      '<=': datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL,
      '>': datastore_pb.Query_Filter.GREATER_THAN,
      '>=': datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL,
      '=': datastore_pb.Query_Filter.EQUAL,
      }

  _OPERATORS_INVERSE = dict((value, key)
                            for key, value in _OPERATORS.iteritems())

  # Maps wire-format enum values to Python comparison operator strings.
  _OPERATORS_TO_PYTHON_OPERATOR = {
      datastore_pb.Query_Filter.LESS_THAN: '<',
      datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: '<=',
      datastore_pb.Query_Filter.GREATER_THAN: '>',
      datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: '>=',
      datastore_pb.Query_Filter.EQUAL: '==',
      }

  _INEQUALITY_OPERATORS = frozenset(['<', '<=', '>', '>='])

  _INEQUALITY_OPERATORS_ENUM = frozenset([
      datastore_pb.Query_Filter.LESS_THAN,
      datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL,
      datastore_pb.Query_Filter.GREATER_THAN,
      datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL,
      ])

  _UPPERBOUND_INEQUALITY_OPERATORS = frozenset(['<', '<='])

  def __init__(self, op, value):
    """Constructor.

    Args:
      op: A string representing the operator to use.
      value: A entity_pb.Property, the property and value to compare against.

    Raises:
      datastore_errors.BadArgumentError if op has an unsupported value or value
      is not an entity_pb.Property.
    """
    if op not in self._OPERATORS:
      raise datastore_errors.BadArgumentError('unknown operator: %r' % (op,))
    if not isinstance(value, entity_pb.Property):
      raise datastore_errors.BadArgumentError(
          'value argument should be entity_pb.Property (%r)' % (value,))

    super(PropertyFilter, self).__init__()
    self._filter = datastore_pb.Query_Filter()
    self._filter.set_op(self._OPERATORS[op])
    self._filter.add_property().CopyFrom(value)

  @property
  def op(self):
    raw_op = self._filter.op()
    return self._OPERATORS_INVERSE.get(raw_op, str(raw_op))

  @property
  def value(self):
    return self._filter.property(0)

  def __repr__(self):
    prop = self.value
    name = prop.name()
    value = datastore_types.FromPropertyPb(prop)
    return '%s(%r, <%r, %r>)' % (self.__class__.__name__, self.op, name, value)

  def _get_prop_name(self):
    return self._filter.property(0).name()

  def _apply_to_value(self, value):
    # Lazily compute and cache the comparable value and the comparison
    # expression the first time the filter is applied.
    if not hasattr(self, '_cmp_value'):
      if self._filter.op() == datastore_pb.Query_Filter.EXISTS:
        # An EXISTS filter matches any value that is present.
        return True
      self._cmp_value = datastore_types.PropertyValueToKeyValue(
          self._filter.property(0).value())
      self._condition = ('value %s self._cmp_value' %
                         self._OPERATORS_TO_PYTHON_OPERATOR[self._filter.op()])
    # NOTE: eval() is only ever fed expressions built from the trusted
    # _OPERATORS_TO_PYTHON_OPERATOR table above, never external input.
    return eval(self._condition)

  def _has_inequality(self):
    """Returns True if the filter predicate contains inequalities filters."""
    return self._filter.op() in self._INEQUALITY_OPERATORS_ENUM

  @classmethod
  def _from_pb(cls, filter_pb):
    # Bypasses __init__ validation; for internal use with trusted pbs only.
    self = cls.__new__(cls)
    self._filter = filter_pb
    return self

  def _to_pb(self):
    """Returns the internal only pb representation."""
    return self._filter

  def __getstate__(self):
    raise pickle.PicklingError(
        'Pickling of datastore_query.PropertyFilter is unsupported.')

  def __eq__(self, other):
    # A PropertyFilter can compare equal to a _PropertyRangeFilter that
    # serializes to the same single pb.
    if self.__class__ is not other.__class__:
      if other.__class__ is _PropertyRangeFilter:
        return [self._filter] == other._to_pbs()
      return NotImplemented
    return self._filter == other._filter
class _PropertyRangeFilter(_SinglePropertyFilter):
  """A filter predicate that represents a range of values.

  Since we allow multi-valued properties there is a large difference between
  "x > 0 AND x < 1" and "0 < x < 1." An entity with x = [-1, 2] will match the
  first but not the second.

  Since the datastore only allows a single inequality filter, multiple
  in-equality filters are merged into a single range filter in the
  datastore (unlike equality filters). This class is used by
  datastore_query.CompositeFilter to implement the same logic.
  """

  # Cached comparable forms of the bounds; computed lazily.
  _start_key_value = None
  _end_key_value = None

  @datastore_rpc._positional(1)
  def __init__(self, start=None, start_incl=True, end=None, end_incl=True):
    """Constructs a range filter using start and end properties.

    Args:
      start: A entity_pb.Property to use as a lower bound or None to indicate
        no lower bound.
      start_incl: A boolean that indicates if the lower bound is inclusive.
      end: A entity_pb.Property to use as an upper bound or None to indicate
        no upper bound.
      end_incl: A boolean that indicates if the upper bound is inclusive.
    """
    if start is not None and not isinstance(start, entity_pb.Property):
      raise datastore_errors.BadArgumentError(
          'start argument should be entity_pb.Property (%r)' % (start,))
    if end is not None and not isinstance(end, entity_pb.Property):
      raise datastore_errors.BadArgumentError(
          'start argument should be entity_pb.Property (%r)' % (end,))
    if start and end and start.name() != end.name():
      raise datastore_errors.BadArgumentError(
          'start and end arguments must be on the same property (%s != %s)' %
          (start.name(), end.name()))
    if not start and not end:
      raise datastore_errors.BadArgumentError(
          'Unbounded ranges are not supported.')

    super(_PropertyRangeFilter, self).__init__()
    self._start = start
    self._start_incl = start_incl
    self._end = end
    self._end_incl = end_incl

  @classmethod
  def from_property_filter(cls, prop_filter):
    # Converts a single inequality PropertyFilter into an equivalent
    # half-open range filter.
    op = prop_filter._filter.op()
    if op == datastore_pb.Query_Filter.GREATER_THAN:
      return cls(start=prop_filter._filter.property(0), start_incl=False)
    elif op == datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL:
      return cls(start=prop_filter._filter.property(0))
    elif op == datastore_pb.Query_Filter.LESS_THAN:
      return cls(end=prop_filter._filter.property(0), end_incl=False)
    elif op == datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL:
      return cls(end=prop_filter._filter.property(0))
    else:
      raise datastore_errors.BadArgumentError(
          'Unsupported operator (%s)' % (op,))

  def intersect(self, other):
    """Returns a filter representing the intersection of self and other."""
    if isinstance(other, PropertyFilter):
      other = self.from_property_filter(other)
    elif not isinstance(other, _PropertyRangeFilter):
      raise datastore_errors.BadArgumentError(
          'other argument should be a _PropertyRangeFilter (%r)' % (other,))

    if other._get_prop_name() != self._get_prop_name():
      raise datastore_errors.BadArgumentError(
          'other argument must be on the same property (%s != %s)' %
          (other._get_prop_name(), self._get_prop_name()))

    # Pick the tighter lower bound of the two filters.
    start_source = None
    if other._start:
      if self._start:
        result = cmp(self._get_start_key_value(), other._get_start_key_value())
        if result == 0:
          # Equal bounds: an exclusive bound is tighter than an inclusive one.
          result = cmp(other._start_incl, self._start_incl)
        if result > 0:
          start_source = self
        elif result < 0:
          start_source = other
      else:
        start_source = other
    elif self._start:
      start_source = self

    # Pick the tighter upper bound of the two filters.
    end_source = None
    if other._end:
      if self._end:
        result = cmp(self._get_end_key_value(), other._get_end_key_value())
        if result == 0:
          result = cmp(self._end_incl, other._end_incl)
        if result < 0:
          end_source = self
        elif result > 0:
          end_source = other
      else:
        end_source = other
    elif self._end:
      end_source = self

    if start_source:
      if end_source in (start_source, None):
        # Both bounds come from the same filter; reuse it unchanged.
        return start_source

      result = _PropertyRangeFilter(start=start_source._start,
                                    start_incl=start_source._start_incl,
                                    end=end_source._end,
                                    end_incl=end_source._end_incl)

      # Carry over any already-computed comparable bound values.
      result._start_key_value = start_source._start_key_value
      result._end_key_value = end_source._end_key_value
      return result
    else:
      return end_source or self

  def _get_start_key_value(self):
    if self._start_key_value is None:
      self._start_key_value = datastore_types.PropertyValueToKeyValue(
          self._start.value())
    return self._start_key_value

  def _get_end_key_value(self):
    if self._end_key_value is None:
      self._end_key_value = datastore_types.PropertyValueToKeyValue(
          self._end.value())
    return self._end_key_value

  def _apply_to_value(self, value):
    """Apply the filter to the given value.

    Args:
      value: The comparable value to check.

    Returns:
      A boolean indicating if the given value matches the filter.
    """
    if self._start:
      result = cmp(self._get_start_key_value(), value)
      if result > 0 or (result == 0 and not self._start_incl):
        return False

    if self._end:
      result = cmp(self._get_end_key_value(), value)
      if result < 0 or (result == 0 and not self._end_incl):
        return False

    return True

  def _get_prop_name(self):
    if self._start:
      return self._start.name()
    if self._end:
      return self._end.name()
    assert False

  def _to_pbs(self):
    # A range expands to up to two single-operator filter pbs.
    pbs = []
    if self._start:
      if self._start_incl:
        op = datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL
      else:
        op = datastore_pb.Query_Filter.GREATER_THAN
      pb = datastore_pb.Query_Filter()
      pb.set_op(op)
      pb.add_property().CopyFrom(self._start)
      pbs.append(pb)

    if self._end:
      if self._end_incl:
        op = datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL
      else:
        op = datastore_pb.Query_Filter.LESS_THAN
      pb = datastore_pb.Query_Filter()
      pb.set_op(op)
      pb.add_property().CopyFrom(self._end)
      pbs.append(pb)

    return pbs

  def __getstate__(self):
    raise pickle.PicklingError(
        'Pickling of %r is unsupported.' % self)

  def __eq__(self, other):
    if self.__class__ is not other.__class__:
      return NotImplemented
    return (self._start == other._start and
            self._end == other._end and
            (self._start_incl == other._start_incl or self._start is None) and
            (self._end_incl == other._end_incl or self._end is None))
class _PropertyExistsFilter(FilterPredicate):
  """A FilterPredicate that matches entities containing specific properties.

  Only works as an in-memory filter. Used internally to filter out entities
  that don't have all properties in a given Order.
  """

  def __init__(self, names):
    super(_PropertyExistsFilter, self).__init__()
    self._names = frozenset(names)

  def _apply(self, value_map):
    # Every named property must have at least one value present.
    for name in self._names:
      if not value_map.get(name):
        return False
    return True

  def _get_prop_names(self):
    return frozenset(self._names)

  def _prune(self, _):
    # Existence cannot be pruned value-by-value; in-memory apply only.
    raise NotImplementedError

  def __getstate__(self):
    raise pickle.PicklingError(
        'Pickling of %r is unsupported.' % self)
class CorrelationFilter(FilterPredicate):
  """A filter that isolates correlated values and applies a sub-filter on them.

  This filter assumes that every property used by the sub-filter should be
  grouped before being passed to the sub-filter. The default grouping puts
  each value in its own group. Consider:
    e = {a: [1, 2], b: [2, 1, 3], c: 4}

  A correlation filter with a sub-filter that operates on (a, b) will be tested
  against the following 3 sets of values:
    {a: 1, b: 2}
    {a: 2, b: 1}
    {b: 3}

  In this case CorrelationFilter('a = 2 AND b = 2') won't match this entity but
  CorrelationFilter('a = 2 AND b = 1') will. To apply an uncorrelated filter on
  c, the filter must be applied in parallel to the correlation filter. For
  example:
    CompositeFilter(AND, [CorrelationFilter('a = 2 AND b = 1'), 'c = 3'])

  If 'c = 3' was included in the correlation filter, c would be grouped as well.

  If any set of correlated values match the sub-filter then the entity matches
  the correlation filter.
  """

  def __init__(self, subfilter):
    """Constructor.

    Args:
      subfilter: A FilterPredicate to apply to the correlated values
    """
    self._subfilter = subfilter

  @property
  def subfilter(self):
    return self._subfilter

  def __repr__(self):
    return '%s(%r)' % (self.__class__.__name__, self.subfilter)

  def _apply(self, value_map):
    # Start with an empty value list for every property the sub-filter uses.
    base_map = dict((prop, []) for prop in self._get_prop_names())

    value_maps = []
    for prop in base_map:
      # Split the values into correlated groups.
      grouped = self._group_values(prop, value_map[prop])

      # Ensure there is one value map per correlated group.
      while len(value_maps) < len(grouped):
        value_maps.append(base_map.copy())

      # Distribute each group into its positional value map.
      for value, map in zip(grouped, value_maps):
        map[prop] = value

    return self._apply_correlated(value_maps)

  def _apply_correlated(self, value_maps):
    """Applies sub-filter to the correlated value maps.

    The default implementation matches when any value_map in value_maps
    matches the sub-filter.

    Args:
      value_maps: A list of correlated value_maps.

    Returns:
      True if any the entity matches the correlation filter.
    """
    for map in value_maps:
      if self._subfilter._apply(map):
        return True
    return False

  def _group_values(self, prop, values):
    """A function that groups the given values.

    Override this function to introduce custom grouping logic. The default
    implementation assumes each value belongs in its own group.

    Args:
      prop: The name of the property who's values are being grouped.
      values: A list of opaque values.

    Returns:
      A list of lists of grouped values.
    """
    return [[value] for value in values]

  def _get_prop_names(self):
    return self._subfilter._get_prop_names()
class CompositeFilter(FilterPredicate):
  """An immutable filter predicate that combines other predicates.

  This class proactively merges sub-filters that are combined using the same
  operator. For example:
    CompositeFilter(AND, [f1, f2, CompositeFilter(AND, [f3, f4]), f5, f6])
  is equivalent to:
    CompositeFilter(AND, [f1, f2, f3, f4, f5, f6])

  Currently filters can only be combined using an AND operator.
  """

  AND = 'and'
  _OPERATORS = frozenset([AND])

  def __init__(self, op, filters):
    """Constructor.

    Args:
      op: The operator to use to combine the given filters
      filters: A list of one or more filters to combine

    Raises:
      datastore_errors.BadArgumentError if op is not in CompsiteFilter.OPERATORS
      or filters is not a non-empty list containing only FilterPredicates.
    """
    if not op in self._OPERATORS:
      raise datastore_errors.BadArgumentError('unknown operator (%s)' % (op,))
    if not filters or not isinstance(filters, (list, tuple)):
      raise datastore_errors.BadArgumentError(
          'filters argument should be a non-empty list (%r)' % (filters,))

    super(CompositeFilter, self).__init__()
    self._op = op
    flattened = []

    # Flatten nested composite filters that use the same operator.
    for f in filters:
      if isinstance(f, CompositeFilter) and f._op == self._op:
        flattened.extend(f._filters)
      elif isinstance(f, FilterPredicate):
        flattened.append(f)
      else:
        raise datastore_errors.BadArgumentError(
            'filters argument must be a list of FilterPredicates, found (%r)' %
            (f,))

    if op == self.AND:
      # Merge all inequality filters on the same property into a single
      # range filter, since the datastore only supports one inequality.
      filters = flattened
      flattened = []
      ineq_map = {}

      for f in filters:
        if (isinstance(f, _PropertyRangeFilter) or
            (isinstance(f, PropertyFilter) and f._has_inequality())):
          name = f._get_prop_name()
          index = ineq_map.get(name)
          if index is not None:
            range_filter = flattened[index]
            flattened[index] = range_filter.intersect(f)
          else:
            if isinstance(f, PropertyFilter):
              range_filter = _PropertyRangeFilter.from_property_filter(f)
            else:
              range_filter = f
            ineq_map[name] = len(flattened)
            flattened.append(range_filter)
        else:
          flattened.append(f)

    self._filters = tuple(flattened)

  @property
  def op(self):
    return self._op

  @property
  def filters(self):
    return self._filters

  def __repr__(self):
    op = self.op
    if op == self.AND:
      op = 'AND'
    else:
      op = str(op)
    return '%s(%s, %r)' % (self.__class__.__name__, op, list(self.filters))

  def _get_prop_names(self):
    names = set()
    for f in self._filters:
      names |= f._get_prop_names()
    return names

  def _apply(self, value_map):
    if self._op == self.AND:
      for f in self._filters:
        if not f._apply(value_map):
          return False
      return True
    raise NotImplementedError

  def _prune(self, value_map):
    if self._op == self.AND:
      # A value passes the composite filter only if it appears in the
      # pruned output of at least one sub-filter; collect survivors.
      matches = collections.defaultdict(set)
      for f in self._filters:
        props = f._get_prop_names()
        local_value_map = dict((k, v) for k, v in value_map.iteritems()
                               if k in props)

        if not f._prune(local_value_map):
          return False

        for (prop, values) in local_value_map.iteritems():
          matches[prop].update(values)

      # Rebuild each property's value list in sorted (scan) order.
      for prop, value_set in matches.iteritems():
        value_map[prop] = sorted(value_set)
      return True
    raise NotImplementedError

  def _to_pbs(self):
    """Returns the internal only pb representation."""
    pbs = []
    for f in self._filters:
      pbs.extend(f._to_pbs())
    return pbs

  def __eq__(self, other):
    if self.__class__ is other.__class__:
      return super(CompositeFilter, self).__eq__(other)

    # A composite of exactly one filter compares equal to that filter.
    if len(self._filters) == 1:
      result = self._filters[0].__eq__(other)
      if result is NotImplemented and hasattr(other, '__eq__'):
        return other.__eq__(self._filters[0])
      return result
    return NotImplemented
class _IgnoreFilter(_SinglePropertyFilter):
  """A filter that removes all entities with the given keys."""

  def __init__(self, key_value_set):
    super(_IgnoreFilter, self).__init__()
    self._keys = key_value_set

  def _get_prop_name(self):
    # Filtering is always done on the special __key__ property.
    return datastore_types.KEY_SPECIAL_PROPERTY

  def _apply_to_value(self, value):
    # Keep only values whose key is not in the ignore set.
    return value not in self._keys
class _DedupingFilter(_IgnoreFilter):
  """A filter that removes duplicate keys."""

  def __init__(self, key_value_set=None):
    super(_DedupingFilter, self).__init__(key_value_set or set())

  def _apply_to_value(self, value):
    # First sighting of a key passes and is remembered; repeats are dropped.
    if super(_DedupingFilter, self)._apply_to_value(value):
      self._keys.add(value)
      return True
    return False
class _BoundingCircleFilter(_SinglePropertyFilter):
  """An immutable bounding circle filter for geo locations.

  An immutable filter predicate that constrains a geo location property to a
  bounding circle region. The filter is inclusive at the border. The property
  has to be of type V3 PointValue. V4 GeoPoints converts to this type.
  """

  def __init__(self, property_name, latitude, longitude, radius_meters):
    self._property_name = property_name
    self._lat_lng = geo_util.LatLng(latitude, longitude)
    self._radius_meters = radius_meters

    # 'not >= 0' (rather than '< 0') also rejects NaN radii.
    if not radius_meters >= 0:
      raise datastore_errors.BadArgumentError(
          'invalid radius: %r' % radius_meters)

  @classmethod
  def _from_v4_pb(cls, bounding_circle_v4_pb):
    return _BoundingCircleFilter(bounding_circle_v4_pb.property().name(),
                                 bounding_circle_v4_pb.center().latitude(),
                                 bounding_circle_v4_pb.center().longitude(),
                                 bounding_circle_v4_pb.radius_meters())

  def _get_prop_name(self):
    return self._property_name

  def _apply_to_value(self, value):
    # Only V3 PointValue properties can match.
    if value[0] != entity_pb.PropertyValue.kPointValueGroup:
      return False

    _, latitude, longitude = value

    lat_lng = geo_util.LatLng(latitude, longitude)

    # LatLng subtraction yields the distance in meters; border inclusive.
    return self._lat_lng - lat_lng <= self._radius_meters
class _BoundingBoxFilter(_SinglePropertyFilter):
  """An immutable bounding box filter for geo locations.

  An immutable filter predicate that constrains a geo location property to a
  bounding box region. The filter is inclusive at the border. The property
  has to be of type V3 PointValue. V4 GeoPoints converts to this type.
  """

  def __init__(self, property_name, southwest, northeast):
    """Initializes a _BoundingBoxFilter.

    Args:
      property_name: the name of the property to filter on.
      southwest: The south-west corner of the bounding box. The type is
          datastore_types.GeoPt.
      northeast: The north-east corner of the bounding box. The type is
          datastore_types.GeoPt.

    Raises:
      datastore_errors.BadArgumentError if the south-west coordinate is on top
      of the north-east coordinate.
    """
    if southwest.lat > northeast.lat:
      raise datastore_errors.BadArgumentError(
          'the south-west coordinate is on top of the north-east coordinate')

    self._property_name = property_name
    self._southwest = southwest
    self._northeast = northeast

  @classmethod
  def _from_v4_pb(cls, bounding_box_v4_pb):
    sw = datastore_types.GeoPt(bounding_box_v4_pb.southwest().latitude(),
                               bounding_box_v4_pb.southwest().longitude())
    ne = datastore_types.GeoPt(bounding_box_v4_pb.northeast().latitude(),
                               bounding_box_v4_pb.northeast().longitude())
    return _BoundingBoxFilter(bounding_box_v4_pb.property().name(), sw, ne)

  def _get_prop_name(self):
    return self._property_name

  def _apply_to_value(self, value):
    # Only V3 PointValue properties can match.
    if value[0] != entity_pb.PropertyValue.kPointValueGroup:
      return False

    _, latitude, longitude = value

    if not self._southwest.lat <= latitude <= self._northeast.lat:
      return False

    # A box whose west edge is east of its east edge wraps around the
    # antimeridian; match longitudes outside the [ne.lon, sw.lon] gap.
    if self._southwest.lon > self._northeast.lon:
      return (longitude <= self._northeast.lon
              or longitude >= self._southwest.lon)

    return self._southwest.lon <= longitude <= self._northeast.lon
class Order(_PropertyComponent):
  """A base class that represents a sort order on a query.

  All sub-classes must be immutable as these are often stored without creating a
  defensive copy.

  This class can be used as either the cmp or key arg in sorted() or
  list.sort(). To provide a stable ordering a trailing key ascending order is
  always used.
  """

  @datastore_rpc._positional(1)
  def reversed(self, group_by=None):
    """Constructs an order representing the reverse of the current order.

    This function takes into account the effects of orders on properties not in
    the group_by clause of a query. For example, consider:
      SELECT A, First(B) ... GROUP BY A ORDER BY A, B
    Changing the order of B would effect which value is listed in the 'First(B)'
    column which would actually change the results instead of just reversing
    the order of the results.

    Args:
      group_by: If specified, only orders on properties in group_by will be
        reversed.

    Returns:
      A new order representing the reverse direction.
    """
    raise NotImplementedError

  def _key(self, lhs_value_map):
    """Creates a key for the given value map."""
    raise NotImplementedError

  def _cmp(self, lhs_value_map, rhs_value_map):
    """Compares the given value maps."""
    raise NotImplementedError

  def _to_pb(self):
    """Internal only function to generate a filter pb."""
    raise NotImplementedError

  def key_for_filter(self, filter_predicate):
    if filter_predicate:
      return lambda x: self.key(x, filter_predicate)
    return self.key

  def cmp_for_filter(self, filter_predicate):
    if filter_predicate:
      return lambda x, y: self.cmp(x, y, filter_predicate)
    return self.cmp

  def key(self, entity, filter_predicate=None):
    """Constructs a "key" value for the given entity based on the current order.

    This function can be used as the key argument for list.sort() and sorted().

    Args:
      entity: The entity_pb.EntityProto to convert
      filter_predicate: A FilterPredicate used to prune values before comparing
        entities or None.

    Returns:
      A key value that identifies the position of the entity when sorted by
      the current order.
    """
    names = self._get_prop_names()
    # The entity key is appended as a tie-breaker for a stable ordering.
    names.add(datastore_types.KEY_SPECIAL_PROPERTY)
    if filter_predicate is not None:
      names |= filter_predicate._get_prop_names()

    value_map = _make_key_value_map(entity, names)
    if filter_predicate is not None:
      filter_predicate._prune(value_map)
    return (self._key(value_map),
            value_map[datastore_types.KEY_SPECIAL_PROPERTY])

  def cmp(self, lhs, rhs, filter_predicate=None):
    """Compares the given values taking into account any filters.

    This function can be used as the cmp argument for list.sort() and sorted().

    This function is slightly more efficient that Order.key when comparing two
    entities, however it is much less efficient when sorting a list of entities.

    Args:
      lhs: An entity_pb.EntityProto
      rhs: An entity_pb.EntityProto
      filter_predicate: A FilterPredicate used to prune values before comparing
        entities or None.

    Returns:
      An integer <, = or > 0 representing the operator that goes in between lhs
      and rhs that to create a true statement.
    """
    names = self._get_prop_names()
    if filter_predicate is not None:
      names |= filter_predicate._get_prop_names()

    lhs_value_map = _make_key_value_map(lhs, names)
    rhs_value_map = _make_key_value_map(rhs, names)
    if filter_predicate is not None:
      filter_predicate._prune(lhs_value_map)
      filter_predicate._prune(rhs_value_map)
    result = self._cmp(lhs_value_map, rhs_value_map)
    if result:
      return result

    # Tie: fall back to comparing entity keys for a stable ordering.
    if not lhs.has_key() and not rhs.has_key():
      return 0

    lhs_key = (lhs_value_map.get(datastore_types.KEY_SPECIAL_PROPERTY) or
               datastore_types.ReferenceToKeyValue(lhs.key()))
    rhs_key = (rhs_value_map.get(datastore_types.KEY_SPECIAL_PROPERTY) or
               datastore_types.ReferenceToKeyValue(rhs.key()))

    return cmp(lhs_key, rhs_key)
class _ReverseOrder(_BaseComponent):
  """Reverses the comparison for the given object."""

  def __init__(self, obj):
    """Constructor for _ReverseOrder.

    Args:
      obj: Any comparable and hashable object.
    """
    super(_ReverseOrder, self).__init__()
    self._obj = obj

  def __hash__(self):
    return hash(self._obj)

  def __cmp__(self, other):
    assert self.__class__ == other.__class__, (
        'A datastore_query._ReverseOrder object can only be compared to '
        'an object of the same type.')
    # Negating the comparison result reverses the sort direction.
    return -cmp(self._obj, other._obj)
class PropertyOrder(Order):
  """An immutable class that represents a sort order for a single property."""

  ASCENDING = datastore_pb.Query_Order.ASCENDING
  DESCENDING = datastore_pb.Query_Order.DESCENDING
  _DIRECTIONS = frozenset([ASCENDING, DESCENDING])

  def __init__(self, prop, direction=ASCENDING):
    """Constructor.

    Args:
      prop: the name of the prop by which to sort.
      direction: the direction in which to sort the given prop.

    Raises:
      datastore_errors.BadArgumentError if the prop name or direction is
      invalid.
    """
    datastore_types.ValidateString(prop,
                                   'prop',
                                   datastore_errors.BadArgumentError)
    if not direction in self._DIRECTIONS:
      raise datastore_errors.BadArgumentError('unknown direction: %r' %
                                              (direction,))
    super(PropertyOrder, self).__init__()
    self.__order = datastore_pb.Query_Order()
    self.__order.set_property(prop.encode('utf-8'))
    self.__order.set_direction(direction)

  @property
  def prop(self):
    return self.__order.property()

  @property
  def direction(self):
    return self.__order.direction()

  def __repr__(self):
    name = self.prop
    direction = self.direction
    extra = ''
    if direction == self.DESCENDING:
      extra = ', DESCENDING'
    # Strip the surrounding quotes from the repr of the property name.
    name = repr(name).encode('utf-8')[1:-1]
    return '%s(<%s>%s)' % (self.__class__.__name__, name, extra)

  @datastore_rpc._positional(1)
  def reversed(self, group_by=None):
    # Orders on properties outside group_by must not be reversed; see
    # Order.reversed for the rationale.
    if group_by and self.__order.property() not in group_by:
      return self

    if self.__order.direction() == self.ASCENDING:
      return PropertyOrder(self.__order.property().decode('utf-8'),
                           self.DESCENDING)
    else:
      return PropertyOrder(self.__order.property().decode('utf-8'),
                           self.ASCENDING)

  def _get_prop_names(self):
    return set([self.__order.property()])

  def _key(self, lhs_value_map):
    lhs_values = lhs_value_map[self.__order.property()]
    if not lhs_values:
      raise datastore_errors.BadArgumentError(
          'Missing value for property (%s)' % self.__order.property())

    if self.__order.direction() == self.ASCENDING:
      return min(lhs_values)
    else:
      # Wrapping in _ReverseOrder inverts the comparison for descending sorts.
      return _ReverseOrder(max(lhs_values))

  def _cmp(self, lhs_value_map, rhs_value_map):
    lhs_values = lhs_value_map[self.__order.property()]
    rhs_values = rhs_value_map[self.__order.property()]

    if not lhs_values and not rhs_values:
      return 0

    if not lhs_values:
      raise datastore_errors.BadArgumentError(
          'LHS missing value for property (%s)' % self.__order.property())

    if not rhs_values:
      raise datastore_errors.BadArgumentError(
          'RHS missing value for property (%s)' % self.__order.property())

    if self.__order.direction() == self.ASCENDING:
      return cmp(min(lhs_values), min(rhs_values))
    else:
      return cmp(max(rhs_values), max(lhs_values))

  @classmethod
  def _from_pb(cls, order_pb):
    # Bypasses __init__ validation; for internal use with trusted pbs only.
    self = cls.__new__(cls)
    self._PropertyOrder__order = order_pb
    return self

  def _to_pb(self):
    """Returns the internal only pb representation."""
    return self.__order

  def __getstate__(self):
    raise pickle.PicklingError(
        'Pickling of datastore_query.PropertyOrder is unsupported.')
class CompositeOrder(Order):
  """An immutable class that represents a sequence of Orders.

  This class proactively flattens sub-orders that are of type CompositeOrder.
  For example:
    CompositeOrder([O1, CompositeOrder([02, 03]), O4])
  is equivalent to:
    CompositeOrder([O1, 02, 03, O4])
  """

  def __init__(self, orders):
    """Constructor.

    Args:
      orders: A list of Orders which are applied in order.

    Raises:
      datastore_errors.BadArgumentError if orders is not a list/tuple of
      Order objects.
    """
    if not isinstance(orders, (list, tuple)):
      raise datastore_errors.BadArgumentError(
          'orders argument should be list or tuple (%r)' % (orders,))

    super(CompositeOrder, self).__init__()
    flattened = []
    for order in orders:
      if isinstance(order, CompositeOrder):
        # Flatten nested composites into a single level.
        flattened.extend(order._orders)
      elif isinstance(order, Order):
        flattened.append(order)
      else:
        raise datastore_errors.BadArgumentError(
            'orders argument should only contain Order (%r)' % (order,))
    self._orders = tuple(flattened)

  @property
  def orders(self):
    # NOTE(review): presumed accessor backing __repr__'s use of
    # self.orders — confirm against the full file.
    return self._orders

  def __repr__(self):
    return '%s(%r)' % (self.__class__.__name__, list(self.orders))

  @datastore_rpc._positional(1)
  def reversed(self, group_by=None):
    """Returns a CompositeOrder with each sub-order reversed."""
    return CompositeOrder([order.reversed(group_by=group_by)
                           for order in self._orders])

  def _get_prop_names(self):
    # Union of the property names used by all sub-orders.
    names = set()
    for order in self._orders:
      names |= order._get_prop_names()
    return names

  def _key(self, lhs_value_map):
    # Sort key is the tuple of sub-order keys, in order.
    result = []
    for order in self._orders:
      result.append(order._key(lhs_value_map))
    return tuple(result)

  def _cmp(self, lhs_value_map, rhs_value_map):
    # Lexicographic comparison: first non-equal sub-order decides.
    for order in self._orders:
      result = order._cmp(lhs_value_map, rhs_value_map)
      if result != 0:
        return result
    return 0

  def size(self):
    """Returns the number of sub-orders the instance contains."""
    return len(self._orders)

  def _to_pbs(self):
    """Returns an ordered list of internal only pb representations."""
    return [order._to_pb() for order in self._orders]

  def __eq__(self, other):
    if self.__class__ is other.__class__:
      return super(CompositeOrder, self).__eq__(other)

    # A composite of exactly one order compares equal to that order.
    if len(self._orders) == 1:
      result = self._orders[0].__eq__(other)
      if result is NotImplemented and hasattr(other, '__eq__'):
        return other.__eq__(self._orders[0])
      return result
    return NotImplemented
class FetchOptions(datastore_rpc.Configuration):
  """An immutable class that contains all options for fetching results.

  These options apply to any request that pulls results from a query.

  This class reserves the right to define configuration options of any name
  except those that start with 'user_'. External subclasses should only define
  function or variables with names that start with in 'user_'.

  Options are set by passing keyword arguments to the constructor corresponding
  to the configuration options defined below and in datastore_rpc.Configuration.

  This object can be used as the default config for a datastore_rpc.Connection
  but in that case some options will be ignored, see option documentation below
  for details.
  """

  @datastore_rpc.ConfigOption
  def produce_cursors(value):
    """If a Cursor should be returned with the fetched results.

    Raises:
      datastore_errors.BadArgumentError if value is not a bool.
    """
    if not isinstance(value, bool):
      raise datastore_errors.BadArgumentError(
          'produce_cursors argument should be bool (%r)' % (value,))
    return value

  @datastore_rpc.ConfigOption
  def offset(value):
    """The number of results to skip before returning the first result.

    Only applies to the first request it is used with and is ignored if present
    on datastore_rpc.Connection.config.

    Raises:
      datastore_errors.BadArgumentError if value is not a integer or is less
      than zero.
    """
    datastore_types.ValidateInteger(value,
                                    'offset',
                                    datastore_errors.BadArgumentError,
                                    zero_ok=True)
    return value

  @datastore_rpc.ConfigOption
  def batch_size(value):
    """The number of results to attempt to retrieve in a batch.

    Raises:
      datastore_errors.BadArgumentError if value is not a integer or is not
      greater than zero.
    """
    datastore_types.ValidateInteger(value,
                                    'batch_size',
                                    datastore_errors.BadArgumentError)
    return value
class QueryOptions(FetchOptions):
  """An immutable class that contains all options for running a query.

  This class contains options that control execution process (deadline,
  batch_size, read_policy, etc) and what part of the query results are returned
  (keys_only, projection, offset, limit, etc) Options that control the contents
  of the query results are specified on the datastore_query.Query directly.

  This class reserves the right to define configuration options of any name
  except those that start with 'user_'. External subclasses should only define
  function or variables with names that start with in 'user_'.

  Options are set by passing keyword arguments to the constructor corresponding
  to the configuration options defined below and in FetchOptions and
  datastore_rpc.Configuration.

  This object can be used as the default config for a datastore_rpc.Connection
  but in that case some options will be ignored, see below for details.
  """

  # Query-plan hints, mirroring the wire-format enum.
  ORDER_FIRST = datastore_pb.Query.ORDER_FIRST
  ANCESTOR_FIRST = datastore_pb.Query.ANCESTOR_FIRST
  FILTER_FIRST = datastore_pb.Query.FILTER_FIRST
  _HINTS = frozenset([ORDER_FIRST, ANCESTOR_FIRST, FILTER_FIRST])

  @datastore_rpc.ConfigOption
  def keys_only(value):
    """If the query should only return keys.

    Raises:
      datastore_errors.BadArgumentError if value is not a bool.
    """
    if not isinstance(value, bool):
      raise datastore_errors.BadArgumentError(
          'keys_only argument should be bool (%r)' % (value,))
    return value

  @datastore_rpc.ConfigOption
  def projection(value):
    """A list or tuple of property names to project.

    If None, the entire entity is returned.

    Specifying a projection:
    - may change the index requirements for the given query;
    - will cause a partial entity to be returned;
    - will cause only entities that contain those properties to be returned;

    A partial entities only contain the property name and value for properties
    in the projection (meaning and multiple will not be set). They will also
    only contain a single value for any multi-valued property. However, if a
    multi-valued property is specified in the order, an inequality property, or
    the projected properties, the entity will be returned multiple times. Once
    for each unique combination of values.

    However, projection queries are significantly faster than normal queries.

    Raises:
      datastore_errors.BadArgumentError if value is empty or not a list or tuple
      of strings.
    """
    if isinstance(value, list):
      # Normalize to an immutable tuple.
      value = tuple(value)
    elif not isinstance(value, tuple):
      raise datastore_errors.BadArgumentError(
          'projection argument should be a list or tuple (%r)' % (value,))
    if not value:
      raise datastore_errors.BadArgumentError(
          'projection argument cannot be empty')
    for prop in value:
      if not isinstance(prop, basestring):
        raise datastore_errors.BadArgumentError(
            'projection argument should contain only strings (%r)' % (prop,))
    return value

  @datastore_rpc.ConfigOption
  def limit(value):
    """Limit on the number of results to return.

    Raises:
      datastore_errors.BadArgumentError if value is not an integer or is less
      than zero.
    """
    datastore_types.ValidateInteger(value,
                                    'limit',
                                    datastore_errors.BadArgumentError,
                                    zero_ok=True)
    return value

  @datastore_rpc.ConfigOption
  def prefetch_size(value):
    """Number of results to attempt to return on the initial request.

    Raises:
      datastore_errors.BadArgumentError if value is not an integer or is not
      greater than zero.
    """
    datastore_types.ValidateInteger(value,
                                    'prefetch_size',
                                    datastore_errors.BadArgumentError,
                                    zero_ok=True)
    return value

  @datastore_rpc.ConfigOption
  def start_cursor(value):
    """Cursor to use a start position.

    Ignored if present on datastore_rpc.Connection.config.

    Raises:
      datastore_errors.BadArgumentError if value is not a Cursor.
    """
    if not isinstance(value, Cursor):
      raise datastore_errors.BadArgumentError(
          'start_cursor argument should be datastore_query.Cursor (%r)' %
          (value,))
    return value

  @datastore_rpc.ConfigOption
  def end_cursor(value):
    """Cursor to use as an end position.

    Ignored if present on datastore_rpc.Connection.config.

    Raises:
      datastore_errors.BadArgumentError if value is not a Cursor.
    """
    if not isinstance(value, Cursor):
      raise datastore_errors.BadArgumentError(
          'end_cursor argument should be datastore_query.Cursor (%r)' %
          (value,))
    return value

  @datastore_rpc.ConfigOption
  def hint(value):
    """Hint on how the datastore should plan the query.

    Raises:
      datastore_errors.BadArgumentError if value is not a known hint.
    """
    if value not in QueryOptions._HINTS:
      raise datastore_errors.BadArgumentError('Unknown query hint (%r)' %
                                              (value,))
    return value
class Cursor(_BaseComponent):
  """An immutable class that represents a relative position in a query.

  The position denoted by a Cursor is relative to a result in a query even
  if the result has been removed from the given query. Usually to position
  immediately after the last result returned by a batch.

  A cursor should only be used on a query with an identical signature to the
  one that produced it.
  """

  @datastore_rpc._positional(1)
  def __init__(self, _cursor_pb=None, urlsafe=None, _cursor_bytes=None):
    """Constructor.

    A Cursor constructed with no arguments points the first result of any
    query. If such a Cursor is used as an end_cursor no results will ever be
    returned.
    """
    super(Cursor, self).__init__()
    # At most one source of cursor data may be supplied.
    if ((urlsafe is not None) + (_cursor_pb is not None)
        + (_cursor_bytes is not None) > 1):
      raise datastore_errors.BadArgumentError(
          'Can only specify one of _cursor_pb, urlsafe, and _cursor_bytes')
    if urlsafe is not None:
      _cursor_bytes = self._urlsafe_to_bytes(urlsafe)
    if _cursor_pb is not None:
      if not isinstance(_cursor_pb, datastore_pb.CompiledCursor):
        raise datastore_errors.BadArgumentError(
            '_cursor_pb argument should be datastore_pb.CompiledCursor (%r)' %
            (_cursor_pb,))
      _cursor_bytes = _cursor_pb.Encode()
    if _cursor_bytes is not None:
      if _cursor_pb is None and urlsafe is None:
        # Raw bytes were passed directly: validate them by decoding,
        # discarding the result.
        Cursor._bytes_to_cursor_pb(_cursor_bytes)
      self.__cursor_bytes = _cursor_bytes
    else:
      # No-argument cursor: points at the start of any query.
      self.__cursor_bytes = ''

  def __repr__(self):
    arg = self.to_websafe_string()
    if arg:
      arg = '<%s>' % arg
    return '%s(%s)' % (self.__class__.__name__, arg)

  def reversed(self):
    """Creates a cursor for use in a query with a reversed sort order."""
    compiled_cursor = self._to_pb()
    if compiled_cursor.has_position():
      pos = compiled_cursor.position()
      if pos.has_start_key():
        # Cursors with an encoded start key cannot be flipped.
        raise datastore_errors.BadRequestError('Cursor cannot be reversed.')
      # Flipping inclusivity mirrors the position under a reversed order.
      pos.set_start_inclusive(not pos.start_inclusive())
    return Cursor(_cursor_pb=compiled_cursor)

  def to_bytes(self):
    """Serialize cursor as a byte string."""
    return self.__cursor_bytes

  @staticmethod
  def from_bytes(cursor):
    """Gets a Cursor given its byte string serialized form.

    The serialized form of a cursor may change in a non-backwards compatible
    way. In this case cursors must be regenerated from a new Query request.

    Args:
      cursor: A serialized cursor as returned by .to_bytes.

    Returns:
      A Cursor.

    Raises:
      datastore_errors.BadValueError if the cursor argument does not represent a
      serialized cursor.
    """
    return Cursor(_cursor_bytes=cursor)

  @staticmethod
  def _bytes_to_cursor_pb(cursor):
    # Decode raw bytes into a CompiledCursor, normalizing decode failures
    # into BadValueError.
    try:
      cursor_pb = datastore_pb.CompiledCursor(cursor)
    except (ValueError, TypeError), e:
      raise datastore_errors.BadValueError(
          'Invalid cursor (%r). Details: %s' % (cursor, e))
    except Exception, e:
      # The protobuf decode error type is not importable here; match by name.
      if e.__class__.__name__ == 'ProtocolBufferDecodeError':
        raise datastore_errors.BadValueError(
            'Invalid cursor %s. Details: %s' % (cursor, e))
      else:
        raise
    return cursor_pb

  def urlsafe(self):
    """Serialize cursor as a websafe string.

    Returns:
      A base64-encoded serialized cursor.
    """
    return base64.urlsafe_b64encode(self.to_bytes())
  # Backwards-compatible alias.
  to_websafe_string = urlsafe

  @staticmethod
  def from_websafe_string(cursor):
    """Gets a Cursor given its websafe serialized form.

    The serialized form of a cursor may change in a non-backwards compatible
    way. In this case cursors must be regenerated from a new Query request.

    Args:
      cursor: A serialized cursor as returned by .to_websafe_string.

    Returns:
      A Cursor.

    Raises:
      datastore_errors.BadValueError if the cursor argument is not a string
      type of does not represent a serialized cursor.
    """
    decoded_bytes = Cursor._urlsafe_to_bytes(cursor)
    return Cursor.from_bytes(decoded_bytes)

  @staticmethod
  def _urlsafe_to_bytes(cursor):
    # Decode a websafe base64 cursor string into raw bytes.
    if not isinstance(cursor, basestring):
      raise datastore_errors.BadValueError(
          'cursor argument should be str or unicode (%r)' % (cursor,))
    try:
      # Accept both urlsafe ('-', '_') and standard ('+', '/') alphabets.
      decoded_bytes = base64.b64decode(
          str(cursor).replace('-', '+').replace('_', '/'))
    except (ValueError, TypeError), e:
      raise datastore_errors.BadValueError(
          'Invalid cursor %s. Details: %s' % (cursor, e))
    return decoded_bytes

  @staticmethod
  def _from_query_result(query_result):
    # Internal only: extract the cursor from a QueryResult pb, if any.
    if query_result.has_compiled_cursor():
      return Cursor(_cursor_pb=query_result.compiled_cursor())
    return None

  def advance(self, offset, query, conn):
    """Advances a Cursor by the given offset.

    Args:
      offset: The amount to advance the current query.
      query: A Query identical to the one this cursor was created from.
      conn: The datastore_rpc.Connection to use.

    Returns:
      A new cursor that is advanced by offset using the given query.
    """
    datastore_types.ValidateInteger(offset,
                                    'offset',
                                    datastore_errors.BadArgumentError)
    if not isinstance(query, Query):
      raise datastore_errors.BadArgumentError(
          'query argument should be datastore_query.Query (%r)' % (query,))

    # Run a limit-0 query that skips 'offset' results and grab the cursor
    # at the resulting position.
    query_options = QueryOptions(
        start_cursor=self, offset=offset, limit=0, produce_cursors=True)
    return query.run(conn, query_options).next_batch(
        Batcher.AT_LEAST_OFFSET).cursor(0)

  def _to_pb(self):
    """Returns the internal only pb representation."""
    return Cursor._bytes_to_cursor_pb(self.__cursor_bytes)

  def __setstate__(self, state):
    if '_Cursor__compiled_cursor' in state:
      # Pickled with an older version of the code that stored a pb.
      self.__cursor_bytes = state['_Cursor__compiled_cursor'].Encode()
    else:
      self.__dict__ = state
class _QueryKeyFilter(_BaseComponent):
  """A class that implements the key filters available on a Query."""

  @datastore_rpc._positional(1)
  def __init__(self, app=None, namespace=None, kind=None, ancestor=None):
    """Constructs a _QueryKeyFilter.

    If app/namespace and ancestor are not defined, the app/namespace set in the
    environment is used.

    Args:
      app: a string representing the required app id or None.
      namespace: a string representing the required namespace or None.
      kind: a string representing the required kind or None.
      ancestor: a entity_pb.Reference representing the required ancestor or
        None.

    Raises:
      datastore_errors.BadArgumentError if app and ancestor.app() do not match
        or an unexpected type is passed in for any argument.
    """
    if kind is not None:
      datastore_types.ValidateString(
          kind, 'kind', datastore_errors.BadArgumentError)

    if ancestor is not None:
      if not isinstance(ancestor, entity_pb.Reference):
        raise datastore_errors.BadArgumentError(
            'ancestor argument should be entity_pb.Reference (%r)' %
            (ancestor,))

      # The ancestor fixes the app/namespace; explicit values must agree.
      if app is None:
        app = ancestor.app()
      elif app != ancestor.app():
        raise datastore_errors.BadArgumentError(
            'ancestor argument should match app ("%r" != "%r")' %
            (ancestor.app(), app))

      if namespace is None:
        namespace = ancestor.name_space()
      elif namespace != ancestor.name_space():
        raise datastore_errors.BadArgumentError(
            'ancestor argument should match namespace ("%r" != "%r")' %
            (ancestor.name_space(), namespace))

      # Defensive copy so later caller mutations cannot affect this filter.
      pb = entity_pb.Reference()
      pb.CopyFrom(ancestor)
      ancestor = pb
      self.__ancestor = ancestor
      self.__path = ancestor.path().element_list()
    else:
      self.__ancestor = None
      self.__path = None

    super(_QueryKeyFilter, self).__init__()
    self.__app = datastore_types.ResolveAppId(app).encode('utf-8')
    self.__namespace = (
        datastore_types.ResolveNamespace(namespace).encode('utf-8'))
    self.__kind = kind and kind.encode('utf-8')

  @property
  def app(self):
    return self.__app

  @property
  def namespace(self):
    return self.__namespace

  @property
  def kind(self):
    return self.__kind

  @property
  def ancestor(self):
    return self.__ancestor

  def __call__(self, entity_or_reference):
    """Apply the filter.

    Accepts either an entity or a reference to avoid the need to extract keys
    from entities when we have a list of entities (which is a common case).

    Args:
      entity_or_reference: Either an entity_pb.EntityProto or
        entity_pb.Reference.

    Returns:
      True if the key matches the app, namespace, kind and ancestor path
      constraints of this filter.
    """
    if isinstance(entity_or_reference, entity_pb.Reference):
      key = entity_or_reference
    elif isinstance(entity_or_reference, entity_pb.EntityProto):
      key = entity_or_reference.key()
    else:
      # BUG FIX: the format argument was '% (entity_or_reference)', which is
      # not a one-element tuple — it would misformat (or raise) if the
      # argument were itself a tuple. Use the '(x,)' form like the rest of
      # the file.
      raise datastore_errors.BadArgumentError(
          'entity_or_reference argument must be an entity_pb.EntityProto ' +
          'or entity_pb.Reference (%r)' % (entity_or_reference,))
    return (key.app() == self.__app and
            key.name_space() == self.__namespace and
            # A None kind/path imposes no constraint.
            (not self.__kind or
             key.path().element_list()[-1].type() == self.__kind) and
            (not self.__path or
             key.path().element_list()[0:len(self.__path)] == self.__path))

  def _to_pb(self):
    """Returns the internal only pb representation."""
    pb = datastore_pb.Query()

    pb.set_app(self.__app)
    datastore_types.SetNamespace(pb, self.__namespace)
    if self.__kind is not None:
      pb.set_kind(self.__kind)
    if self.__ancestor:
      ancestor = pb.mutable_ancestor()
      ancestor.CopyFrom(self.__ancestor)

    return pb
class _BaseQuery(_BaseComponent):
  """A base class for query implementations."""

  def run(self, conn, query_options=None):
    """Runs the query using provided datastore_rpc.Connection.

    Args:
      conn: The datastore_rpc.Connection to use
      query_options: Optional query options to use

    Returns:
      A Batcher that implicitly fetches query results asynchronously.

    Raises:
      datastore_errors.BadArgumentError if any of the arguments are invalid.
    """
    # Kick off the asynchronous run and hand the pending batches to a
    # Batcher, which drives the paging on demand.
    async_result = self.run_async(conn, query_options)
    return Batcher(query_options, async_result)

  def run_async(self, conn, query_options=None):
    """Runs the query using the provided datastore_rpc.Connection.

    Args:
      conn: the datastore_rpc.Connection on which to run the query.
      query_options: Optional QueryOptions with which to run the query.

    Returns:
      An async object that can be used to grab the first Batch. Additional
      batches can be retrieved by calling Batch.next_batch/next_batch_async.

    Raises:
      datastore_errors.BadArgumentError if any of the arguments are invalid.
    """
    # Subclasses supply the actual RPC machinery.
    raise NotImplementedError

  def __getstate__(self):
    raise pickle.PicklingError(
        'Pickling of %r is unsupported.' % self)
class Query(_BaseQuery):
  """An immutable class that represents a query signature.

  A query signature consists of a source of entities (specified as app,
  namespace and optionally kind and ancestor) as well as a FilterPredicate,
  grouping and a desired ordering.
  """

  @datastore_rpc._positional(1)
  def __init__(self, app=None, namespace=None, kind=None, ancestor=None,
               filter_predicate=None, group_by=None, order=None):
    """Constructor.

    Args:
      app: Optional app to query, derived from the environment if not specified.
      namespace: Optional namespace to query, derived from the environment if
        not specified.
      kind: Optional kind to query.
      ancestor: Optional ancestor to query, an entity_pb.Reference.
      filter_predicate: Optional FilterPredicate by which to restrict the query.
      order: Optional Order in which to return results.
      group_by: Optional list of properties to group the results by.

    Raises:
      datastore_errors.BadArgumentError if any argument is invalid.
    """
    super(Query, self).__init__()

    if filter_predicate is not None and not isinstance(filter_predicate,
                                                       FilterPredicate):
      raise datastore_errors.BadArgumentError(
          'filter_predicate should be datastore_query.FilterPredicate (%r)' %
          (filter_predicate,))

    # Normalize order to a CompositeOrder (or None for an empty composite).
    if isinstance(order, CompositeOrder):
      if order.size() == 0:
        order = None
    elif isinstance(order, Order):
      order = CompositeOrder([order])
    elif order is not None:
      raise datastore_errors.BadArgumentError(
          'order should be Order (%r)' % (order,))

    # Normalize and validate group_by as a non-empty tuple of strings.
    if group_by is not None:
      if isinstance(group_by, list):
        group_by = tuple(group_by)
      elif not isinstance(group_by, tuple):
        raise datastore_errors.BadArgumentError(
            'group_by argument should be a list or tuple (%r)' % (group_by,))
      if not group_by:
        raise datastore_errors.BadArgumentError(
            'group_by argument cannot be empty')
      for prop in group_by:
        if not isinstance(prop, basestring):
          raise datastore_errors.BadArgumentError(
              'group_by argument should contain only strings (%r)' % (prop,))

    # The key filter validates and resolves app/namespace/kind/ancestor.
    self._key_filter = _QueryKeyFilter(app=app, namespace=namespace, kind=kind,
                                       ancestor=ancestor)
    self._order = order
    self._filter_predicate = filter_predicate
    self._group_by = group_by

  @property
  def app(self):
    return self._key_filter.app

  @property
  def namespace(self):
    return self._key_filter.namespace

  @property
  def kind(self):
    return self._key_filter.kind

  @property
  def ancestor(self):
    return self._key_filter.ancestor

  @property
  def filter_predicate(self):
    return self._filter_predicate

  @property
  def order(self):
    return self._order

  @property
  def group_by(self):
    return self._group_by

  def __repr__(self):
    # Only include the optional components that are actually set.
    args = []
    args.append('app=%r' % self.app)
    ns = self.namespace
    if ns:
      args.append('namespace=%r' % ns)
    kind = self.kind
    if kind is not None:
      args.append('kind=%r' % kind)
    ancestor = self.ancestor
    if ancestor is not None:
      websafe = base64.urlsafe_b64encode(ancestor.Encode())
      args.append('ancestor=<%s>' % websafe)
    filter_predicate = self.filter_predicate
    if filter_predicate is not None:
      args.append('filter_predicate=%r' % filter_predicate)
    order = self.order
    if order is not None:
      args.append('order=%r' % order)
    group_by = self.group_by
    if group_by is not None:
      args.append('group_by=%r' % (group_by,))
    return '%s(%s)' % (self.__class__.__name__, ', '.join(args))

  def run_async(self, conn, query_options=None):
    """Runs the query asynchronously; see _BaseQuery.run_async."""
    if not isinstance(conn, datastore_rpc.BaseConnection):
      raise datastore_errors.BadArgumentError(
          'conn should be a datastore_rpc.BaseConnection (%r)' % (conn,))

    if not QueryOptions.is_configuration(query_options):
      # Any other object is treated as a config to merge in.
      query_options = QueryOptions(config=query_options)

    start_cursor = query_options.start_cursor
    if not start_cursor and query_options.produce_cursors:
      # An empty Cursor marks the start of the result stream.
      start_cursor = Cursor()

    req = self._to_pb(conn, query_options)
    return Batch.create_async(self, query_options, conn, req,
                              start_cursor=start_cursor)

  @classmethod
  def _from_pb(cls, query_pb):
    # Internal only: rebuild a Query from its wire-format pb.
    kind = query_pb.has_kind() and query_pb.kind().decode('utf-8') or None
    ancestor = query_pb.has_ancestor() and query_pb.ancestor() or None

    filter_predicate = None
    if query_pb.filter_size() > 0:
      # Multiple filters on the wire are an implicit AND.
      filter_predicate = CompositeFilter(
          CompositeFilter.AND,
          [PropertyFilter._from_pb(filter_pb)
           for filter_pb in query_pb.filter_list()])

    order = None
    if query_pb.order_size() > 0:
      order = CompositeOrder([PropertyOrder._from_pb(order_pb)
                              for order_pb in query_pb.order_list()])

    group_by = None
    if query_pb.group_by_property_name_size() > 0:
      group_by = tuple(name.decode('utf-8')
                       for name in query_pb.group_by_property_name_list())

    return Query(app=query_pb.app().decode('utf-8'),
                 namespace=query_pb.name_space().decode('utf-8'),
                 kind=kind,
                 ancestor=ancestor,
                 filter_predicate=filter_predicate,
                 order=order,
                 group_by=group_by)

  def _to_pb(self, conn, query_options):
    """Returns the internal only pb representation."""
    # Start from the key-filter pb (app/namespace/kind/ancestor).
    pb = self._key_filter._to_pb()

    if self._filter_predicate:
      for f in self._filter_predicate._to_pbs():
        pb.add_filter().CopyFrom(f)

    if self._order:
      for order in self._order._to_pbs():
        pb.add_order().CopyFrom(order)

    if self._group_by:
      pb.group_by_property_name_list().extend(self._group_by)

    # Merge connection-level config with per-call options.
    if QueryOptions.keys_only(query_options, conn.config):
      pb.set_keys_only(True)

    projection = QueryOptions.projection(query_options, conn.config)
    if projection:
      if self._group_by:
        # Projected properties must be a subset of the group_by properties.
        extra = set(projection) - set(self._group_by)
        if extra:
          raise datastore_errors.BadRequestError(
              'projections includes properties not in the group_by argument: %s'
              % extra)
      pb.property_name_list().extend(projection)
    elif self._group_by:
      raise datastore_errors.BadRequestError(
          'cannot specify group_by without a projection')

    if QueryOptions.produce_cursors(query_options, conn.config):
      pb.set_compile(True)

    limit = QueryOptions.limit(query_options, conn.config)
    if limit is not None:
      pb.set_limit(limit)

    # prefetch_size wins over batch_size for the first request.
    count = QueryOptions.prefetch_size(query_options, conn.config)
    if count is None:
      count = QueryOptions.batch_size(query_options, conn.config)
    if count is not None:
      pb.set_count(count)

    if query_options.offset:
      pb.set_offset(query_options.offset)

    if query_options.start_cursor is not None:
      pb.mutable_compiled_cursor().CopyFrom(query_options.start_cursor._to_pb())

    if query_options.end_cursor is not None:
      pb.mutable_end_compiled_cursor().CopyFrom(
          query_options.end_cursor._to_pb())

    # Only send a plan hint when the hinted component is actually present.
    if ((query_options.hint == QueryOptions.ORDER_FIRST and pb.order_size()) or
        (query_options.hint == QueryOptions.ANCESTOR_FIRST and
         pb.has_ancestor()) or
        (query_options.hint == QueryOptions.FILTER_FIRST and
         pb.filter_size() > 0)):
      pb.set_hint(query_options.hint)

    conn._set_request_read_policy(pb, query_options)
    conn._set_request_transaction(pb)

    return pb
def apply_query(query, entities):
  """Performs the given query on a set of in-memory entities.

  This function can perform queries impossible in the datastore (e.g a query
  with multiple inequality filters on different properties) because all
  operations are done in memory. For queries that can also be executed on the
  the datastore, the results produced by this function may not use the same
  implicit ordering as the datastore. To ensure compatibility, explicit
  ordering must be used (e.g. 'ORDER BY ineq_prop, ..., __key__').

  Order by __key__ should always be used when a consistent result is desired
  (unless there is a sort order on another globally unique property).

  Args:
    query: a datastore_query.Query to apply
    entities: a list of entity_pb.EntityProto on which to apply the query.

  Returns:
    A list of entity_pb.EntityProto contain the results of the query.
  """
  if not isinstance(query, Query):
    raise datastore_errors.BadArgumentError(
        "query argument must be a datastore_query.Query (%r)" % (query,))

  if not isinstance(entities, list):
    raise datastore_errors.BadArgumentError(
        "entities argument must be a list (%r)" % (entities,))

  # First restrict to entities matching app/namespace/kind/ancestor.
  filtered_entities = filter(query._key_filter, entities)

  if not query._order:
    # Unordered: property filters can be applied entity-by-entity.
    if query._filter_predicate:
      return filter(query._filter_predicate, filtered_entities)
    return filtered_entities

  # Ordered: build per-entity value maps for every property the order or
  # the filter needs to inspect.
  names = query._order._get_prop_names()
  if query._filter_predicate:
    names |= query._filter_predicate._get_prop_names()

  # Entities missing any required property cannot be sorted and are dropped.
  exists_filter = _PropertyExistsFilter(names)

  value_maps = []
  for entity in filtered_entities:
    value_map = _make_key_value_map(entity, names)

    # _prune also narrows multi-valued properties down to matching values.
    if exists_filter._apply(value_map) and (
        not query._filter_predicate or
        query._filter_predicate._prune(value_map)):
      value_map['__entity__'] = entity
      value_maps.append(value_map)

  value_maps.sort(query._order._cmp)
  return [value_map['__entity__'] for value_map in value_maps]
class _AugmentedQuery(_BaseQuery):
  """A query that combines a datastore query with in-memory filters/results."""

  @datastore_rpc._positional(2)
  def __init__(self, query, in_memory_results=None, in_memory_filter=None,
               max_filtered_count=None):
    """Constructor for _AugmentedQuery.

    Do not call directly. Use the utility functions instead (e.g.
    datastore_query.inject_results)

    Args:
      query: A datastore_query.Query object to augment.
      in_memory_results: a list of pre- sorted and filtered result to add to the
        stream of datastore results or None .
      in_memory_filter: a set of in-memory filters to apply to the datastore
        results or None.
      max_filtered_count: the maximum number of datastore entities that will be
        filtered out by in_memory_filter if known.
    """
    if not isinstance(query, Query):
      raise datastore_errors.BadArgumentError(
          'query argument should be datastore_query.Query (%r)' % (query,))
    if (in_memory_filter is not None and
        not isinstance(in_memory_filter, FilterPredicate)):
      raise datastore_errors.BadArgumentError(
          'in_memory_filter argument should be ' +
          'datastore_query.FilterPredicate (%r)' % (in_memory_filter,))
    if (in_memory_results is not None and
        not isinstance(in_memory_results, list)):
      # BUG FIX: the message previously read 'a list of' + 'datastore_pv...'
      # which rendered as "a list ofdatastore_pv.EntityProto" — missing space
      # and a typo ('datastore_pv' for 'datastore_pb').
      raise datastore_errors.BadArgumentError(
          'in_memory_results argument should be a list of ' +
          'datastore_pb.EntityProto (%r)' % (in_memory_results,))
    datastore_types.ValidateInteger(max_filtered_count,
                                    'max_filtered_count',
                                    empty_ok=True,
                                    zero_ok=True)
    self._query = query
    self._max_filtered_count = max_filtered_count
    self._in_memory_filter = in_memory_filter
    self._in_memory_results = in_memory_results

  @property
  def app(self):
    return self._query._key_filter.app

  @property
  def namespace(self):
    return self._query._key_filter.namespace

  @property
  def kind(self):
    return self._query._key_filter.kind

  @property
  def ancestor(self):
    return self._query._key_filter.ancestor

  @property
  def filter_predicate(self):
    return self._query._filter_predicate

  @property
  def order(self):
    return self._query._order

  @property
  def group_by(self):
    return self._query._group_by

  def run_async(self, conn, query_options=None):
    """Runs the query asynchronously; see _BaseQuery.run_async."""
    if not isinstance(conn, datastore_rpc.BaseConnection):
      raise datastore_errors.BadArgumentError(
          'conn should be a datastore_rpc.BaseConnection (%r)' % (conn,))

    if not QueryOptions.is_configuration(query_options):
      query_options = QueryOptions(config=query_options)

    if self._query._order:
      # In-memory merging by sort order requires full entities from the
      # datastore, so keys_only is disabled for the underlying query.
      changes = {'keys_only': False}
    else:
      changes = {}

    if self._in_memory_filter or self._in_memory_results:
      # Offset and limit must be applied after in-memory filtering/merging,
      # so they are pulled out of the datastore request.
      in_memory_offset = query_options.offset
      in_memory_limit = query_options.limit

      if in_memory_limit is not None:
        if self._in_memory_filter is None:
          # No in-memory filter: the datastore limit can stand as-is.
          changes['limit'] = in_memory_limit
        elif self._max_filtered_count is not None:
          # Over-fetch by the worst-case number of filtered-out entities.
          changes['limit'] = in_memory_limit + self._max_filtered_count
        else:
          # Unknown filtering rate: fetch without a limit.
          changes['limit'] = None

      if in_memory_offset:
        # Fetch the offset-worth of extra results so they can be skipped
        # after in-memory filtering.
        changes['offset'] = None
        if changes.get('limit', None) is not None:
          changes['limit'] += in_memory_offset
      else:
        in_memory_offset = None
    else:
      in_memory_offset = None
      in_memory_limit = None

    req = self._query._to_pb(
        conn, QueryOptions(config=query_options, **changes))

    start_cursor = query_options.start_cursor
    if not start_cursor and query_options.produce_cursors:
      start_cursor = Cursor()

    return _AugmentedBatch.create_async(self, query_options, conn, req,
                                        in_memory_offset=in_memory_offset,
                                        in_memory_limit=in_memory_limit,
                                        start_cursor=start_cursor)
@datastore_rpc._positional(1)
def inject_results(query, updated_entities=None, deleted_keys=None):
  """Creates a query object that will inject changes into results.

  Args:
    query: The datastore_query.Query to augment.
    updated_entities: A list of entity_pb.EntityProto's that have been updated
      and should take priority over any values returned by query.
    deleted_keys: A list of entity_pb.Reference's for entities that have been
      deleted and should be removed from query results.

  Returns:
    A datastore_query.AugmentedQuery if in memory filtering is required,
    otherwise the given query unchanged.

  Raises:
    datastore_errors.BadArgumentError: If query is not a Query or the list
      arguments are not lists.
  """
  if not isinstance(query, Query):
    raise datastore_errors.BadArgumentError(
        'query argument should be datastore_query.Query (%r)' % (query,))

  overriden_keys = set()

  if deleted_keys is not None:
    if not isinstance(deleted_keys, list):
      raise datastore_errors.BadArgumentError(
          'deleted_keys argument must be a list (%r)' % (deleted_keys,))
    # Only deletions that could have matched the query need suppressing.
    deleted_keys = filter(query._key_filter, deleted_keys)
    for key in deleted_keys:
      overriden_keys.add(datastore_types.ReferenceToKeyValue(key))

  if updated_entities is not None:
    if not isinstance(updated_entities, list):
      raise datastore_errors.BadArgumentError(
          'updated_entities argument must be a list (%r)' % (updated_entities,))
    # Drop updates that cannot match the key filter, then run the query's
    # own filtering/ordering over the survivors.
    updated_entities = filter(query._key_filter, updated_entities)
    for entity in updated_entities:
      overriden_keys.add(datastore_types.ReferenceToKeyValue(entity.key()))
    updated_entities = apply_query(query, updated_entities)
  else:
    updated_entities = []

  if not overriden_keys:
    return query

  return _AugmentedQuery(query,
                         in_memory_filter=_IgnoreFilter(overriden_keys),
                         in_memory_results=updated_entities,
                         max_filtered_count=len(overriden_keys))
class _BatchShared(object):
  """Data shared among the batches of a query."""

  def __init__(self, query, query_options, conn, augmented_query=None):
    self.__query = query
    self.__query_options = query_options
    self.__conn = conn
    self.__augmented_query = augmented_query
    # Metadata (keys_only, compiled query, index list) is captured once,
    # from the first query result processed.
    self.__was_first_result_processed = False

  @property
  def query(self):
    return self.__query

  @property
  def query_options(self):
    return self.__query_options

  @property
  def conn(self):
    return self.__conn

  @property
  def augmented_query(self):
    return self.__augmented_query

  @property
  def keys_only(self):
    return self.__keys_only

  @property
  def compiled_query(self):
    return self.__compiled_query

  @property
  def index_list(self):
    """Returns the list of indexes used by the query.

    Possibly None when the adapter does not implement pb_to_index.
    """
    return self.__index_list

  def process_query_result_if_first(self, query_result):
    """Captures per-query metadata from the first result received."""
    if not self.__was_first_result_processed:
      self.__was_first_result_processed = True
      self.__keys_only = query_result.keys_only()
      if query_result.has_compiled_query():
        self.__compiled_query = query_result.compiled_query
      else:
        self.__compiled_query = None
      try:
        self.__index_list = [self.__conn.adapter.pb_to_index(index_pb)
                             for index_pb in query_result.index_list()]
      except NotImplementedError:
        # The adapter cannot convert index protos; leave the list unknown.
        self.__index_list = None
2447 class Batch(object):
2448 """A batch of results returned by a query.
2450 This class contains a batch of results returned from the datastore and
2451 relevant metadata. This metadata includes:
2452 query: The query that produced this batch
2453 query_options: The QueryOptions used to run the query. This does not
2454 contain any options passed to the .next_batch() call that created the
2456 start_cursor, end_cursor: These are the cursors that can be used
2457 with a query to re-fetch this batch. They can also be used to
2458 find all entities before or after the given batch (by use start_cursor as
2459 an end cursor or vice versa). start_cursor can also be advanced to
2460 point to a position within the batch using Cursor.advance().
2461 skipped_results: the number of results skipped because of the offset
2462 given to the request that generated it. This can be set either on
2463 the original Query.run() request or in subsequent .next_batch() calls.
2464 more_results: If this is true there are more results that can be retrieved
2465 either by .next_batch() or Batcher.next().
2467 This class is also able to fetch the next batch of the query using
2468 .next_batch(). As batches of results must be fetched serially, .next_batch()
2469 can only be called once. Additional calls to .next_batch() will return None.
2470 When there are no more batches .next_batch() will return None as well. Note
2471 that batches returned by iterating over Batcher will always return None for
2472 .next_batch() as the Batcher handles fetching the next batch automatically.
2474 A Batch typically represents the result of a single RPC request. The datastore
2475 operates on a "best effort" basis so the batch returned by .next_batch()
2476 or Query.run_async().get_result() may not have satisfied the requested offset
2477 or number of results (specified through FetchOptions.offset and
2478 FetchOptions.batch_size respectively). To satisfy these restrictions
2479 additional batches may be needed (with FetchOptions that specify the remaining
2480 offset or results needed). The Batcher class hides these limitations.
  # Compiled cursor for the position just after any skipped results; set by
  # __query_result_hook when the backend supplies one.
  __skipped_cursor = None
2486 @datastore_rpc._positional
(5)
2487 def create_async(cls
, query
, query_options
, conn
, req
,
2489 batch_shared
= _BatchShared(query
, query_options
, conn
)
2490 batch0
= cls(batch_shared
, start_cursor
=start_cursor
)
2491 return batch0
._make_query_result_rpc_call('RunQuery', query_options
, req
)
2493 @datastore_rpc._positional
(2)
2494 def __init__(self
, batch_shared
, start_cursor
=Cursor()):
2497 This class is constructed in stages (one when an RPC is sent and another
2498 when an rpc is completed) and should not be constructed directly!!
2499 Use Query.run_async().get_result() to create a Batch or Query.run()
2502 This constructor does not perform verification.
2505 batch_shared: Data shared between batches for a a single query run.
2506 start_cursor: Optional cursor pointing before this batch.
2510 self
._batch
_shared
= batch_shared
2511 self
.__start
_cursor
= start_cursor
2514 def query_options(self
):
2515 """The QueryOptions used to retrieve the first batch."""
2516 return self
._batch
_shared
.query_options
2520 """The query the current batch came from."""
2521 return self
._batch
_shared
.query
2525 """A list of entities in this batch."""
2526 return self
.__results
2529 def keys_only(self
):
2530 """Whether the entities in this batch only contain keys."""
2531 return self
._batch
_shared
.keys_only
2534 def index_list(self
):
2535 """Returns the list of indexes used to peform this batch's query.
2536 Possibly None when the adapter does not implement pb_to_index.
2538 return self
._batch
_shared
.index_list
2541 def start_cursor(self
):
2542 """A cursor that points to the position just before the current batch."""
2543 return self
.__start
_cursor
2546 def end_cursor(self
):
2547 """A cursor that points to the position just after the current batch."""
2548 return self
.__end
_cursor
2551 def skipped_results(self
):
2552 """The number of results skipped because of an offset in the request.
2554 An offset is satisfied before any results are returned. The start_cursor
2555 points to the position in the query before the skipped results.
2557 return self
._skipped
_results
2560 def more_results(self
):
2561 """Whether more results can be retrieved from the query."""
2562 return self
.__more
_results
2564 def next_batch(self
, fetch_options
=None):
2565 """Synchronously get the next batch or None if there are no more batches.
2568 fetch_options: Optional fetch options to use when fetching the next batch.
2569 Merged with both the fetch options on the original call and the
2573 A new Batch of results or None if either the next batch has already been
2574 fetched or there are no more results.
2576 async = self
.next_batch_async(fetch_options
)
2579 return async.get_result()
2581 def _compiled_query(self
):
2582 return self
._batch
_shared
.compiled_query
2584 def cursor(self
, index
):
2585 """Gets the cursor that points just after the result at index - 1.
2587 The index is relative to first result in .results. Since start_cursor
2588 points to the position before the first skipped result, the range of
2589 indexes this function supports is limited to
2590 [-skipped_results, len(results)].
2592 For example, using start_cursor=batch.cursor(i) and
2593 end_cursor=batch.cursor(j) will return the results found in
2594 batch.results[i:j]. Note that any result added in the range (i-1, j]
2595 will appear in the new query's results.
2597 Warning: Any index in the range (-skipped_results, 0) may cause
2598 continuation to miss or duplicate results if outside a transaction.
2601 index: An int, the index relative to the first result before which the
2602 cursor should point.
2605 A Cursor that points to a position just after the result index - 1,
2606 which if used as a start_cursor will cause the first result to be
2607 batch.result[index].
2609 if not isinstance(index
, (int, long)):
2610 raise datastore_errors
.BadArgumentError(
2611 'index argument should be entity_pb.Reference (%r)' % (index
,))
2612 if not -self
._skipped
_results
<= index
<= len(self
.__results
):
2613 raise datastore_errors
.BadArgumentError(
2614 'index argument must be in the inclusive range [%d, %d]' %
2615 (-self
._skipped
_results
, len(self
.__results
)))
2617 if index
== -self
._skipped
_results
:
2618 return self
.__start
_cursor
2619 elif (index
== 0 and
2620 self
.__skipped
_cursor
):
2621 return Cursor(_cursor_pb
=self
.__skipped
_cursor
)
2622 elif index
> 0 and self
.__result
_cursors
:
2623 return Cursor(_cursor_pb
=self
.__result
_cursors
[index
- 1])
2625 elif index
== len(self
.__results
):
2626 return self
.__end
_cursor
2632 return self
.__start
_cursor
.advance(index
+ self
._skipped
_results
,
2633 self
._batch
_shared
.query
,
2634 self
._batch
_shared
.conn
)
2636 def next_batch_async(self
, fetch_options
=None):
2637 """Asynchronously get the next batch or None if there are no more batches.
2640 fetch_options: Optional fetch options to use when fetching the next batch.
2641 Merged with both the fetch options on the original call and the
2645 An async object that can be used to get the next Batch or None if either
2646 the next batch has already been fetched or there are no more results.
2648 if not self
.__datastore
_cursor
:
2651 fetch_options
, next_batch
= self
._make
_next
_batch
(fetch_options
)
2652 req
= self
._to
_pb
(fetch_options
)
2654 config
= self
._batch
_shared
.query_options
.merge(fetch_options
)
2655 return next_batch
._make
_query
_result
_rpc
_call
('Next', config
, req
)
2657 def _to_pb(self
, fetch_options
=None):
2658 req
= datastore_pb
.NextRequest()
2660 if FetchOptions
.produce_cursors(fetch_options
,
2661 self
._batch
_shared
.query_options
,
2662 self
._batch
_shared
.conn
.config
):
2663 req
.set_compile(True)
2665 count
= FetchOptions
.batch_size(fetch_options
,
2666 self
._batch
_shared
.query_options
,
2667 self
._batch
_shared
.conn
.config
)
2668 if count
is not None:
2669 req
.set_count(count
)
2671 if fetch_options
is not None and fetch_options
.offset
:
2672 req
.set_offset(fetch_options
.offset
)
2674 req
.mutable_cursor().CopyFrom(self
.__datastore
_cursor
)
2675 self
.__datastore
_cursor
= None
2678 def _extend(self
, next_batch
):
2679 """Combines the current batch with the next one. Called by batcher."""
2680 self
.__datastore
_cursor
= next_batch
.__datastore
_cursor
2681 next_batch
.__datastore
_cursor
= None
2682 self
.__more
_results
= next_batch
.__more
_results
2683 if not self
.__results
:
2684 self
.__skipped
_cursor
= next_batch
.__skipped
_cursor
2685 self
.__results
.extend(next_batch
.__results
)
2686 self
.__result
_cursors
.extend(next_batch
.__result
_cursors
)
2687 self
.__end
_cursor
= next_batch
.__end
_cursor
2688 self
._skipped
_results
+= next_batch
._skipped
_results
2690 def _make_query_result_rpc_call(self
, name
, config
, req
):
2691 """Makes either a RunQuery or Next call that will modify the instance.
2694 name: A string, the name of the call to invoke.
2695 config: The datastore_rpc.Configuration to use for the call.
2696 req: The request to send with the call.
2699 A UserRPC object that can be used to fetch the result of the RPC.
2701 return self
._batch
_shared
.conn
._make
_rpc
_call
(config
, name
, req
,
2702 datastore_pb
.QueryResult(),
2703 self
.__query
_result
_hook
)
  # Header line joined in front of the recommended index YAML when a
  # NeedIndexError is re-raised by __query_result_hook.
  _need_index_header = 'The suggested index for this query is:'
2707 def __query_result_hook(self
, rpc
):
2708 """Internal method used as get_result_hook for RunQuery/Next operation."""
2710 self
._batch
_shared
.conn
.check_rpc_success(rpc
)
2711 except datastore_errors
.NeedIndexError
, exc
:
2713 if isinstance(rpc
.request
, datastore_pb
.Query
):
2714 _
, kind
, ancestor
, props
= datastore_index
.CompositeIndexForQuery(
2717 props
= datastore_index
.GetRecommendedIndexProperties(props
)
2718 yaml
= datastore_index
.IndexYamlForQuery(kind
, ancestor
, props
)
2719 xml
= datastore_index
.IndexXmlForQuery(kind
, ancestor
, props
)
2722 raise datastore_errors
.NeedIndexError(
2723 '\n'.join([str(exc
), self
._need
_index
_header
, yaml
]),
2724 original_message
=str(exc
), header
=self
._need
_index
_header
,
2725 yaml_index
=yaml
, xml_index
=xml
)
2727 query_result
= rpc
.response
2729 self
._batch
_shared
.process_query_result_if_first(query_result
)
2731 if query_result
.has_skipped_results_compiled_cursor():
2732 self
.__skipped
_cursor
= query_result
.skipped_results_compiled_cursor()
2734 self
.__result
_cursors
= list(query_result
.result_compiled_cursor_list())
2735 self
.__end
_cursor
= Cursor
._from
_query
_result
(query_result
)
2736 self
._skipped
_results
= query_result
.skipped_results()
2738 if query_result
.more_results():
2739 self
.__datastore
_cursor
= query_result
.cursor()
2740 self
.__more
_results
= True
2744 self
.__results
= self
._process
_results
(query_result
.result_list())
2748 """Changes the internal state so that no more batches can be produced."""
2749 self
.__datastore
_cursor
= None
2750 self
.__more
_results
= False
2752 def _make_next_batch(self
, fetch_options
):
2753 """Creates the object to store the next batch.
2756 fetch_options: The datastore_query.FetchOptions passed in by the user or
2760 A tuple containing the fetch options that should be used internally and
2761 the object that should be used to contain the next batch.
2763 return fetch_options
, Batch(self
._batch
_shared
,
2764 start_cursor
=self
.__end
_cursor
)
2766 def _process_results(self
, results
):
2767 """Converts the datastore results into results returned to the user.
2770 results: A list of entity_pb.EntityProto's returned by the datastore
2773 A list of results that should be returned to the user.
2775 pb_to_query_result
= self
._batch
_shared
.conn
.adapter
.pb_to_query_result
2776 return [pb_to_query_result(result
, self
._batch
_shared
.query_options
)
2777 for result
in results
]
2779 def __getstate__(self
):
2780 raise pickle
.PicklingError(
2781 'Pickling of datastore_query.Batch is unsupported.')
2784 class _AugmentedBatch(Batch
):
2785 """A batch produced by a datastore_query._AugmentedQuery."""
2788 @datastore_rpc._positional
(5)
2789 def create_async(cls
, augmented_query
, query_options
, conn
, req
,
2790 in_memory_offset
, in_memory_limit
, start_cursor
):
2791 batch_shared
= _BatchShared(augmented_query
._query
,
2795 batch0
= cls(batch_shared
,
2796 in_memory_offset
=in_memory_offset
,
2797 in_memory_limit
=in_memory_limit
,
2798 start_cursor
=start_cursor
)
2799 return batch0
._make_query_result_rpc_call('RunQuery', query_options
, req
)
2801 @datastore_rpc._positional
(2)
2802 def __init__(self
, batch_shared
,
2803 in_memory_offset
=None,
2804 in_memory_limit
=None,
2806 start_cursor
=Cursor()):
2807 """A Constructor for datastore_query._AugmentedBatch.
2809 Constructed by datastore_query._AugmentedQuery. Should not be called
2812 super(_AugmentedBatch
, self
).__init
__(batch_shared
,
2813 start_cursor
=start_cursor
)
2814 self
.__in
_memory
_offset
= in_memory_offset
2815 self
.__in
_memory
_limit
= in_memory_limit
2816 self
.__next
_index
= next_index
2820 """The query the current batch came from."""
2821 return self
._batch
_shared
.augmented_query
  def cursor(self, index):
    # Cursors are not supported on augmented batches: positions shift once
    # in-memory results are merged into the datastore results.
    raise NotImplementedError
2826 def _extend(self
, next_batch
):
2827 super(_AugmentedBatch
, self
)._extend
(next_batch
)
2828 self
.__in
_memory
_limit
= next_batch
.__in
_memory
_limit
2829 self
.__in
_memory
_offset
= next_batch
.__in
_memory
_offset
2830 self
.__next
_index
= next_batch
.__next
_index
2832 def _process_results(self
, results
):
2834 in_memory_filter
= self
._batch
_shared
.augmented_query
._in
_memory
_filter
2835 if in_memory_filter
:
2836 results
= filter(in_memory_filter
, results
)
2839 in_memory_results
= self
._batch
_shared
.augmented_query
._in
_memory
_results
2840 if in_memory_results
and self
.__next
_index
< len(in_memory_results
):
2842 original_query
= super(_AugmentedBatch
, self
).query
2843 if original_query
._order
:
2846 next_result
= in_memory_results
[self
.__next
_index
]
2847 next_key
= original_query
._order
.key(next_result
)
2849 while i
< len(results
):
2851 result_key
= original_query
._order
.key(result
)
2852 while next_key
<= result_key
:
2853 results
.insert(i
, next_result
)
2855 self
.__next
_index
+= 1
2856 if self
.__next
_index
>= len(in_memory_results
):
2858 next_result
= in_memory_results
[self
.__next
_index
]
2859 next_key
= original_query
._order
.key(next_result
)
2861 elif results
or not super(_AugmentedBatch
, self
).more_results
:
2863 results
= in_memory_results
+ results
2864 self
.__next
_index
= len(in_memory_results
)
2867 if self
.__in
_memory
_offset
:
2868 assert not self
._skipped
_results
2869 offset
= min(self
.__in
_memory
_offset
, len(results
))
2871 self
._skipped
_results
+= offset
2872 self
.__in
_memory
_offset
-= offset
2873 results
= results
[offset
:]
2875 if self
.__in
_memory
_limit
is not None:
2876 results
= results
[:self
.__in
_memory
_limit
]
2877 self
.__in
_memory
_limit
-= len(results
)
2878 if self
.__in
_memory
_limit
<= 0:
2881 return super(_AugmentedBatch
, self
)._process
_results
(results
)
2883 def _make_next_batch(self
, fetch_options
):
2884 in_memory_offset
= FetchOptions
.offset(fetch_options
)
2885 augmented_query
= self
._batch
_shared
.augmented_query
2886 if in_memory_offset
and (augmented_query
._in
_memory
_filter
or
2887 augmented_query
._in
_memory
_results
):
2888 fetch_options
= FetchOptions(offset
=0)
2890 in_memory_offset
= None
2891 return (fetch_options
,
2892 _AugmentedBatch(self
._batch
_shared
,
2893 in_memory_offset
=in_memory_offset
,
2894 in_memory_limit
=self
.__in
_memory
_limit
,
2895 start_cursor
=self
.end_cursor
,
2896 next_index
=self
.__next
_index
))
2899 class Batcher(object):
2900 """A class that implements the Iterator interface for Batches.
2902 Typically constructed by a call to Query.run().
2904 The class hides the "best effort" nature of the datastore by potentially
2905 making multiple requests to the datastore and merging the resulting batches.
2906 This is accomplished efficiently by prefetching results and mixing both
2907 non-blocking and blocking calls to the datastore as needed.
2909 Iterating through batches is almost always more efficient than pulling all
2910 results at once as RPC latency is hidden by asynchronously prefetching
2913 The batches produced by this class cannot be used to fetch the next batch
2914 (through Batch.next_batch()) as before the current batch is returned the
2915 request for the next batch has already been sent.
  # Sentinel for next_batch(): pull batches until at least one result is
  # returned (see the next_batch docstring).
  AT_LEAST_ONE = object()
2923 def __init__(self
, query_options
, first_async_batch
):
2926 Although this class can be manually constructed, it is preferable to use
2927 Query.run(query_options).
2930 query_options: The QueryOptions used to create the first batch.
2931 first_async_batch: The first batch produced by
2932 Query.run_async(query_options).
2934 self
.__next
_batch
= first_async_batch
2935 self
.__initial
_offset
= QueryOptions
.offset(query_options
) or 0
2936 self
.__skipped
_results
= 0
2939 """Get the next batch. See .next_batch()."""
2940 return self
.next_batch(self
.AT_LEAST_ONE
)
2942 def next_batch(self
, min_batch_size
):
2943 """Get the next batch.
2945 The batch returned by this function cannot be used to fetch the next batch
2946 (through Batch.next_batch()). Instead this function will always return None.
2947 To retrieve the next batch use .next() or .next_batch(N).
2949 This function may return a batch larger than min_to_fetch, but will never
2950 return smaller unless there are no more results.
2952 Special values can be used for min_batch_size:
2953 ASYNC_ONLY - Do not perform any synchrounous fetches from the datastore
2954 even if the this produces a batch with no results.
2955 AT_LEAST_OFFSET - Only pull enough results to satifiy the offset.
2956 AT_LEAST_ONE - Pull batches until at least one result is returned.
2959 min_batch_size: The minimum number of results to retrieve or one of
2960 (ASYNC_ONLY, AT_LEAST_OFFSET, AT_LEAST_ONE)
2963 The next Batch of results.
2965 if min_batch_size
in (Batcher
.ASYNC_ONLY
, Batcher
.AT_LEAST_OFFSET
,
2966 Batcher
.AT_LEAST_ONE
):
2970 datastore_types
.ValidateInteger(min_batch_size
,
2972 datastore_errors
.BadArgumentError
)
2973 if not self
.__next
_batch
:
2977 batch
= self
.__next
_batch
.get_result()
2978 self
.__next
_batch
= None
2979 self
.__skipped
_results
+= batch
.skipped_results
2981 if min_batch_size
is not Batcher
.ASYNC_ONLY
:
2982 if min_batch_size
is Batcher
.AT_LEAST_ONE
:
2985 needed_results
= min_batch_size
- len(batch
.results
)
2986 while (batch
.more_results
and
2987 (self
.__skipped
_results
< self
.__initial
_offset
or
2988 needed_results
> 0)):
2989 if batch
.query_options
.batch_size
:
2991 batch_size
= max(batch
.query_options
.batch_size
, needed_results
)
2993 batch_size
= needed_results
2997 self
.__next
_batch
= batch
.next_batch_async(FetchOptions(
2998 offset
=max(0, self
.__initial
_offset
- self
.__skipped
_results
),
2999 batch_size
=batch_size
))
3000 next_batch
= self
.__next
_batch
.get_result()
3001 self
.__next
_batch
= None
3002 self
.__skipped
_results
+= next_batch
.skipped_results
3003 needed_results
= max(0, needed_results
- len(next_batch
.results
))
3004 batch
._extend
(next_batch
)
3011 self
.__next
_batch
= batch
.next_batch_async()
3014 def __getstate__(self
):
3015 raise pickle
.PicklingError(
3016 'Pickling of datastore_query.Batcher is unsupported.')
3022 class ResultsIterator(object):
3023 """An iterator over the results from Batches obtained from a Batcher.
3025 ResultsIterator implements Python's iterator protocol, so results can be
3026 accessed with the for-statement:
3028 > it = ResultsIterator(Query(kind='Person').run())
3030 > print 'Hi, %s!' % person['name']
3032 At any time ResultsIterator.cursor() can be used to grab the Cursor that
3033 points just after the last result returned by the iterator.
  # Current batch; None until _ensure_current_batch or next() fetches one.
  __current_batch = None
  # End cursor of the last exhausted batch; used by cursor() after the
  # iterator has advanced past a batch.
  __last_cursor = None
3040 def __init__(self
, batcher
):
3044 batcher: A datastore_query.Batcher
3046 if not isinstance(batcher
, Batcher
):
3047 raise datastore_errors
.BadArgumentError(
3048 'batcher argument should be datastore_query.Batcher (%r)' %
3050 self
.__batcher
= batcher
3052 def index_list(self
):
3053 """Returns the list of indexes used to perform the query.
3054 Possibly None when the adapter does not implement pb_to_index.
3056 return self
._ensure
_current
_batch
().index_list
3059 """Returns a cursor that points just after the last result returned.
3061 If next() throws an exception, this function returns the end_cursor from
3062 the last successful batch or throws the same exception if no batch was
3065 return (self
.__last
_cursor
or
3066 self
._ensure
_current
_batch
().cursor(self
.__current
_pos
))
3068 def _ensure_current_batch(self
):
3069 if not self
.__current
_batch
:
3070 self
.__current
_batch
= self
.__batcher
.next_batch(Batcher
.AT_LEAST_OFFSET
)
3071 self
.__current
_pos
= 0
3072 return self
.__current
_batch
3074 def _compiled_query(self
):
3075 """Returns the compiled query associated with the iterator.
3077 Internal only do not use.
3079 return self
._ensure
_current
_batch
()._compiled
_query
()
3083 """Returns the next query result."""
3084 while (not self
.__current
_batch
or
3085 self
.__current
_pos
>= len(self
.__current
_batch
.results
)):
3091 next_batch
= self
.__batcher
.next_batch(Batcher
.AT_LEAST_OFFSET
)
3094 if self
.__current
_batch
:
3095 self
.__last
_cursor
= self
.__current
_batch
.end_cursor
3097 self
.__current
_pos
= 0
3098 self
.__current
_batch
= next_batch
3100 result
= self
.__current
_batch
.results
[self
.__current
_pos
]
3101 self
.__current
_pos
+= 1