1 """Higher-level Query wrapper.
3 There are perhaps too many query APIs in the world.
5 The fundamental API here overloads the 6 comparisons operators to
6 represent filters on property values, and supports AND and OR
7 operations (implemented as functions -- Python's 'and' and 'or'
8 operators cannot be overloaded, and the '&' and '|' operators have a
9 priority that conflicts with the priority of comparison operators).
12 class Employee(Model):
13 name = StringProperty()
14 age = IntegerProperty()
15 rank = IntegerProperty()
18 def demographic(cls, min_age, max_age):
19 return cls.query().filter(AND(cls.age >= min_age, cls.age <= max_age))
22 def ranked(cls, rank):
23 return cls.query(cls.rank == rank).order(cls.age)
for emp in Employee.demographic(42, 50):
  print emp.name, emp.age, emp.rank
28 The 'in' operator cannot be overloaded, but is supported through the
29 IN() method. For example:
31 Employee.query().filter(Employee.rank.IN([4, 5, 6]))
33 Sort orders are supported through the order() method; unary minus is
34 overloaded on the Property class to represent a descending order:
36 Employee.query().order(Employee.name, -Employee.age)
38 Besides using AND() and OR(), filters can also be combined by
39 repeatedly calling .filter():
41 q1 = Employee.query() # A query that returns all employees
42 q2 = q1.filter(Employee.age >= 30) # Only those over 30
43 q3 = q2.filter(Employee.age < 40) # Only those in their 30s
45 A further shortcut is calling .filter() with multiple arguments; this
48 q1 = Employee.query() # A query that returns all employees
49 q3 = q1.filter(Employee.age >= 30,
50 Employee.age < 40) # Only those in their 30s
52 And finally you can also pass one or more filter expressions directly
53 to the .query() method:
55 q3 = Employee.query(Employee.age >= 30,
56 Employee.age < 40) # Only those in their 30s
58 Query objects are immutable, so these methods always return a new
59 Query object; the above calls to filter() do not affect q1. (On the
60 other hand, operations that are effectively no-ops may return the
61 original Query object.)
63 Sort orders can also be combined this way, and .filter() and .order()
64 calls may be intermixed:
66 q4 = q3.order(-Employee.age)
67 q5 = q4.order(Employee.name)
68 q6 = q5.filter(Employee.rank == 5)
70 Again, multiple .order() calls can be combined:
72 q5 = q3.order(-Employee.age, Employee.name)
74 The simplest way to retrieve Query results is a for-loop:
77 print emp.name, emp.age
79 Some other methods to run a query and access its results:
81 q.iter() # Return an iterator; same as iter(q) but more flexible
82 q.map(callback) # Call the callback function for each query result
83 q.fetch(N) # Return a list of the first N results
84 q.get() # Return the first result
85 q.count(N) # Return the number of results, with a maximum of N
86 q.fetch_page(N, start_cursor=cursor) # Return (results, cursor, has_more)
88 All of the above methods take a standard set of additional query
89 options, either in the form of keyword arguments such as
90 keys_only=True, or as QueryOptions object passed with
91 options=QueryOptions(...). The most important query options are:
93 keys_only: bool, if set the results are keys instead of entities
94 limit: int, limits the number of results returned
95 offset: int, skips this many results first
96 start_cursor: Cursor, start returning results after this position
97 end_cursor: Cursor, stop returning results after this position
98 batch_size: int, hint for the number of results returned per RPC
99 prefetch_size: int, hint for the number of results in the first RPC
100 produce_cursors: bool, return Cursor objects with the results
102 For additional (obscure) query options and more details on them,
103 including an explanation of Cursors, see datastore_query.py.
105 All of the above methods except for iter() have asynchronous variants
106 as well, which return a Future; to get the operation's ultimate
107 result, yield the Future (when inside a tasklet) or call the Future's
108 get_result() method (outside a tasklet):
110 q.map_async(callback) # Callback may be a task or a plain function
114 q.fetch_page_async(N, start_cursor=cursor)
116 Finally, there's an idiom to efficiently loop over the Query results
117 in a tasklet, properly yielding when appropriate:
120 while (yield it.has_next_async()):
122 print emp.name, emp.age
125 from __future__
import with_statement
126 del with_statement
# No need to export this.
128 __author__
= 'guido@google.com (Guido van Rossum)'
135 from .google_imports
import datastore_errors
136 from .google_imports
import datastore_rpc
137 from .google_imports
import datastore_types
138 from .google_imports
import datastore_query
139 from .google_imports
import namespace_manager
142 from . import context
143 from . import tasklets
146 __all__
= ['Query', 'QueryOptions', 'Cursor', 'QueryIterator',
147 'RepeatedStructuredPropertyPredicate',
148 'AND', 'OR', 'ConjunctionNode', 'DisjunctionNode',
149 'FilterNode', 'PostFilterNode', 'FalseNode', 'Node',
150 'ParameterNode', 'ParameterizedThing', 'Parameter',
151 'ParameterizedFunction', 'gql',
# Re-export some useful classes from the lower-level module.
Cursor = datastore_query.Cursor

# Some local renamings.
_ASC = datastore_query.PropertyOrder.ASCENDING
_DESC = datastore_query.PropertyOrder.DESCENDING
_AND = datastore_query.CompositeFilter.AND
_KEY = datastore_types._KEY
# NOTE(review): the original listing is truncated here — a constant whose
# name starts with _SPECIAL_PROPERTY was defined at this point; restore it
# from the upstream NDB source before shipping.

# Table of supported comparison operators.
_OPS = frozenset(['=', '!=', '<', '<=', '>', '>=', 'in'])

# Default limit value.  (Yes, the datastore uses int32!)
_MAX_LIMIT = 2 ** 31 - 1
class QueryOptions(context.ContextOptions, datastore_query.QueryOptions):
  """Support both context options and query options (esp. use_cache).

  Combines the NDB context options (caching behavior) with the low-level
  datastore query options (limit, offset, cursors, ...) in one object so a
  single options argument can configure both layers.
  """
class RepeatedStructuredPropertyPredicate(datastore_query.FilterPredicate):
  """In-memory predicate matching one 'row' of a repeated structured property.

  Used by model.py.  See _apply() for the row-matching semantics.
  """

  def __init__(self, match_keys, pb, key_prefix):
    """Constructor.

    Args:
      match_keys: List of property names (with prefix) that must match.
      pb: An entity protobuf from which the values to match are extracted.
      key_prefix: The common prefix each match key must start with.

    Raises:
      ValueError: If a key does not begin with key_prefix.
    """
    super(RepeatedStructuredPropertyPredicate, self).__init__()
    self.match_keys = match_keys
    stripped_keys = []
    for key in match_keys:
      if not key.startswith(key_prefix):
        raise ValueError('key %r does not begin with the specified prefix of %s'
                         % (key, key_prefix))
      stripped_keys.append(key[len(key_prefix):])
    value_map = datastore_query._make_key_value_map(pb, stripped_keys)
    # One representative value per key, in the same order as match_keys.
    self.match_values = tuple(value_map[key][0] for key in stripped_keys)

  def _get_prop_names(self):
    # The set of property names this predicate needs to inspect.
    return frozenset(self.match_keys)

  def _apply(self, key_value_map):
    """Apply the filter to values extracted from an entity.

    Think of self.match_keys and self.match_values as representing a
    table with one row.  For example:

      match_keys = ('name', 'age', 'rank')
      match_values = ('Joe', 24, 5)

    (Except that in reality, the values are represented by tuples
    produced by datastore_types.PropertyValueToKeyValue().)

    Think of key_value_map as a table with the same structure but
    (potentially) many rows.  This represents a repeated structured
    property of a single entity.  For example:

      {'name': ['Joe', 'Jane', 'Dick'],
       'age': [24, 21, 23],
       'rank': [5, 1, 2]}

    We must determine whether at least one row of the second table
    exactly matches the first table.  We need this class because the
    datastore, when asked to find an entity with name 'Joe', age 24
    and rank 5, will include entities that have 'Joe' somewhere in the
    name column, 24 somewhere in the age column, and 5 somewhere in
    the rank column, but not all aligned on a single row.  Such an
    entity should not be considered a match.
    """
    columns = []
    for key in self.match_keys:
      column = key_value_map.get(key)
      if not column:  # None, or an empty list.
        return False  # If any column is empty there can be no match.
      columns.append(column)
    # Use izip to transpose the columns into rows.
    return self.match_values in itertools.izip(*columns)

  # Don't implement _prune()!  It would mess up the row correspondence
  # within columns.
class ParameterizedThing(object):
  """Common abstract base for Parameter and ParameterizedFunction.

  This exists purely so callers can perform a single isinstance() check.
  """

  def __eq__(self, other):
    raise NotImplementedError

  def __ne__(self, other):
    # Derive != from ==, propagating NotImplemented untouched so Python
    # can try the reflected operation on the other operand.
    result = self.__eq__(other)
    if result is NotImplemented:
      return result
    return not result
class Parameter(ParameterizedThing):
  """Represents a bound variable in a GQL query.

  Parameter(1) corresponds to a slot labeled ":1" in a GQL query.
  Parameter('xyz') corresponds to a slot labeled ":xyz".

  The value must be set (bound) separately by calling .set(value).
  """

  def __init__(self, key):
    """Constructor.

    Args:
      key: The Parameter key, must be either an integer or a string.

    Raises:
      TypeError: If key is neither an integer nor a string.
    """
    if not isinstance(key, (int, long, basestring)):
      raise TypeError('Parameter key must be an integer or string, not %s' %
                      (key,))
    self.__key = key

  def __repr__(self):
    return '%s(%r)' % (self.__class__.__name__, self.__key)

  def __eq__(self, other):
    if not isinstance(other, Parameter):
      return NotImplemented
    return self.__key == other.__key

  @property
  def key(self):
    """Retrieve the key."""
    return self.__key

  def resolve(self, bindings, used):
    """Return the value bound to this parameter, recording its use.

    Args:
      bindings: A dict mapping parameter keys to values.
      used: A dict in which the use of this binding is recorded.

    Raises:
      datastore_errors.BadArgumentError: If the parameter is not bound.
    """
    key = self.__key
    if key not in bindings:
      raise datastore_errors.BadArgumentError(
          'Parameter :%s is not bound.' % key)
    value = bindings[key]
    used[key] = True
    return value
class ParameterizedFunction(ParameterizedThing):
  """Represents a GQL function with parameterized arguments.

  For example, ParameterizedFunction('key', [Parameter(1)]) stands for
  the GQL syntax KEY(:1).
  """

  def __init__(self, func, values):
    """Constructor.

    Args:
      func: The GQL function name (e.g. 'key', 'date', 'time', 'nop').
      values: List of arguments; may contain Parameter instances.
    """
    from .google_imports import gql  # Late import, to avoid name conflict.
    self.__func = func
    self.__values = values
    # NOTE: A horrible hack using GQL private variables so we can
    # reuse GQL's implementations of its built-in functions.
    gqli = gql.GQL('SELECT * FROM Dummy')
    gql_method = gqli._GQL__cast_operators[func]
    self.__method = getattr(gqli, '_GQL' + gql_method.__name__)

  def __repr__(self):
    return 'ParameterizedFunction(%r, %r)' % (self.__func, self.__values)

  def __eq__(self, other):
    if not isinstance(other, ParameterizedFunction):
      return NotImplemented
    return (self.__func == other.__func and
            self.__values == other.__values)

  # NOTE(review): the accessors below fall in a gap of the source listing
  # and were reconstructed from upstream NDB — confirm before shipping.
  @property
  def func(self):
    """The GQL function name."""
    return self.__func

  @property
  def values(self):
    """The (possibly parameterized) argument list."""
    return self.__values

  def is_parameterized(self):
    """True if any argument is an (unbound) Parameter instance."""
    for val in self.__values:
      if isinstance(val, Parameter):
        return True
    return False

  def resolve(self, bindings, used):
    """Evaluate the function, replacing Parameters with their bound values."""
    values = []
    for val in self.__values:
      if isinstance(val, Parameter):
        val = val.resolve(bindings, used)
      values.append(val)
    result = self.__method(values)
    # The gql module returns slightly different types in some cases.
    if self.__func == 'key' and isinstance(result, datastore_types.Key):
      result = model.Key.from_old_key(result)
    elif self.__func == 'time' and isinstance(result, datetime.datetime):
      result = datetime.time(result.hour, result.minute,
                             result.second, result.microsecond)
    elif self.__func == 'date' and isinstance(result, datetime.datetime):
      result = datetime.date(result.year, result.month, result.day)
    return result
364 """Base class for filter expression tree nodes.
366 Tree nodes are considered immutable, even though they can contain
367 Parameter instances, which are not. In particular, two identical
368 trees may be represented by the same Node object in different
374 raise TypeError('Cannot instantiate Node, only a subclass.')
375 return super(Node
, cls
).__new
__(cls
)
377 def __eq__(self
, other
):
378 raise NotImplementedError
380 def __ne__(self
, other
):
381 eq
= self
.__eq
__(other
)
382 if eq
is not NotImplemented:
386 def __unordered(self
, unused_other
):
387 raise TypeError('Nodes cannot be ordered')
388 __le__
= __lt__
= __ge__
= __gt__
= __unordered
390 def _to_filter(self
, post
=False):
391 """Helper to convert to datastore_query.Filter, or None."""
392 raise NotImplementedError
394 def _post_filters(self
):
395 """Helper to extract post-filter Nodes, if any."""
398 def resolve(self
, bindings
, used
):
399 """Return a Node with Parameters replaced by the selected values.
402 bindings: A dict mapping integers and strings to values.
403 used: A dict into which use of use of a binding is recorded.
class FalseNode(Node):
  """Tree node for an always-failing filter."""

  def __eq__(self, other):
    # All FalseNodes are equal to each other (they carry no state).
    if not isinstance(other, FalseNode):
      return NotImplemented
    return True

  def _to_filter(self, post=False):
    if post:
      return None
    # Because there's no point submitting a query that will never
    # return anything.
    raise datastore_errors.BadQueryError(
        'Cannot convert FalseNode to predicate')
class ParameterNode(Node):
  """Tree node for a parameterized filter.

  Holds (property, operator, Parameter) until resolve() substitutes the
  bound value and produces a concrete FilterNode.
  """

  def __new__(cls, prop, op, param):
    """Constructor.

    Args:
      prop: A model.Property instance.
      op: An operator string from _OPS.
      param: A ParameterizedThing (Parameter or ParameterizedFunction).

    Raises:
      TypeError: If any argument has the wrong type.
    """
    if not isinstance(prop, model.Property):
      raise TypeError('Expected a Property, got %r' % (prop,))
    if op not in _OPS:
      raise TypeError('Expected a valid operator, got %r' % (op,))
    if not isinstance(param, ParameterizedThing):
      raise TypeError('Expected a ParameterizedThing, got %r' % (param,))
    obj = super(ParameterNode, cls).__new__(cls)
    obj.__prop = prop
    obj.__op = op
    obj.__param = param
    return obj

  def __repr__(self):
    return 'ParameterNode(%r, %r, %r)' % (self.__prop, self.__op, self.__param)

  def __eq__(self, other):
    if not isinstance(other, ParameterNode):
      return NotImplemented
    return (self.__prop._name == other.__prop._name and
            self.__op == other.__op and
            self.__param == other.__param)

  def _to_filter(self, post=False):
    # An unresolved parameter can never be turned into a datastore filter.
    raise datastore_errors.BadArgumentError(
        'Parameter :%s is not bound.' % (self.__param.key,))

  def resolve(self, bindings, used):
    """Substitute the bound value and return a concrete filter Node."""
    value = self.__param.resolve(bindings, used)
    if self.__op == 'in':
      return self.__prop._IN(value)
    else:
      return self.__prop._comparison(self.__op, value)
class FilterNode(Node):
  """Tree node for a single filter expression."""

  def __new__(cls, name, opsymbol, value):
    """Constructor.

    Args:
      name: The property name (a string).
      opsymbol: An operator string from _OPS.
      value: The comparison value.

    Returns:
      A FilterNode for simple operators; '!=' and 'in' are rewritten
      into equivalent DisjunctionNodes (or a FalseNode for an empty
      'in' list) since the datastore cannot run them as one filter.
    """
    if isinstance(value, model.Key):
      value = value.to_old_key()
    if opsymbol == '!=':
      # a != b  is rewritten as  (a < b) OR (a > b).
      n1 = FilterNode(name, '<', value)
      n2 = FilterNode(name, '>', value)
      return DisjunctionNode(n1, n2)
    if opsymbol == 'in':
      if not isinstance(value, (list, tuple, set, frozenset)):
        raise TypeError('in expected a list, tuple or set of values; '
                        'received %r' % value)
      nodes = [FilterNode(name, '=', v) for v in value]
      if not nodes:
        return FalseNode()
      if len(nodes) == 1:
        return nodes[0]
      return DisjunctionNode(*nodes)
    self = super(FilterNode, cls).__new__(cls)
    self.__name = name
    self.__opsymbol = opsymbol
    self.__value = value
    return self

  def __repr__(self):
    return '%s(%r, %r, %r)' % (self.__class__.__name__,
                               self.__name, self.__opsymbol, self.__value)

  def __eq__(self, other):
    if not isinstance(other, FilterNode):
      return NotImplemented
    # TODO: Should nodes with values that compare equal but have
    # different types really be considered equal?  IIUC the datastore
    # doesn't consider 1 equal to 1.0 when it compares property values.
    return (self.__name == other.__name and
            self.__opsymbol == other.__opsymbol and
            self.__value == other.__value)

  def _to_filter(self, post=False):
    if post:
      return None
    if self.__opsymbol in ('!=', 'in'):
      # __new__ rewrites these, so reaching here indicates a bug upstream.
      raise NotImplementedError('Inequality filters are not single filter '
                                'expressions and therefore cannot be converted '
                                'to a single filter (%r)' % self.__opsymbol)
    value = self.__value
    return datastore_query.make_filter(self.__name.decode('utf-8'),
                                       self.__opsymbol, value)
class PostFilterNode(Node):
  """Tree node representing an in-memory filtering operation.

  This is used to represent filters that cannot be executed by the
  datastore, for example a query for a structured value.
  """

  def __new__(cls, predicate):
    """Constructor.

    Args:
      predicate: A datastore_query.FilterPredicate applied in memory.
    """
    self = super(PostFilterNode, cls).__new__(cls)
    self.predicate = predicate
    return self

  def __repr__(self):
    return '%s(%s)' % (self.__class__.__name__, self.predicate)

  def __eq__(self, other):
    if not isinstance(other, PostFilterNode):
      return NotImplemented
    # Predicates have no general equality; fall back to identity.
    return self is other

  def _to_filter(self, post=False):
    # Only contributes a predicate during the post-filtering phase.
    if post:
      return self.predicate
    else:
      return None
class ConjunctionNode(Node):
  """Tree node representing a Boolean AND operator on two or more nodes."""

  def __new__(cls, *nodes):
    """Constructor.

    Flattens nested conjunctions and applies the distributive law to any
    DisjunctionNode arguments, so the result is either a single flat
    ConjunctionNode or a DisjunctionNode of ConjunctionNodes.

    Args:
      *nodes: One or more Node instances.

    Raises:
      TypeError: If no nodes are given or an argument is not a Node.
    """
    if not nodes:
      raise TypeError('ConjunctionNode() requires at least one node.')
    elif len(nodes) == 1:
      return nodes[0]
    clauses = [[]]  # Outer: Disjunction; inner: Conjunction.
    # TODO: Remove duplicates?
    for node in nodes:
      if not isinstance(node, Node):
        raise TypeError('ConjunctionNode() expects Node instances as arguments;'
                        ' received a non-Node instance %r' % node)
      if isinstance(node, DisjunctionNode):
        # Apply the distributive law: (X or Y) and (A or B) becomes
        # (X and A) or (X and B) or (Y and A) or (Y and B).
        new_clauses = []
        for clause in clauses:
          for subnode in node:
            new_clause = clause + [subnode]
            new_clauses.append(new_clause)
        clauses = new_clauses
      elif isinstance(node, ConjunctionNode):
        # Apply half of the distributive law: (X or Y) and A becomes
        # (X and A) or (Y and A).
        for clause in clauses:
          clause.extend(node.__nodes)
      else:
        # Ditto.
        for clause in clauses:
          clause.append(node)
    if len(clauses) > 1:
      return DisjunctionNode(*[ConjunctionNode(*clause) for clause in clauses])
    self = super(ConjunctionNode, cls).__new__(cls)
    self.__nodes = clauses[0]
    return self

  def __iter__(self):
    return iter(self.__nodes)

  def __repr__(self):
    return 'AND(%s)' % (', '.join(map(str, self.__nodes)))

  def __eq__(self, other):
    if not isinstance(other, ConjunctionNode):
      return NotImplemented
    return self.__nodes == other.__nodes

  def _to_filter(self, post=False):
    # Collect only the children belonging to the requested phase
    # (post-filters when post=True, datastore filters otherwise).
    filters = filter(None,
                     (node._to_filter(post=post)
                      for node in self.__nodes
                      if isinstance(node, PostFilterNode) == post))
    if not filters:
      return None
    if len(filters) == 1:
      return filters[0]
    return datastore_query.CompositeFilter(_AND, filters)

  def _post_filters(self):
    post_filters = [node for node in self.__nodes
                    if isinstance(node, PostFilterNode)]
    if not post_filters:
      return None
    if len(post_filters) == 1:
      return post_filters[0]
    if post_filters == self.__nodes:
      return self
    return ConjunctionNode(*post_filters)

  def resolve(self, bindings, used):
    nodes = [node.resolve(bindings, used) for node in self.__nodes]
    if nodes == self.__nodes:
      return self
    return ConjunctionNode(*nodes)
class DisjunctionNode(Node):
  """Tree node representing a Boolean OR operator on two or more nodes."""

  def __new__(cls, *nodes):
    """Constructor.

    Nested disjunctions are flattened into a single level.

    Args:
      *nodes: One or more Node instances.

    Raises:
      TypeError: If no nodes are given or an argument is not a Node.
    """
    if not nodes:
      raise TypeError('DisjunctionNode() requires at least one node')
    elif len(nodes) == 1:
      return nodes[0]
    self = super(DisjunctionNode, cls).__new__(cls)
    self.__nodes = []
    # TODO: Remove duplicates?
    for node in nodes:
      if not isinstance(node, Node):
        raise TypeError('DisjunctionNode() expects Node instances as arguments;'
                        ' received a non-Node instance %r' % node)
      if isinstance(node, DisjunctionNode):
        self.__nodes.extend(node.__nodes)
      else:
        self.__nodes.append(node)
    return self

  def __iter__(self):
    return iter(self.__nodes)

  def __repr__(self):
    return 'OR(%s)' % (', '.join(map(str, self.__nodes)))

  def __eq__(self, other):
    if not isinstance(other, DisjunctionNode):
      return NotImplemented
    return self.__nodes == other.__nodes

  def resolve(self, bindings, used):
    nodes = [node.resolve(bindings, used) for node in self.__nodes]
    if nodes == self.__nodes:
      return self
    return DisjunctionNode(*nodes)
# AND and OR are preferred aliases for these.
AND = ConjunctionNode
OR = DisjunctionNode
def _args_to_val(func, args):
  """Helper for GQL parsing to extract values from GQL expressions.

  This can extract the value from a GQL literal, return a Parameter
  for a GQL bound parameter (:1 or :foo), and interprets casts like
  KEY(...) and plain lists of values like (1, 2, 3).

  Args:
    func: A string indicating what kind of thing this is.
    args: One or more GQL values, each integer, string, or GQL literal.

  Raises:
    TypeError: If an argument has an unexpected type, or 'nop' does not
      receive exactly one value.
  """
  from .google_imports import gql  # Late import, to avoid name conflict.
  vals = []
  for arg in args:
    if isinstance(arg, (int, long, basestring)):
      val = Parameter(arg)
    elif isinstance(arg, gql.Literal):
      val = arg.Get()
    else:
      raise TypeError('Unexpected arg (%r)' % arg)
    vals.append(val)
  if func == 'nop':
    if len(vals) != 1:
      raise TypeError('"nop" requires exactly one value')
    return vals[0]  # May be a Parameter
  pfunc = ParameterizedFunction(func, vals)
  if pfunc.is_parameterized():
    return pfunc
  # No parameters anywhere: evaluate eagerly.
  return pfunc.resolve({}, {})
def _get_prop_from_modelclass(modelclass, name):
  """Helper for GQL parsing to turn a property name into a property object.

  Args:
    modelclass: The model class specified in the query.
    name: The property name.  This may contain dots which indicate
      sub-properties of structured properties.

  Returns:
    A model.Property instance.

  Raises:
    KeyError if the property doesn't exist and the model class doesn't
      derive from Expando.
    TypeError if an intermediate part of a dotted name is not a
      StructuredProperty.
  """
  if name == '__key__':
    return modelclass._key

  parts = name.split('.')
  part, more = parts[0], parts[1:]
  prop = modelclass._properties.get(part)
  if prop is None:
    if issubclass(modelclass, model.Expando):
      prop = model.GenericProperty(part)
    else:
      raise TypeError('Model %s has no property named %r' %
                      (modelclass._get_kind(), part))

  while more:
    part = more.pop(0)
    if not isinstance(prop, model.StructuredProperty):
      raise TypeError('Model %s has no property named %r' %
                      (modelclass._get_kind(), part))
    maybe = getattr(prop, part, None)
    if isinstance(maybe, model.Property) and maybe._name == part:
      prop = maybe
    else:
      maybe = prop._modelclass._properties.get(part)
      if maybe is not None:
        # Must get it this way to get the copy with the long name.
        # (See StructuredProperty.__getattr__() for details.)
        prop = getattr(prop, maybe._code_name)
      else:
        if issubclass(prop._modelclass, model.Expando) and not more:
          prop = model.GenericProperty()
          prop._name = name  # Bypass the restriction on dots.
        else:
          raise KeyError('Model %s has no property named %r' %
                         (prop._modelclass._get_kind(), part))

  return prop
class Query(object):
  """Query over NDB entities.

  Usually constructed by calling Model.query().

  See module docstring for examples.

  Note that not all operations on Queries are supported by _MultiQuery
  instances; the latter are generated as necessary when any of the
  operators !=, IN or OR is used.
  """
  @utils.positional(1)
  def __init__(self, kind=None, ancestor=None, filters=None, orders=None,
               app=None, namespace=None, default_options=None,
               projection=None, group_by=None):
    """Constructor.

    Args:
      kind: Optional kind string.
      ancestor: Optional ancestor Key.
      filters: Optional Node representing a filter expression tree.
      orders: Optional datastore_query.Order object.
      app: Optional app id.
      namespace: Optional namespace.
      default_options: Optional QueryOptions object.
      projection: Optional list or tuple of properties to project.
      group_by: Optional list or tuple of properties to group by.

    Raises:
      TypeError / ValueError: On inconsistent or mistyped arguments.
    """
    # TODO(arfuller): Accept projection=Model.key to mean keys_only.
    # TODO(arfuller): Consider adding incremental function
    # group_by_property(*args) and project(*args, distinct=False).

    if ancestor is not None:
      if isinstance(ancestor, ParameterizedThing):
        if isinstance(ancestor, ParameterizedFunction):
          if ancestor.func != 'key':
            raise TypeError('ancestor cannot be a GQL function other than KEY')
      else:
        if not isinstance(ancestor, model.Key):
          raise TypeError('ancestor must be a Key; received %r' % (ancestor,))
        if not ancestor.id():
          raise ValueError('ancestor cannot be an incomplete key')
        if app is not None:
          if app != ancestor.app():
            raise TypeError('app/ancestor mismatch')
        if namespace is None:
          namespace = ancestor.namespace()
        else:
          if namespace != ancestor.namespace():
            raise TypeError('namespace/ancestor mismatch')
    if filters is not None:
      if not isinstance(filters, Node):
        raise TypeError('filters must be a query Node or None; received %r' %
                        (filters,))
    if orders is not None:
      if not isinstance(orders, datastore_query.Order):
        raise TypeError('orders must be an Order instance or None; received %r'
                        % (orders,))
    if default_options is not None:
      if not isinstance(default_options, datastore_rpc.BaseConfiguration):
        raise TypeError('default_options must be a Configuration or None; '
                        'received %r' % (default_options,))
      if projection is not None:
        if default_options.projection is not None:
          raise TypeError('cannot use projection= and '
                          'default_options.projection at the same time')
        if default_options.keys_only is not None:
          raise TypeError('cannot use projection= and '
                          'default_options.keys_only at the same time')

    self.__kind = kind  # String.
    self.__ancestor = ancestor  # Key.
    self.__filters = filters  # None or Node subclass.
    self.__orders = orders  # None or datastore_query.Order instance.
    self.__app = app
    self.__namespace = namespace
    self.__default_options = default_options

    # Checked late as _check_properties depends on local state.
    self.__projection = None
    if projection is not None:
      if not projection:
        raise TypeError('projection argument cannot be empty')
      if not isinstance(projection, (tuple, list)):
        raise TypeError(
            'projection must be a tuple, list or None; received %r' %
            (projection,))
      self._check_properties(self._to_property_names(projection))
      self.__projection = tuple(projection)

    self.__group_by = None
    if group_by is not None:
      if not group_by:
        raise TypeError('group_by argument cannot be empty')
      if not isinstance(group_by, (tuple, list)):
        raise TypeError(
            'group_by must be a tuple, list or None; received %r' % (group_by,))
      self._check_properties(self._to_property_names(group_by))
      self.__group_by = tuple(group_by)
  def __repr__(self):
    # Show only the arguments that differ from the defaults.
    args = []
    if self.app is not None:
      args.append('app=%r' % self.app)
    if (self.namespace is not None and
        self.namespace != namespace_manager.get_namespace()):
      # Only show the namespace if set and not the current namespace.
      # (This is similar to what Key.__repr__() does.)
      args.append('namespace=%r' % self.namespace)
    if self.kind is not None:
      args.append('kind=%r' % self.kind)
    if self.ancestor is not None:
      args.append('ancestor=%r' % self.ancestor)
    if self.filters is not None:
      args.append('filters=%r' % self.filters)
    if self.orders is not None:
      # TODO: Format orders better.
      args.append('orders=...')  # PropertyOrder doesn't have a good repr().
    if self.projection:
      args.append('projection=%r' % (self._to_property_names(self.projection)))
    if self.group_by:
      args.append('group_by=%r' % (self._to_property_names(self.group_by)))
    if self.default_options is not None:
      args.append('default_options=%r' % self.default_options)
    return '%s(%s)' % (self.__class__.__name__, ', '.join(args))
  def _fix_namespace(self):
    """Internal helper to fix the namespace.

    This is called to ensure that for queries without an explicit
    namespace, the namespace used by async calls is the one in effect
    at the time the async call is made, not the one in effect when
    the request is actually generated.

    Returns:
      This query (if it has an explicit namespace) or a copy with the
      current namespace filled in.
    """
    if self.namespace is not None:
      return self
    namespace = namespace_manager.get_namespace()
    return self.__class__(kind=self.kind, ancestor=self.ancestor,
                          filters=self.filters, orders=self.orders,
                          app=self.app, namespace=namespace,
                          default_options=self.default_options,
                          projection=self.projection, group_by=self.group_by)
  def _get_query(self, connection):
    """Internal helper to construct a datastore_query.Query for this query.

    Args:
      connection: A datastore_rpc connection (used to serialize the ancestor).

    Returns:
      A datastore_query.Query (possibly an _AugmentedQuery when in-memory
      post-filters are needed).
    """
    self.bind()  # Raises an exception if there are unbound parameters.
    kind = self.kind
    ancestor = self.ancestor
    if ancestor is not None:
      ancestor = connection.adapter.key_to_pb(ancestor)
    filters = self.filters
    post_filters = None
    if filters is not None:
      post_filters = filters._post_filters()
      filters = filters._to_filter()
    # NOTE(review): the group_by handling below was reconstructed from a
    # gap in the source listing — confirm against upstream.
    group_by = None
    if self.group_by:
      group_by = self._to_property_names(self.group_by)
    dsquery = datastore_query.Query(app=self.app,
                                    namespace=self.namespace,
                                    kind=kind.decode('utf-8') if kind else None,
                                    ancestor=ancestor,
                                    filter_predicate=filters,
                                    order=self.orders,
                                    group_by=group_by)
    if post_filters is not None:
      # Wrap so the post-filter predicate runs in memory over the results.
      dsquery = datastore_query._AugmentedQuery(
          dsquery,
          in_memory_filter=post_filters._to_filter(post=True))
    return dsquery
  @tasklets.tasklet
  def run_to_queue(self, queue, conn, options=None, dsquery=None):
    """Run this query, putting entities into the given queue."""
    try:
      multiquery = self._maybe_multi_query()
      if multiquery is not None:
        # Disjunctions are executed as several sub-queries.
        yield multiquery.run_to_queue(queue, conn, options=options)
        return

      if dsquery is None:
        dsquery = self._get_query(conn)
      rpc = dsquery.run_async(conn, options)
      while rpc is not None:
        batch = yield rpc
        # Issue the next batch RPC before consuming this batch's results.
        rpc = batch.next_batch_async(options)
        for i, result in enumerate(batch.results):
          queue.putq((batch, i, result))
      queue.complete()
    except GeneratorExit:
      raise
    except Exception:
      _, e, tb = sys.exc_info()
      queue.set_exception(e, tb)
      # NOTE(review): tail of this handler falls in a gap of the source
      # listing (reconstructed) — confirm against upstream.
      raise
  @tasklets.tasklet
  def _run_to_list(self, results, options=None):
    # Internal version of run_to_queue(), without a queue.
    ctx = tasklets.get_context()
    conn = ctx._conn
    dsquery = self._get_query(conn)
    rpc = dsquery.run_async(conn, options)
    while rpc is not None:
      batch = yield rpc
      if (batch.skipped_results and
          datastore_query.FetchOptions.offset(options)):
        # Reduce the remaining offset by the results skipped in this batch.
        offset = options.offset - batch.skipped_results
        options = datastore_query.FetchOptions(offset=offset, config=options)
      rpc = batch.next_batch_async(options)
      for result in batch.results:
        result = ctx._update_cache_from_query_result(result, options)
        if result is not None:
          results.append(result)

    raise tasklets.Return(results)
973 def _needs_multi_query(self
):
974 filters
= self
.filters
975 return filters
is not None and isinstance(filters
, DisjunctionNode
)
  def _maybe_multi_query(self):
    """Return a _MultiQuery for an OR filter tree, or None otherwise."""
    if not self._needs_multi_query():
      return None
    # Switch to a _MultiQuery: one sub-query per disjunct.
    filters = self.filters
    subqueries = []
    for subfilter in filters:
      subquery = self.__class__(kind=self.kind, ancestor=self.ancestor,
                                filters=subfilter, orders=self.orders,
                                app=self.app, namespace=self.namespace,
                                default_options=self.default_options,
                                projection=self.projection,
                                group_by=self.group_by)
      subqueries.append(subquery)
    return _MultiQuery(subqueries)
995 """Accessor for the kind (a string or None)."""
1000 """Accessor for the ancestor (a Key or None)."""
1001 return self
.__ancestor
1005 """Accessor for the filters (a Node or None)."""
1006 return self
.__filters
1010 """Accessor for the filters (a datastore_query.Order or None)."""
1011 return self
.__orders
1015 """Accessor for the app (a string or None)."""
1019 def namespace(self
):
1020 """Accessor for the namespace (a string or None)."""
1021 return self
.__namespace
1024 def default_options(self
):
1025 """Accessor for the default_options (a QueryOptions instance or None)."""
1026 return self
.__default
_options
1030 """Accessor for the group by properties (a tuple instance or None)."""
1031 return self
.__group
_by
1034 def projection(self
):
1035 """Accessor for the projected properties (a tuple instance or None)."""
1036 return self
.__projection
1039 def is_distinct(self
):
1040 """True if results are guaranteed to contain a unique set of property
1043 This happens when every property in the group_by is also in the projection.
1045 return bool(self
.__group
_by
and
1046 set(self
._to
_property
_names
(self
.__group
_by
)) <=
1047 set(self
._to
_property
_names
(self
.__projection
)))
  def filter(self, *args):
    """Return a new Query with additional filter(s) applied.

    Args:
      *args: Zero or more Node instances to AND with the current filters.

    Returns:
      A new Query (or self when no arguments are given).

    Raises:
      TypeError: If an argument is not a Node.
    """
    if not args:
      return self
    preds = []
    f = self.filters
    if f:
      preds.append(f)
    for arg in args:
      if not isinstance(arg, Node):
        raise TypeError('Cannot filter a non-Node argument; received %r' % arg)
      preds.append(arg)
    if not preds:
      pred = None
    elif len(preds) == 1:
      pred = preds[0]
    else:
      pred = ConjunctionNode(*preds)
    return self.__class__(kind=self.kind, ancestor=self.ancestor,
                          filters=pred, orders=self.orders,
                          app=self.app, namespace=self.namespace,
                          default_options=self.default_options,
                          projection=self.projection, group_by=self.group_by)
  def order(self, *args):
    """Return a new Query with additional sort order(s) applied.

    Args:
      *args: Zero or more model.Property (ascending) or
        datastore_query.Order instances.

    Returns:
      A new Query (or self when no arguments are given).

    Raises:
      TypeError: If an argument is neither a Property nor an Order.
    """
    # q.order(Employee.name, -Employee.age)
    if not args:
      return self
    orders = []
    o = self.orders
    if o:
      orders.append(o)
    for arg in args:
      if isinstance(arg, model.Property):
        orders.append(datastore_query.PropertyOrder(arg._name, _ASC))
      elif isinstance(arg, datastore_query.Order):
        orders.append(arg)
      else:
        raise TypeError('order() expects a Property or query Order; '
                        'received %r' % arg)
    if not orders:
      orders = None
    elif len(orders) == 1:
      orders = orders[0]
    else:
      orders = datastore_query.CompositeOrder(orders)
    return self.__class__(kind=self.kind, ancestor=self.ancestor,
                          filters=self.filters, orders=orders,
                          app=self.app, namespace=self.namespace,
                          default_options=self.default_options,
                          projection=self.projection, group_by=self.group_by)
1102 # Datastore API using the default context.
1104 def iter(self
, **q_options
):
1105 """Construct an iterator over the query.
1108 **q_options: All query options keyword arguments are supported.
1111 A QueryIterator object.
1113 self
.bind() # Raises an exception if there are unbound parameters.
1114 return QueryIterator(self
, **q_options
)
1118 @utils.positional(2)
1119 def map(self
, callback
, pass_batch_into_callback
=None,
1120 merge_future
=None, **q_options
):
1121 """Map a callback function or tasklet over the query results.
1124 callback: A function or tasklet to be applied to each result; see below.
1125 merge_future: Optional Future subclass; see below.
1126 **q_options: All query options keyword arguments are supported.
1128 Callback signature: The callback is normally called with an entity
1129 as argument. However if keys_only=True is given, it is called
1130 with a Key. Also, when pass_batch_into_callback is True, it is
1131 called with three arguments: the current batch, the index within
1132 the batch, and the entity or Key at that index. The callback can
1133 return whatever it wants. If the callback is None, a trivial
1134 callback is assumed that just returns the entity or key passed in
1135 (ignoring produce_cursors).
1137 Optional merge future: The merge_future is an advanced argument
1138 that can be used to override how the callback results are combined
1139 into the overall map() return value. By default a list of
1140 callback return values is produced. By substituting one of a
1141 small number of specialized alternatives you can arrange
1142 otherwise. See tasklets.MultiFuture for the default
1143 implementation and a description of the protocol the merge_future
1144 object must implement the default. Alternatives from the same
1145 module include QueueFuture, SerialQueueFuture and ReducingFuture.
1148 When the query has run to completion and all callbacks have
1149 returned, map() returns a list of the results of all callbacks.
1150 (But see 'optional merge future' above.)
1152 return self
.map_async(callback
,
1153 pass_batch_into_callback
=pass_batch_into_callback
,
1154 merge_future
=merge_future
,
1155 **q_options
).get_result()
1157 @utils.positional(2)
1158 def map_async(self
, callback
, pass_batch_into_callback
=None,
1159 merge_future
=None, **q_options
):
1160 """Map a callback function or tasklet over the query results.
1162 This is the asynchronous version of Query.map().
1164 qry
= self
._fix
_namespace
()
1165 return tasklets
.get_context().map_query(
1168 pass_batch_into_callback
=pass_batch_into_callback
,
1169 options
=self
._make
_options
(q_options
),
1170 merge_future
=merge_future
)
1172 @utils.positional(2)
1173 def fetch(self
, limit
=None, **q_options
):
1174 """Fetch a list of query results, up to a limit.
1177 limit: How many results to retrieve at most.
1178 **q_options: All query options keyword arguments are supported.
1183 return self
.fetch_async(limit
, **q_options
).get_result()
1185 @utils.positional(2)
1186 def fetch_async(self
, limit
=None, **q_options
):
1187 """Fetch a list of query results, up to a limit.
1189 This is the asynchronous version of Query.fetch().
1192 default_options
= self
._make
_options
(q_options
)
1193 if default_options
is not None and default_options
.limit
is not None:
1194 limit
= default_options
.limit
1197 q_options
['limit'] = limit
1198 q_options
.setdefault('batch_size', limit
)
1199 if self
._needs
_multi
_query
():
1200 return self
.map_async(None, **q_options
)
1201 # Optimization using direct batches.
1202 options
= self
._make
_options
(q_options
)
1203 qry
= self
._fix
_namespace
()
1204 return qry
._run
_to
_list
([], options
=options
)
1206 def get(self
, **q_options
):
1207 """Get the first query result, if any.
1209 This is similar to calling q.fetch(1) and returning the first item
1210 of the list of results, if any, otherwise None.
1213 **q_options: All query options keyword arguments are supported.
1216 A single result, or None if there are no results.
1218 return self
.get_async(**q_options
).get_result()
1220 def get_async(self
, **q_options
):
1221 """Get the first query result, if any.
1223 This is the asynchronous version of Query.get().
1225 qry
= self
._fix
_namespace
()
1226 return qry
._get
_async
(**q_options
)
1229 def _get_async(self
, **q_options
):
1230 """Internal version of get_async()."""
1231 res
= yield self
.fetch_async(1, **q_options
)
1233 raise tasklets
.Return(None)
1234 raise tasklets
.Return(res
[0])
1236 @utils.positional(2)
1237 def count(self
, limit
=None, **q_options
):
1238 """Count the number of query results, up to a limit.
1240 This returns the same result as len(q.fetch(limit)) but more
1243 Note that you must pass a maximum value to limit the amount of
1244 work done by the query.
1247 limit: How many results to count at most.
1248 **q_options: All query options keyword arguments are supported.
1252 return self
.count_async(limit
, **q_options
).get_result()
1254 @utils.positional(2)
1255 def count_async(self
, limit
=None, **q_options
):
1256 """Count the number of query results, up to a limit.
1258 This is the asynchronous version of Query.count().
1260 qry
= self
._fix
_namespace
()
1261 return qry
._count
_async
(limit
=limit
, **q_options
)
1264 def _count_async(self
, limit
=None, **q_options
):
1265 """Internal version of count_async()."""
1266 # TODO: Support offset by incorporating it to the limit.
1267 if 'offset' in q_options
:
1268 raise NotImplementedError('.count() and .count_async() do not support '
1269 'offsets at present.')
1270 if 'limit' in q_options
:
1271 raise TypeError('Cannot specify limit as a non-keyword argument and as a '
1272 'keyword argument simultaneously.')
1275 if self
._needs
_multi
_query
():
1276 # _MultiQuery does not support iterating over result batches,
1277 # so just fetch results and count them.
1278 # TODO: Use QueryIterator to avoid materializing the results list.
1279 q_options
.setdefault('batch_size', limit
)
1280 q_options
.setdefault('keys_only', True)
1281 results
= yield self
.fetch_async(limit
, **q_options
)
1282 raise tasklets
.Return(len(results
))
1284 # Issue a special query requesting 0 results at a given offset.
1285 # The skipped_results count will tell us how many hits there were
1286 # before that offset without fetching the items.
1287 q_options
['offset'] = limit
1288 q_options
['limit'] = 0
1289 options
= self
._make
_options
(q_options
)
1290 conn
= tasklets
.get_context()._conn
1291 dsquery
= self
._get
_query
(conn
)
1292 rpc
= dsquery
.run_async(conn
, options
)
1294 while rpc
is not None:
1296 rpc
= batch
.next_batch_async(options
)
1297 total
+= batch
.skipped_results
1298 raise tasklets
.Return(total
)
1300 @utils.positional(2)
1301 def fetch_page(self
, page_size
, **q_options
):
1302 """Fetch a page of results.
1304 This is a specialized method for use by paging user interfaces.
1307 page_size: The requested page size. At most this many results
1310 In addition, any keyword argument supported by the QueryOptions
1311 class is supported. In particular, to fetch the next page, you
1312 pass the cursor returned by one call to the next call using
1313 start_cursor=<cursor>. A common idiom is to pass the cursor to
1314 the client using <cursor>.to_websafe_string() and to reconstruct
1315 that cursor on a subsequent request using
1316 Cursor.from_websafe_string(<string>).
1319 A tuple (results, cursor, more) where results is a list of query
1320 results, cursor is a cursor pointing just after the last result
1321 returned, and more is a bool indicating whether there are
1322 (likely) more results after that.
1324 # NOTE: page_size can't be passed as a keyword.
1325 return self
.fetch_page_async(page_size
, **q_options
).get_result()
1327 @utils.positional(2)
1328 def fetch_page_async(self
, page_size
, **q_options
):
1329 """Fetch a page of results.
1331 This is the asynchronous version of Query.fetch_page().
1333 qry
= self
._fix
_namespace
()
1334 return qry
._fetch
_page
_async
(page_size
, **q_options
)
1337 def _fetch_page_async(self
, page_size
, **q_options
):
1338 """Internal version of fetch_page_async()."""
1339 q_options
.setdefault('batch_size', page_size
)
1340 q_options
.setdefault('produce_cursors', True)
1341 it
= self
.iter(limit
=page_size
+ 1, **q_options
)
1343 while (yield it
.has_next_async()):
1344 results
.append(it
.next())
1345 if len(results
) >= page_size
:
1348 cursor
= it
.cursor_after()
1349 except datastore_errors
.BadArgumentError
:
1351 raise tasklets
.Return(results
, cursor
, it
.probably_has_next())
1353 def _make_options(self
, q_options
):
1354 """Helper to construct a QueryOptions object from keyword arguents.
1357 q_options: a dict of keyword arguments.
1359 Note that either 'options' or 'config' can be used to pass another
1360 QueryOptions object, but not both. If another QueryOptions object is
1361 given it provides default values.
1363 If self.default_options is set, it is used to provide defaults,
1364 which have a lower precedence than options set in q_options.
1367 A QueryOptions object, or None if q_options is empty.
1370 return self
.default_options
1371 if 'options' in q_options
:
1372 # Move 'options' to 'config' since that is what QueryOptions() uses.
1373 if 'config' in q_options
:
1374 raise TypeError('You cannot use config= and options= at the same time')
1375 q_options
['config'] = q_options
.pop('options')
1376 if q_options
.get('projection'):
1378 q_options
['projection'] = self
._to
_property
_names
(
1379 q_options
['projection'])
1380 except TypeError, e
:
1381 raise datastore_errors
.BadArgumentError(e
)
1382 self
._check
_properties
(q_options
['projection'])
1383 options
= QueryOptions(**q_options
)
1385 # Populate projection if it hasn't been overridden.
1386 if (options
.keys_only
is None and
1387 options
.projection
is None and
1389 options
= QueryOptions(
1390 projection
=self
._to
_property
_names
(self
.__projection
), config
=options
)
1391 # Populate default options
1392 if self
.default_options
is not None:
1393 options
= self
.default_options
.merge(options
)
1397 def _to_property_names(self
, properties
):
1398 if not isinstance(properties
, (list, tuple)):
1399 properties
= [properties
] # It will be type-checked below.
1401 for proj
in properties
:
1402 if isinstance(proj
, basestring
):
1404 elif isinstance(proj
, model
.Property
):
1405 fixed
.append(proj
._name
)
1408 'Unexpected property (%r); should be string or Property' % (proj
,))
1411 def _check_properties(self
, fixed
, **kwargs
):
1412 modelclass
= model
.Model
._kind
_map
.get(self
.__kind
)
1413 if modelclass
is not None:
1414 modelclass
._check
_properties
(fixed
, **kwargs
)
1417 """Return a list giving the parameters required by a query."""
1418 class MockBindings(dict):
1419 def __contains__(self
, key
):
1422 bindings
= MockBindings()
1424 ancestor
= self
.ancestor
1425 if isinstance(ancestor
, ParameterizedThing
):
1426 ancestor
= ancestor
.resolve(bindings
, used
)
1427 filters
= self
.filters
1428 if filters
is not None:
1429 filters
= filters
.resolve(bindings
, used
)
1430 return sorted(used
) # Returns only the keys.
1432 def bind(self
, *args
, **kwds
):
1433 """Bind parameter values. Returns a new Query object."""
1434 return self
._bind
(args
, kwds
)
1436 def _bind(self
, args
, kwds
):
1437 """Bind parameter values. Returns a new Query object."""
1438 bindings
= dict(kwds
)
1439 for i
, arg
in enumerate(args
):
1440 bindings
[i
+ 1] = arg
1442 ancestor
= self
.ancestor
1443 if isinstance(ancestor
, ParameterizedThing
):
1444 ancestor
= ancestor
.resolve(bindings
, used
)
1445 filters
= self
.filters
1446 if filters
is not None:
1447 filters
= filters
.resolve(bindings
, used
)
1449 for i
in xrange(1, 1 + len(args
)):
1453 raise datastore_errors
.BadArgumentError(
1454 'Positional arguments %s were given but not used.' %
1455 ', '.join(str(i
) for i
in unused
))
1456 return self
.__class
__(kind
=self
.kind
, ancestor
=ancestor
,
1457 filters
=filters
, orders
=self
.orders
,
1458 app
=self
.app
, namespace
=self
.namespace
,
1459 default_options
=self
.default_options
,
1460 projection
=self
.projection
, group_by
=self
.group_by
)
def gql(query_string, *args, **kwds):
  """Parse a GQL query string.

  Args:
    query_string: Full GQL query, e.g. 'SELECT * FROM Kind WHERE prop = 1'.
    *args, **kwds: If present, used to call bind().

  Returns:
    An instance of query_class.
  """
  qry = _gql(query_string)
  if args or kwds:
    qry = qry._bind(args, kwds)
  return qry
@utils.positional(1)
def _gql(query_string, query_class=Query):
  """Parse a GQL query string (internal version).

  Args:
    query_string: Full GQL query, e.g. 'SELECT * FROM Kind WHERE prop = 1'.
    query_class: Optional class to use, default Query.

  Returns:
    An instance of query_class.
  """
  from .google_imports import gql  # Late import, to avoid name conflict.
  gql_qry = gql.GQL(query_string)
  kind = gql_qry.kind()
  if kind is None:
    # The query must be lacking a "FROM <kind>" class.  Let Expando
    # stand in for the model class (it won't actually be used to
    # construct the results).
    modelclass = model.Expando
  else:
    modelclass = model.Model._kind_map.get(kind)
    if modelclass is None:
      # If the Adapter has a default model, use it; raise KindError otherwise.
      ctx = tasklets.get_context()
      modelclass = ctx._conn.adapter.default_model
      if modelclass is None:
        raise model.KindError(
            "No model class found for kind %r. Did you forget to import it?" %
            (kind,))
    else:
      # Adjust kind to the model class's kind (for PolyModel).
      kind = modelclass._get_kind()
  ancestor = None
  flt = gql_qry.filters()
  filters = list(modelclass._default_filters())
  for name_op in sorted(flt):
    name, op = name_op
    values = flt[name_op]
    op = op.lower()
    if op == 'is' and name == gql.GQL._GQL__ANCESTOR:
      if len(values) != 1:
        raise ValueError('"is" requires exactly one value')
      [(func, args)] = values
      ancestor = _args_to_val(func, args)
      continue
    if op not in _OPS:
      raise NotImplementedError('Operation %r is not supported.' % op)
    for (func, args) in values:
      val = _args_to_val(func, args)
      prop = _get_prop_from_modelclass(modelclass, name)
      if prop._name != name:
        raise RuntimeError('Whoa! _get_prop_from_modelclass(%s, %r) '
                           'returned a property whose name is %r?!' %
                           (modelclass.__name__, name, prop._name))
      if isinstance(val, ParameterizedThing):
        node = ParameterNode(prop, op, val)
      elif op == 'in':
        node = prop._IN(val)
      else:
        node = prop._comparison(op, val)
      filters.append(node)
  if filters:
    filters = ConjunctionNode(*filters)
  else:
    filters = None
  orders = _orderings_to_orders(gql_qry.orderings(), modelclass)
  offset = gql_qry.offset()
  limit = gql_qry.limit()
  if limit < 0:
    limit = None
  keys_only = gql_qry._keys_only
  if not keys_only:
    keys_only = None
  options = QueryOptions(offset=offset, limit=limit, keys_only=keys_only)
  projection = gql_qry.projection()
  if gql_qry.is_distinct():
    group_by = projection
  else:
    group_by = None
  qry = query_class(kind=kind,
                    ancestor=ancestor,
                    filters=filters,
                    orders=orders,
                    default_options=options,
                    projection=projection,
                    group_by=group_by)
  return qry
class QueryIterator(object):
  """This iterator works both for synchronous and async callers!

  For synchronous callers, just use:

    for entity in Account.query():
      <use entity>

  Async callers use this idiom:

    it = iter(Account.query())
    while (yield it.has_next_async()):
      entity = it.next()
      <use entity>

  You can also use q.iter([options]) instead of iter(q); this allows
  passing query options such as keys_only or produce_cursors.

  When keys_only is set, it.next() returns a key instead of an entity.

  When produce_cursors is set, the methods it.cursor_before() and
  it.cursor_after() return Cursor objects corresponding to the query
  position just before and after the item returned by it.next().
  Before it.next() is called for the first time, both raise an
  exception.  Once the loop is exhausted, both return the cursor after
  the last item returned.  Calling it.has_next() does not affect the
  cursors; you must call it.next() before the cursors move.  Note that
  sometimes requesting a cursor requires a datastore roundtrip (but
  not if you happen to request a cursor corresponding to a batch
  boundary).  If produce_cursors is not set, both methods always raise
  an exception.

  Note that queries requiring in-memory merging of multiple queries
  (i.e. queries using the IN, != or OR operators) do not support query
  options.
  """

  # When produce_cursors is set, _lookahead collects (batch, index)
  # pairs passed to _extended_callback(), and (_batch, _index)
  # contain the info pertaining to the current item.
  _lookahead = None
  _batch = None
  _index = None

  # Indicate the loop is exhausted.
  _exhausted = False

  @utils.positional(2)
  def __init__(self, query, **q_options):
    """Constructor.  Takes a Query and query options.

    This is normally called by Query.iter() or Query.__iter__().
    """
    ctx = tasklets.get_context()
    options = query._make_options(q_options)
    callback = self._extended_callback
    self._iter = ctx.iter_query(query,
                                callback=callback,
                                pass_batch_into_callback=True,
                                options=options)
    self._fut = None

  def _extended_callback(self, batch, index, ent):
    """Record (batch, index) for cursor support and pass the entity through."""
    if self._exhausted:
      raise RuntimeError('QueryIterator is already exhausted')
    # TODO: Make _lookup a deque.
    if self._lookahead is None:
      self._lookahead = []
    self._lookahead.append((batch, index))
    return ent

  def _consume_item(self):
    """Shift the oldest lookahead pair into (_batch, _index)."""
    if self._lookahead:
      self._batch, self._index = self._lookahead.pop(0)
    else:
      self._batch = self._index = None

  def cursor_before(self):
    """Return the cursor before the current item.

    You must pass a QueryOptions object with produce_cursors=True
    for this to work.

    If there is no cursor or no current item, raise BadArgumentError.
    Before next() has returned there is no cursor.  Once the loop is
    exhausted, this returns the cursor after the last item.
    """
    if self._batch is None:
      raise datastore_errors.BadArgumentError('There is no cursor currently')
    # TODO: if cursor_after() was called for the previous item
    # reuse that result instead of computing it from scratch.
    # (Some cursor() calls make a datastore roundtrip.)
    # TODO: reimplement the cursor() call to use NDB async I/O;
    # perhaps even add async versions of cursor_before/after.
    return self._batch.cursor(self._index + self._exhausted)

  def cursor_after(self):
    """Return the cursor after the current item.

    You must pass a QueryOptions object with produce_cursors=True
    for this to work.

    If there is no cursor or no current item, raise BadArgumentError.
    Before next() has returned there is no cursor.  Once the loop is
    exhausted, this returns the cursor after the last item.
    """
    if self._batch is None:
      raise datastore_errors.BadArgumentError('There is no cursor currently')
    return self._batch.cursor(self._index + 1)  # TODO: inline this as async.

  def index_list(self):
    """Return the list of indexes used for this query.

    This returns a list of index representations, where an index
    representation is the same as what is returned by get_indexes().

    Before the first result, the information is unavailable, and then
    None is returned.  This is not the same as an empty list -- the
    empty list means that no index was used to execute the query.  (In
    the dev_appserver, an empty list may also mean that only built-in
    indexes were used; metadata queries also return an empty list
    here.)

    Proper use is as follows:
      q = <modelclass>.query(<filters>)
      i = q.iter()
      try:
        i.next()
      except StopIteration:
        pass
      indexes = i.index_list()
      assert isinstance(indexes, list)

    Notes:
    - Forcing produce_cursors=False makes this always return None.
    - This always returns None for a multi-query.
    """
    # TODO: Technically it is possible to implement this for
    # multi-query by merging all the index lists from each subquery.
    # Return None if the batch has no attribute index_list.
    # This also applies when the batch itself is None.
    return getattr(self._batch, 'index_list', None)

  def __iter__(self):
    """Iterator protocol: get the iterator for this iterator, i.e. self."""
    return self

  def probably_has_next(self):
    """Return whether a next item is (probably) available.

    This is not quite the same as has_next(), because when
    produce_cursors is set, some shortcuts are possible.  However, in
    some cases (e.g. when the query has a post_filter) we can get a
    false positive (returns True but next() will raise StopIteration).
    There are no false negatives, if Batch.more_results doesn't lie.
    """
    if self._lookahead:
      return True
    if self._batch is not None:
      return self._batch.more_results
    return self.has_next()

  def has_next(self):
    """Return whether a next item is available.

    See the module docstring for the usage pattern.
    """
    return self.has_next_async().get_result()

  @tasklets.tasklet
  def has_next_async(self):
    """Return a Future whose result will say whether a next item is available.

    See the module docstring for the usage pattern.
    """
    if self._fut is None:
      self._fut = self._iter.getq()
    flag = True
    try:
      yield self._fut
    except EOFError:
      flag = False
    raise tasklets.Return(flag)

  def next(self):
    """Iterator protocol: get next item or raise StopIteration."""
    if self._fut is None:
      self._fut = self._iter.getq()
    try:
      try:
        ent = self._fut.get_result()
        self._consume_item()
        return ent
      except EOFError:
        self._exhausted = True
        raise StopIteration
    finally:
      self._fut = None
class _SubQueryIteratorState(object):
  """Helper class for _MultiQuery.

  Holds one subquery's current (batch, index, entity) result together
  with the iterator that produces more results, so that instances can
  be kept on a heap ordered by the shared sort orders.
  """

  def __init__(self, batch_i_entity, iterator, dsquery, orders):
    batch, index, entity = batch_i_entity
    self.batch = batch
    self.index = index
    self.entity = entity
    self.iterator = iterator
    self.dsquery = dsquery
    self.orders = orders

  def __cmp__(self, other):
    """Compare two states by their current entities under self.orders."""
    if not isinstance(other, _SubQueryIteratorState):
      raise NotImplementedError('Can only compare _SubQueryIteratorState '
                                'instances to other _SubQueryIteratorState '
                                'instances; not %r' % other)
    if not self.orders == other.orders:
      raise NotImplementedError('Cannot compare _SubQueryIteratorStates with '
                                'differing orders (%r != %r)' %
                                (self.orders, other.orders))
    lhs = self.entity._orig_pb
    rhs = other.entity._orig_pb
    lhs_filter = self.dsquery._filter_predicate
    rhs_filter = other.dsquery._filter_predicate
    names = self.orders._get_prop_names()
    # TODO: In some future version, there won't be a need to add the
    # filters' property names here.
    if lhs_filter is not None:
      names |= lhs_filter._get_prop_names()
    if rhs_filter is not None:
      names |= rhs_filter._get_prop_names()
    lhs_value_map = datastore_query._make_key_value_map(lhs, names)
    rhs_value_map = datastore_query._make_key_value_map(rhs, names)
    if lhs_filter is not None:
      lhs_filter._prune(lhs_value_map)
    if rhs_filter is not None:
      rhs_filter._prune(rhs_value_map)
    return self.orders._cmp(lhs_value_map, rhs_value_map)
class _MultiQuery(object):
  """Helper class to run queries involving !=, IN or OR operators."""

  # This is not instantiated by the user directly, but implicitly when
  # iterating over a query with at least one filter using an IN, OR or
  # != operator.  Note that some options must be interpreted by
  # _MultiQuery instead of passed to the underlying Queries' methods,
  # e.g. offset (though not necessarily limit, and I'm not sure about
  # others).

  # TODO: Need a way to specify the unification of two queries that
  # are identical except one has an ancestor and the other doesn't.
  # The HR datastore makes that a useful special case.

  def __init__(self, subqueries):
    if not isinstance(subqueries, list):
      raise TypeError('subqueries must be a list; received %r' % subqueries)
    for subq in subqueries:
      if not isinstance(subq, Query):
        raise TypeError('Each subquery must be a Query instances; received %r'
                        % subq)
    first_subquery = subqueries[0]
    kind = first_subquery.kind
    orders = first_subquery.orders
    if kind is None:
      raise ValueError('Subquery kind cannot be missing')
    # All subqueries must agree on kind and sort orders.
    for subq in subqueries[1:]:
      if subq.kind != kind:
        raise ValueError('Subqueries must be for a common kind (%s != %s)' %
                         (subq.kind, kind))
      elif subq.orders != orders:
        raise ValueError('Subqueries must have the same order(s) (%s != %s)' %
                         (subq.orders, orders))
    # TODO: Ensure that app and namespace match, when we support them.
    self.__subqueries = subqueries
    self.__orders = orders
    self.ancestor = None  # Hack for map_query().

  def _make_options(self, q_options):
    """Mirror of Query._make_options(), for duck typing."""
    return self.__subqueries[0].default_options

  @property
  def orders(self):
    """The sort orders shared by all subqueries."""
    return self.__orders

  @property
  def default_options(self):
    """The default options (taken from the first subquery)."""
    return self.__subqueries[0].default_options

  @tasklets.tasklet
  def run_to_queue(self, queue, conn, options=None):
    """Run this query, putting entities into the given queue."""
    if options is None:
      offset = None
      limit = None
      keys_only = None
    else:
      # Capture options we need to simulate.
      offset = options.offset
      limit = options.limit
      keys_only = options.keys_only

      # Cursors are supported for certain orders only.
      if (options.start_cursor or options.end_cursor or
          options.produce_cursors):
        names = set()
        if self.__orders is not None:
          names = self.__orders._get_prop_names()
        if '__key__' not in names:
          raise datastore_errors.BadArgumentError(
              '_MultiQuery with cursors requires __key__ order')

    # Decide if we need to modify the options passed to subqueries.
    # NOTE: It would seem we can sometimes let the datastore handle
    # the offset natively, but this would thwart the duplicate key
    # detection, so we always have to emulate the offset here.
    # We can set the limit we pass along to offset + limit though,
    # since that is the maximum number of results from a single
    # subquery we will ever have to consider.
    modifiers = {}
    if offset:
      modifiers['offset'] = None
      if limit is not None:
        modifiers['limit'] = min(_MAX_LIMIT, offset + limit)
    if keys_only and self.__orders is not None:
      modifiers['keys_only'] = None
    if modifiers:
      options = QueryOptions(config=options, **modifiers)

    if offset is None:
      offset = 0
    if limit is None:
      limit = _MAX_LIMIT

    if self.__orders is None:
      # Run the subqueries sequentially; there is no order to keep.
      keys_seen = set()
      for subq in self.__subqueries:
        if limit <= 0:
          break
        subit = tasklets.SerialQueueFuture('_MultiQuery.run_to_queue[ser]')
        subq.run_to_queue(subit, conn, options=options)
        while limit > 0:
          try:
            batch, index, result = yield subit.getq()
          except EOFError:
            break
          if keys_only:
            key = result
          else:
            key = result._key
          if key not in keys_seen:
            keys_seen.add(key)
            if offset > 0:
              offset -= 1
            else:
              limit -= 1
              queue.putq((None, None, result))
      queue.complete()
      return

    # This with-statement causes the adapter to set _orig_pb on all
    # entities it converts from protobuf.
    # TODO: Does this interact properly with the cache?
    with conn.adapter:
      # Start running all the sub-queries.
      todo = []  # List of (subit, dsquery) tuples.
      for subq in self.__subqueries:
        dsquery = subq._get_query(conn)
        subit = tasklets.SerialQueueFuture('_MultiQuery.run_to_queue[par]')
        subq.run_to_queue(subit, conn, options=options, dsquery=dsquery)
        todo.append((subit, dsquery))

      # Create a list of (first-entity, subquery-iterator) tuples.
      state = []  # List of _SubQueryIteratorState instances.
      for subit, dsquery in todo:
        try:
          thing = yield subit.getq()
        except EOFError:
          continue
        else:
          state.append(_SubQueryIteratorState(thing, subit, dsquery,
                                              self.__orders))

      # Now turn it into a sorted heap.  The heapq module claims that
      # calling heapify() is more efficient than calling heappush() for
      # each item.
      heapq.heapify(state)

      # Repeatedly yield the lowest entity from the state vector,
      # filtering duplicates.  This is essentially a multi-way merge
      # sort.  One would think it should be possible to filter
      # duplicates simply by dropping other entities already in the
      # state vector that are equal to the lowest entity, but because of
      # the weird sorting of repeated properties, we have to explicitly
      # keep a set of all keys, so we can remove later occurrences.
      # Note that entities will still be sorted correctly, within the
      # constraints given by the sort order.
      keys_seen = set()
      while state and limit > 0:
        item = heapq.heappop(state)
        batch = item.batch
        index = item.index
        entity = item.entity
        key = entity._key
        if key not in keys_seen:
          keys_seen.add(key)
          if offset > 0:
            offset -= 1
          else:
            limit -= 1
            if keys_only:
              queue.putq((batch, index, key))
            else:
              queue.putq((batch, index, entity))
        subit = item.iterator
        try:
          batch, index, entity = yield subit.getq()
        except EOFError:
          pass
        else:
          item.batch = batch
          item.index = index
          item.entity = entity
          heapq.heappush(state, item)
    queue.complete()

  # Datastore API using the default context.

  def iter(self, **q_options):
    """Construct a QueryIterator over this multi-query."""
    return QueryIterator(self, **q_options)

  __iter__ = iter
2006 # TODO: Add fetch() etc.?
2009 # Helper functions to convert between orders and orderings. An order
2010 # is a datastore_query.Order instance. An ordering is a
2011 # (property_name, direction) tuple.
def _order_to_ordering(order):
  """Convert a datastore_query.Order to a (property_name, direction) tuple."""
  pb = order._to_pb()
  return pb.property(), pb.direction()  # TODO: What about UTF-8?
def _orders_to_orderings(orders):
  """Convert an Order (simple or composite, or None) to a list of orderings."""
  if orders is None:
    return []
  if isinstance(orders, datastore_query.PropertyOrder):
    return [_order_to_ordering(orders)]
  if isinstance(orders, datastore_query.CompositeOrder):
    # TODO: What about UTF-8?
    return [(pb.property(), pb.direction()) for pb in orders._to_pbs()]
  raise ValueError('Bad order: %r' % (orders,))
def _ordering_to_order(ordering, modelclass):
  """Convert a (property_name, direction) tuple to a PropertyOrder.

  The property name is validated against modelclass.
  """
  name, direction = ordering
  prop = _get_prop_from_modelclass(modelclass, name)
  if prop._name != name:
    raise RuntimeError('Whoa! _get_prop_from_modelclass(%s, %r) '
                       'returned a property whose name is %r?!' %
                       (modelclass.__name__, name, prop._name))
  return datastore_query.PropertyOrder(name, direction)
def _orderings_to_orders(orderings, modelclass):
  """Convert a list of orderings to an Order (simple or composite, or None)."""
  orders = [_ordering_to_order(o, modelclass) for o in orderings]
  if not orders:
    return None
  if len(orders) == 1:
    return orders[0]
  return datastore_query.CompositeOrder(orders)