App Engine Python SDK version 1.8.1
[gae.git] / python / google / appengine / ext / ndb / query.py
blob7dcaf7e0f19eea7d8def08c640fc7cc47603c019
1 """Higher-level Query wrapper.
3 There are perhaps too many query APIs in the world.
5 The fundamental API here overloads the 6 comparisons operators to
6 represent filters on property values, and supports AND and OR
7 operations (implemented as functions -- Python's 'and' and 'or'
8 operators cannot be overloaded, and the '&' and '|' operators have a
9 priority that conflicts with the priority of comparison operators).
10 For example:
12 class Employee(Model):
13 name = StringProperty()
14 age = IntegerProperty()
15 rank = IntegerProperty()
17 @classmethod
18 def demographic(cls, min_age, max_age):
19 return cls.query().filter(AND(cls.age >= min_age, cls.age <= max_age))
21 @classmethod
22 def ranked(cls, rank):
23 return cls.query(cls.rank == rank).order(cls.age)
  for emp in Employee.demographic(42, 56):
    print emp.name, emp.age, emp.rank
28 The 'in' operator cannot be overloaded, but is supported through the
29 IN() method. For example:
31 Employee.query().filter(Employee.rank.IN([4, 5, 6]))
33 Sort orders are supported through the order() method; unary minus is
34 overloaded on the Property class to represent a descending order:
36 Employee.query().order(Employee.name, -Employee.age)
38 Besides using AND() and OR(), filters can also be combined by
39 repeatedly calling .filter():
41 q1 = Employee.query() # A query that returns all employees
42 q2 = q1.filter(Employee.age >= 30) # Only those over 30
43 q3 = q2.filter(Employee.age < 40) # Only those in their 30s
45 A further shortcut is calling .filter() with multiple arguments; this
46 implies AND():
48 q1 = Employee.query() # A query that returns all employees
49 q3 = q1.filter(Employee.age >= 30,
50 Employee.age < 40) # Only those in their 30s
52 And finally you can also pass one or more filter expressions directly
53 to the .query() method:
55 q3 = Employee.query(Employee.age >= 30,
56 Employee.age < 40) # Only those in their 30s
58 Query objects are immutable, so these methods always return a new
59 Query object; the above calls to filter() do not affect q1. (On the
60 other hand, operations that are effectively no-ops may return the
61 original Query object.)
63 Sort orders can also be combined this way, and .filter() and .order()
64 calls may be intermixed:
66 q4 = q3.order(-Employee.age)
67 q5 = q4.order(Employee.name)
68 q6 = q5.filter(Employee.rank == 5)
70 Again, multiple .order() calls can be combined:
72 q5 = q3.order(-Employee.age, Employee.name)
74 The simplest way to retrieve Query results is a for-loop:
76 for emp in q3:
77 print emp.name, emp.age
79 Some other methods to run a query and access its results:
81 q.iter() # Return an iterator; same as iter(q) but more flexible
82 q.map(callback) # Call the callback function for each query result
83 q.fetch(N) # Return a list of the first N results
84 q.get() # Return the first result
85 q.count(N) # Return the number of results, with a maximum of N
86 q.fetch_page(N, start_cursor=cursor) # Return (results, cursor, has_more)
88 All of the above methods take a standard set of additional query
89 options, either in the form of keyword arguments such as
90 keys_only=True, or as QueryOptions object passed with
91 options=QueryOptions(...). The most important query options are:
93 keys_only: bool, if set the results are keys instead of entities
94 limit: int, limits the number of results returned
95 offset: int, skips this many results first
96 start_cursor: Cursor, start returning results after this position
97 end_cursor: Cursor, stop returning results after this position
98 batch_size: int, hint for the number of results returned per RPC
99 prefetch_size: int, hint for the number of results in the first RPC
100 produce_cursors: bool, return Cursor objects with the results
102 For additional (obscure) query options and more details on them,
103 including an explanation of Cursors, see datastore_query.py.
105 All of the above methods except for iter() have asynchronous variants
106 as well, which return a Future; to get the operation's ultimate
107 result, yield the Future (when inside a tasklet) or call the Future's
108 get_result() method (outside a tasklet):
110 q.map_async(callback) # Callback may be a task or a plain function
111 q.fetch_async(N)
112 q.get_async()
113 q.count_async(N)
114 q.fetch_page_async(N, start_cursor=cursor)
116 Finally, there's an idiom to efficiently loop over the Query results
117 in a tasklet, properly yielding when appropriate:
119 it = q.iter()
120 while (yield it.has_next_async()):
121 emp = it.next()
122 print emp.name, emp.age
125 from __future__ import with_statement
126 del with_statement # No need to export this.
128 __author__ = 'guido@google.com (Guido van Rossum)'
130 import datetime
131 import heapq
132 import itertools
133 import sys
135 from .google_imports import datastore_errors
136 from .google_imports import datastore_rpc
137 from .google_imports import datastore_types
138 from .google_imports import datastore_query
139 from .google_imports import namespace_manager
141 from . import model
142 from . import context
143 from . import tasklets
144 from . import utils
__all__ = ['Query', 'QueryOptions', 'Cursor', 'QueryIterator',
           'RepeatedStructuredPropertyPredicate',
           'AND', 'OR', 'ConjunctionNode', 'DisjunctionNode',
           'FilterNode', 'PostFilterNode', 'FalseNode', 'Node',
           'ParameterNode', 'ParameterizedThing', 'Parameter',
           'ParameterizedFunction', 'gql',
           ]

# Re-export some useful classes from the lower-level module.
Cursor = datastore_query.Cursor

# Some local renamings.
_ASC = datastore_query.PropertyOrder.ASCENDING
_DESC = datastore_query.PropertyOrder.DESCENDING
_AND = datastore_query.CompositeFilter.AND
_KEY = datastore_types._KEY_SPECIAL_PROPERTY

# Table of supported comparison operators.
_OPS = frozenset(['=', '!=', '<', '<=', '>', '>=', 'in'])

# Default limit value. (Yes, the datastore uses int32!)
_MAX_LIMIT = 2 ** 31 - 1
class QueryOptions(context.ContextOptions, datastore_query.QueryOptions):
  """Support both context options and query options (esp. use_cache).

  Combines context.ContextOptions with the low-level
  datastore_query.QueryOptions (see the base classes) so a single
  options object can be passed wherever either kind is accepted.
  """
class RepeatedStructuredPropertyPredicate(datastore_query.FilterPredicate):
  """In-memory predicate matching a repeated structured property row-wise.

  Used by model.py.
  """

  def __init__(self, match_keys, pb, key_prefix):
    super(RepeatedStructuredPropertyPredicate, self).__init__()
    self.match_keys = match_keys
    suffixes = []
    for full_key in match_keys:
      if not full_key.startswith(key_prefix):
        raise ValueError('key %r does not begin with the specified prefix of %s'
                         % (full_key, key_prefix))
      suffixes.append(full_key[len(key_prefix):])
    value_map = datastore_query._make_key_value_map(pb, suffixes)
    # One target value per key, in the same order as match_keys.
    self.match_values = tuple(value_map[suffix][0] for suffix in suffixes)

  def _get_prop_names(self):
    return frozenset(self.match_keys)

  def _apply(self, key_value_map):
    """Apply the filter to values extracted from an entity.

    self.match_keys and self.match_values together describe a single
    target row, e.g. keys ('name', 'age', 'rank') with values
    ('Joe', 24, 5).  key_value_map is a columnar table with
    (potentially) many rows -- one per repeated structured value --
    e.g. {'name': ['Joe', 'Jane'], 'age': [24, 21], 'rank': [5, 1]}.

    We must determine whether at least one row of that table exactly
    matches the target row.  We need this class because the datastore,
    when asked to find an entity with name 'Joe', age 24 and rank 5,
    will also return entities that have those values somewhere in the
    respective columns but not all aligned on a single row; such an
    entity should not be considered a match.

    Args:
      key_value_map: Dict mapping property names to lists of values.

    Returns:
      True if some row matches self.match_values exactly.
    """
    columns = [key_value_map.get(key) for key in self.match_keys]
    if not all(columns):
      # A missing or empty column means no row can possibly match.
      return False
    # izip transposes the columns into rows; test each row for equality.
    return self.match_values in itertools.izip(*columns)

  # Don't implement _prune()!  It would mess up the row correspondence
  # within columns.
class ParameterizedThing(object):
  """Base class for Parameter and ParameterizedFunction.

  This exists purely for isinstance() checks.
  """

  def __eq__(self, other):
    raise NotImplementedError

  def __ne__(self, other):
    # Derive != from ==, passing NotImplemented through unchanged.
    eq = self.__eq__(other)
    return eq if eq is NotImplemented else not eq
class Parameter(ParameterizedThing):
  """Represents a bound variable in a GQL query.

  Parameter(1) corresponds to a slot labeled ":1" in a GQL query.
  Parameter('xyz') corresponds to a slot labeled ":xyz".

  The value must be set (bound) separately by calling .set(value).
  """

  def __init__(self, key):
    """Constructor.

    Args:
      key: The Parameter key, must be either an integer or a string.

    Raises:
      TypeError: If key is neither an integer nor a string.
    """
    if not isinstance(key, (int, long, basestring)):
      raise TypeError('Parameter key must be an integer or string, not %s' %
                      (key,))
    self.__key = key

  def __repr__(self):
    return '%s(%r)' % (type(self).__name__, self.__key)

  def __eq__(self, other):
    if not isinstance(other, Parameter):
      return NotImplemented
    return self.__key == other.__key

  @property
  def key(self):
    """Retrieve the key."""
    return self.__key

  def resolve(self, bindings, used):
    """Return the value bound to this parameter, recording the use.

    Args:
      bindings: A dict mapping parameter keys to values.
      used: A dict in which use of a binding is recorded.

    Returns:
      The bound value.

    Raises:
      datastore_errors.BadArgumentError: If the key is not bound.
    """
    try:
      value = bindings[self.__key]
    except KeyError:
      raise datastore_errors.BadArgumentError(
          'Parameter :%s is not bound.' % self.__key)
    used[self.__key] = True
    return value
class ParameterizedFunction(ParameterizedThing):
  """Represents a GQL function with parameterized arguments.

  For example, ParameterizedFunction('key', [Parameter(1)]) stands for
  the GQL syntax KEY(:1).
  """

  def __init__(self, func, values):
    from .google_imports import gql  # Late import, to avoid name conflict.
    self.__func = func
    self.__values = values
    # NOTE: A horrible hack using GQL private variables so we can
    # reuse GQL's implementations of its built-in functions.
    dummy = gql.GQL('SELECT * FROM Dummy')
    cast_method = dummy._GQL__cast_operators[func]
    self.__method = getattr(dummy, '_GQL' + cast_method.__name__)

  def __repr__(self):
    return 'ParameterizedFunction(%r, %r)' % (self.__func, self.__values)

  def __eq__(self, other):
    if not isinstance(other, ParameterizedFunction):
      return NotImplemented
    return (self.__func == other.__func and
            self.__values == other.__values)

  @property
  def func(self):
    """The GQL function name (a string, e.g. 'key')."""
    return self.__func

  @property
  def values(self):
    """The (possibly parameterized) argument list."""
    return self.__values

  def is_parameterized(self):
    """True if any argument is an unbound Parameter."""
    return any(isinstance(val, Parameter) for val in self.__values)

  def resolve(self, bindings, used):
    """Apply the GQL function after resolving any Parameter arguments.

    Args:
      bindings: A dict mapping parameter keys to values.
      used: A dict in which use of a binding is recorded.

    Returns:
      The function result, converted where the gql module's return
      type differs from NDB's ('key', 'time', 'date' cases below).
    """
    resolved = [val.resolve(bindings, used) if isinstance(val, Parameter)
                else val
                for val in self.__values]
    result = self.__method(resolved)
    # The gql module returns slightly different types in some cases.
    if self.__func == 'key' and isinstance(result, datastore_types.Key):
      result = model.Key.from_old_key(result)
    elif self.__func == 'time' and isinstance(result, datetime.datetime):
      result = datetime.time(result.hour, result.minute,
                             result.second, result.microsecond)
    elif self.__func == 'date' and isinstance(result, datetime.datetime):
      result = datetime.date(result.year, result.month, result.day)
    return result
class Node(object):
  """Base class for filter expression tree nodes.

  Tree nodes are considered immutable, even though they can contain
  Parameter instances, which are not.  In particular, two identical
  trees may be represented by the same Node object in different
  contexts.
  """

  def __new__(cls):
    if cls is Node:
      raise TypeError('Cannot instantiate Node, only a subclass.')
    return super(Node, cls).__new__(cls)

  def __eq__(self, other):
    raise NotImplementedError

  def __ne__(self, other):
    # Derive != from ==, passing NotImplemented through unchanged.
    eq = self.__eq__(other)
    return eq if eq is NotImplemented else not eq

  def __unordered(self, unused_other):
    raise TypeError('Nodes cannot be ordered')
  __le__ = __lt__ = __ge__ = __gt__ = __unordered

  def _to_filter(self, post=False):
    """Helper to convert to datastore_query.Filter, or None."""
    raise NotImplementedError

  def _post_filters(self):
    """Helper to extract post-filter Nodes, if any."""
    return None

  def resolve(self, bindings, used):
    """Return a Node with Parameters replaced by the selected values.

    Args:
      bindings: A dict mapping integers and strings to values.
      used: A dict into which use of a binding is recorded.

    Returns:
      A Node instance.
    """
    return self
class FalseNode(Node):
  """Tree node for an always-failing filter."""

  def __eq__(self, other):
    # All FalseNode instances are interchangeable.
    if isinstance(other, FalseNode):
      return True
    return NotImplemented

  def _to_filter(self, post=False):
    if post:
      return None
    # Because there's no point submitting a query that will never
    # return anything.
    raise datastore_errors.BadQueryError(
        'Cannot convert FalseNode to predicate')
class ParameterNode(Node):
  """Tree node for a parameterized filter."""

  def __new__(cls, prop, op, param):
    # Validate eagerly; a ParameterNode is built only from known-good parts.
    if not isinstance(prop, model.Property):
      raise TypeError('Expected a Property, got %r' % (prop,))
    if op not in _OPS:
      raise TypeError('Expected a valid operator, got %r' % (op,))
    if not isinstance(param, ParameterizedThing):
      raise TypeError('Expected a ParameterizedThing, got %r' % (param,))
    self = super(ParameterNode, cls).__new__(cls)
    self.__prop = prop
    self.__op = op
    self.__param = param
    return self

  def __repr__(self):
    return 'ParameterNode(%r, %r, %r)' % (self.__prop, self.__op, self.__param)

  def __eq__(self, other):
    if not isinstance(other, ParameterNode):
      return NotImplemented
    return (self.__prop._name == other.__prop._name and
            self.__op == other.__op and
            self.__param == other.__param)

  def _to_filter(self, post=False):
    # An unresolved parameter cannot be turned into a datastore filter.
    raise datastore_errors.BadArgumentError(
        'Parameter :%s is not bound.' % (self.__param.key,))

  def resolve(self, bindings, used):
    """Return the filter node (or tree) for the bound parameter value."""
    value = self.__param.resolve(bindings, used)
    if self.__op == 'in':
      return self.__prop._IN(value)
    return self.__prop._comparison(self.__op, value)
class FilterNode(Node):
  """Tree node for a single filter expression."""

  def __new__(cls, name, opsymbol, value):
    if isinstance(value, model.Key):
      value = value.to_old_key()
    if opsymbol == '!=':
      # a != b is rewritten as (a < b) OR (a > b).
      return DisjunctionNode(FilterNode(name, '<', value),
                             FilterNode(name, '>', value))
    if opsymbol == 'in':
      if not isinstance(value, (list, tuple, set, frozenset)):
        raise TypeError('in expected a list, tuple or set of values; '
                        'received %r' % value)
      nodes = [FilterNode(name, '=', v) for v in value]
      if not nodes:
        return FalseNode()  # IN over an empty collection can never match.
      if len(nodes) == 1:
        return nodes[0]
      return DisjunctionNode(*nodes)
    self = super(FilterNode, cls).__new__(cls)
    self.__name = name
    self.__opsymbol = opsymbol
    self.__value = value
    return self

  def __repr__(self):
    return '%s(%r, %r, %r)' % (self.__class__.__name__,
                               self.__name, self.__opsymbol, self.__value)

  def __eq__(self, other):
    if not isinstance(other, FilterNode):
      return NotImplemented
    # TODO: Should nodes with values that compare equal but have
    # different types really be considered equal?  IIUC the datastore
    # doesn't consider 1 equal to 1.0 when it compares property values.
    return (self.__name == other.__name and
            self.__opsymbol == other.__opsymbol and
            self.__value == other.__value)

  def _to_filter(self, post=False):
    if post:
      return None
    if self.__opsymbol in ('!=', 'in'):
      # These are expanded to node trees in __new__; a bare instance
      # should never carry them.
      raise NotImplementedError('Inequality filters are not single filter '
                                'expressions and therefore cannot be converted '
                                'to a single filter (%r)' % self.__opsymbol)
    return datastore_query.make_filter(self.__name.decode('utf-8'),
                                       self.__opsymbol, self.__value)
class PostFilterNode(Node):
  """Tree node representing an in-memory filtering operation.

  This is used to represent filters that cannot be executed by the
  datastore, for example a query for a structured value.
  """

  def __new__(cls, predicate):
    self = super(PostFilterNode, cls).__new__(cls)
    self.predicate = predicate
    return self

  def __repr__(self):
    return '%s(%s)' % (self.__class__.__name__, self.predicate)

  def __eq__(self, other):
    if not isinstance(other, PostFilterNode):
      return NotImplemented
    # Predicates are opaque; identity is the only safe equality test.
    return self is other

  def _to_filter(self, post=False):
    # Only meaningful in the post-filtering (in-memory) phase.
    return self.predicate if post else None
class ConjunctionNode(Node):
  """Tree node representing a Boolean AND operator on two or more nodes."""

  def __new__(cls, *nodes):
    if not nodes:
      raise TypeError('ConjunctionNode() requires at least one node.')
    if len(nodes) == 1:
      return nodes[0]
    clauses = [[]]  # Outer: Disjunction; inner: Conjunction.
    # TODO: Remove duplicates?
    for node in nodes:
      if not isinstance(node, Node):
        raise TypeError('ConjunctionNode() expects Node instances as arguments;'
                        ' received a non-Node instance %r' % node)
      if isinstance(node, DisjunctionNode):
        # Apply the distributive law: (X or Y) and (A or B) becomes
        # (X and A) or (X and B) or (Y and A) or (Y and B).
        clauses = [clause + [subnode]
                   for clause in clauses
                   for subnode in node]
      elif isinstance(node, ConjunctionNode):
        # Apply half of the distributive law: (X or Y) and A becomes
        # (X and A) or (Y and A).
        for clause in clauses:
          clause.extend(node.__nodes)
      else:
        # Ditto.
        for clause in clauses:
          clause.append(node)
    if not clauses:
      return FalseNode()
    if len(clauses) > 1:
      return DisjunctionNode(*[ConjunctionNode(*clause) for clause in clauses])
    self = super(ConjunctionNode, cls).__new__(cls)
    self.__nodes = clauses[0]
    return self

  def __iter__(self):
    return iter(self.__nodes)

  def __repr__(self):
    return 'AND(%s)' % (', '.join(str(node) for node in self.__nodes))

  def __eq__(self, other):
    if not isinstance(other, ConjunctionNode):
      return NotImplemented
    return self.__nodes == other.__nodes

  def _to_filter(self, post=False):
    # Collect sub-filters for exactly one phase: post-filters when
    # post=True, datastore filters otherwise.
    filters = []
    for node in self.__nodes:
      if isinstance(node, PostFilterNode) == post:
        subfilter = node._to_filter(post=post)
        if subfilter:
          filters.append(subfilter)
    if not filters:
      return None
    if len(filters) == 1:
      return filters[0]
    return datastore_query.CompositeFilter(_AND, filters)

  def _post_filters(self):
    posts = [node for node in self.__nodes
             if isinstance(node, PostFilterNode)]
    if not posts:
      return None
    if len(posts) == 1:
      return posts[0]
    if posts == self.__nodes:
      return self
    return ConjunctionNode(*posts)

  def resolve(self, bindings, used):
    resolved = [node.resolve(bindings, used) for node in self.__nodes]
    if resolved == self.__nodes:
      return self
    return ConjunctionNode(*resolved)
class DisjunctionNode(Node):
  """Tree node representing a Boolean OR operator on two or more nodes."""

  def __new__(cls, *nodes):
    if not nodes:
      raise TypeError('DisjunctionNode() requires at least one node')
    if len(nodes) == 1:
      return nodes[0]
    self = super(DisjunctionNode, cls).__new__(cls)
    self.__nodes = []
    # TODO: Remove duplicates?
    for node in nodes:
      if not isinstance(node, Node):
        raise TypeError('DisjunctionNode() expects Node instances as arguments;'
                        ' received a non-Node instance %r' % node)
      if isinstance(node, DisjunctionNode):
        # Flatten nested ORs into a single level.
        self.__nodes.extend(node.__nodes)
      else:
        self.__nodes.append(node)
    return self

  def __iter__(self):
    return iter(self.__nodes)

  def __repr__(self):
    return 'OR(%s)' % (', '.join(str(node) for node in self.__nodes))

  def __eq__(self, other):
    if not isinstance(other, DisjunctionNode):
      return NotImplemented
    return self.__nodes == other.__nodes

  def resolve(self, bindings, used):
    resolved = [node.resolve(bindings, used) for node in self.__nodes]
    if resolved == self.__nodes:
      return self
    return DisjunctionNode(*resolved)


# AND and OR are preferred aliases for these.
AND = ConjunctionNode
OR = DisjunctionNode
def _args_to_val(func, args):
  """Helper for GQL parsing to extract values from GQL expressions.

  This can extract the value from a GQL literal, return a Parameter
  for a GQL bound parameter (:1 or :foo), and interprets casts like
  KEY(...) and plain lists of values like (1, 2, 3).

  Args:
    func: A string indicating what kind of thing this is.
    args: One or more GQL values, each integer, string, or GQL literal.

  Returns:
    A plain value, a Parameter, or a ParameterizedFunction (the latter
    only when some argument is still an unbound Parameter).

  Raises:
    TypeError: If an arg has an unexpected type, or 'nop' does not get
      exactly one value.
  """
  from .google_imports import gql  # Late import, to avoid name conflict.
  vals = []
  for arg in args:
    if isinstance(arg, (int, long, basestring)):
      # A bare int/string denotes a bound parameter slot (:1 or :foo).
      vals.append(Parameter(arg))
    elif isinstance(arg, gql.Literal):
      vals.append(arg.Get())
    else:
      raise TypeError('Unexpected arg (%r)' % arg)
  if func == 'nop':
    if len(vals) != 1:
      raise TypeError('"nop" requires exactly one value')
    return vals[0]  # May be a Parameter
  pfunc = ParameterizedFunction(func, vals)
  if pfunc.is_parameterized():
    return pfunc
  return pfunc.resolve({}, {})
def _get_prop_from_modelclass(modelclass, name):
  """Helper for GQL parsing to turn a property name into a property object.

  Args:
    modelclass: The model class specified in the query.
    name: The property name.  This may contain dots which indicate
      sub-properties of structured properties.

  Returns:
    A Property object.

  Raises:
    TypeError: If a name component does not exist on a non-Expando
      model class, or a dotted name is applied to a non-structured
      property.
    KeyError: If a sub-property doesn't exist and the structured
      property's model class doesn't derive from Expando.
  """
  if name == '__key__':
    return modelclass._key

  parts = name.split('.')
  first, rest = parts[0], parts[1:]
  prop = modelclass._properties.get(first)
  if prop is None:
    if issubclass(modelclass, model.Expando):
      prop = model.GenericProperty(first)
    else:
      raise TypeError('Model %s has no property named %r' %
                      (modelclass._get_kind(), first))

  while rest:
    part = rest.pop(0)
    if not isinstance(prop, model.StructuredProperty):
      raise TypeError('Model %s has no property named %r' %
                      (modelclass._get_kind(), part))
    candidate = getattr(prop, part, None)
    if isinstance(candidate, model.Property) and candidate._name == part:
      prop = candidate
      continue
    candidate = prop._modelclass._properties.get(part)
    if candidate is not None:
      # Must get it this way to get the copy with the long name.
      # (See StructuredProperty.__getattr__() for details.)
      prop = getattr(prop, candidate._code_name)
    elif issubclass(prop._modelclass, model.Expando) and not rest:
      prop = model.GenericProperty()
      prop._name = name  # Bypass the restriction on dots.
    else:
      raise KeyError('Model %s has no property named %r' %
                     (prop._modelclass._get_kind(), part))

  return prop
754 class Query(object):
755 """Query object.
757 Usually constructed by calling Model.query().
759 See module docstring for examples.
761 Note that not all operations on Queries are supported by _MultiQuery
762 instances; the latter are generated as necessary when any of the
763 operators !=, IN or OR is used.
766 @utils.positional(1)
767 def __init__(self, kind=None, ancestor=None, filters=None, orders=None,
768 app=None, namespace=None, default_options=None,
769 projection=None, group_by=None):
770 """Constructor.
771 Args:
772 kind: Optional kind string.
773 ancestor: Optional ancestor Key.
774 filters: Optional Node representing a filter expression tree.
775 orders: Optional datastore_query.Order object.
776 app: Optional app id.
777 namespace: Optional namespace.
778 default_options: Optional QueryOptions object.
779 projection: Optional list or tuple of properties to project.
780 group_by: Optional list or tuple of properties to group by.
782 # TODO(arfuller): Accept projection=Model.key to mean keys_only.
783 # TODO(arfuller): Consider adding incremental function
784 # group_by_property(*args) and project(*args, distinct=False).
786 # Validating input.
787 if ancestor is not None:
788 if isinstance(ancestor, ParameterizedThing):
789 if isinstance(ancestor, ParameterizedFunction):
790 if ancestor.func != 'key':
791 raise TypeError('ancestor cannot be a GQL function other than KEY')
792 else:
793 if not isinstance(ancestor, model.Key):
794 raise TypeError('ancestor must be a Key; received %r' % (ancestor,))
795 if not ancestor.id():
796 raise ValueError('ancestor cannot be an incomplete key')
797 if app is not None:
798 if app != ancestor.app():
799 raise TypeError('app/ancestor mismatch')
800 if namespace is None:
801 namespace = ancestor.namespace()
802 else:
803 if namespace != ancestor.namespace():
804 raise TypeError('namespace/ancestor mismatch')
805 if filters is not None:
806 if not isinstance(filters, Node):
807 raise TypeError('filters must be a query Node or None; received %r' %
808 (filters,))
809 if orders is not None:
810 if not isinstance(orders, datastore_query.Order):
811 raise TypeError('orders must be an Order instance or None; received %r'
812 % (orders,))
813 if default_options is not None:
814 if not isinstance(default_options, datastore_rpc.BaseConfiguration):
815 raise TypeError('default_options must be a Configuration or None; '
816 'received %r' % (default_options,))
817 if projection is not None:
818 if default_options.projection is not None:
819 raise TypeError('cannot use projection= and '
820 'default_options.projection at the same time')
821 if default_options.keys_only is not None:
822 raise TypeError('cannot use projection= and '
823 'default_options.keys_only at the same time')
825 self.__kind = kind # String.
826 self.__ancestor = ancestor # Key.
827 self.__filters = filters # None or Node subclass.
828 self.__orders = orders # None or datastore_query.Order instance.
829 self.__app = app
830 self.__namespace = namespace
831 self.__default_options = default_options
833 # Checked late as _check_properties depends on local state.
834 self.__projection = None
835 if projection is not None:
836 if not projection:
837 raise TypeError('projection argument cannot be empty')
838 if not isinstance(projection, (tuple, list)):
839 raise TypeError(
840 'projection must be a tuple, list or None; received %r' %
841 (projection,))
842 self._check_properties(self._to_property_names(projection))
843 self.__projection = tuple(projection)
845 self.__group_by = None
846 if group_by is not None:
847 if not group_by:
848 raise TypeError('group_by argument cannot be empty')
849 if not isinstance(group_by, (tuple, list)):
850 raise TypeError(
851 'group_by must be a tuple, list or None; received %r' % (group_by,))
852 self._check_properties(self._to_property_names(group_by))
853 self.__group_by = tuple(group_by)
855 def __repr__(self):
856 args = []
857 if self.app is not None:
858 args.append('app=%r' % self.app)
859 if (self.namespace is not None and
860 self.namespace != namespace_manager.get_namespace()):
861 # Only show the namespace if set and not the current namespace.
862 # (This is similar to what Key.__repr__() does.)
863 args.append('namespace=%r' % self.namespace)
864 if self.kind is not None:
865 args.append('kind=%r' % self.kind)
866 if self.ancestor is not None:
867 args.append('ancestor=%r' % self.ancestor)
868 if self.filters is not None:
869 args.append('filters=%r' % self.filters)
870 if self.orders is not None:
871 # TODO: Format orders better.
872 args.append('orders=...') # PropertyOrder doesn't have a good repr().
873 if self.projection:
874 args.append('projection=%r' % (self._to_property_names(self.projection)))
875 if self.group_by:
876 args.append('group_by=%r' % (self._to_property_names(self.group_by)))
877 if self.default_options is not None:
878 args.append('default_options=%r' % self.default_options)
879 return '%s(%s)' % (self.__class__.__name__, ', '.join(args))
881 def _fix_namespace(self):
882 """Internal helper to fix the namespace.
884 This is called to ensure that for queries without an explicit
885 namespace, the namespace used by async calls is the one in effect
886 at the time the async call is made, not the one in effect when the
887 the request is actually generated.
889 if self.namespace is not None:
890 return self
891 namespace = namespace_manager.get_namespace()
892 return self.__class__(kind=self.kind, ancestor=self.ancestor,
893 filters=self.filters, orders=self.orders,
894 app=self.app, namespace=namespace,
895 default_options=self.default_options,
896 projection=self.projection, group_by=self.group_by)
898 def _get_query(self, connection):
899 self.bind() # Raises an exception if there are unbound parameters.
900 kind = self.kind
901 ancestor = self.ancestor
902 if ancestor is not None:
903 ancestor = connection.adapter.key_to_pb(ancestor)
904 filters = self.filters
905 post_filters = None
906 if filters is not None:
907 post_filters = filters._post_filters()
908 filters = filters._to_filter()
909 group_by = None
910 if self.group_by:
911 group_by = self._to_property_names(self.group_by)
912 dsquery = datastore_query.Query(app=self.app,
913 namespace=self.namespace,
914 kind=kind.decode('utf-8') if kind else None,
915 ancestor=ancestor,
916 filter_predicate=filters,
917 order=self.orders,
918 group_by=group_by)
919 if post_filters is not None:
920 dsquery = datastore_query._AugmentedQuery(
921 dsquery,
922 in_memory_filter=post_filters._to_filter(post=True))
923 return dsquery
  @tasklets.tasklet
  def run_to_queue(self, queue, conn, options=None, dsquery=None):
    """Run this query, putting entities into the given queue.

    Args:
      queue: Queue-like object supporting putq(), complete(), done()
        and set_exception(); receives (batch, index, result) triples.
      conn: Datastore connection used to issue the RPCs.
      options: Optional query options passed to the datastore.
      dsquery: Optional pre-built low-level query; built via
        _get_query(conn) when absent.
    """
    try:
      multiquery = self._maybe_multi_query()
      if multiquery is not None:
        # Disjunction (OR / != / IN) queries are delegated wholesale.
        yield multiquery.run_to_queue(queue, conn, options=options)
        return

      if dsquery is None:
        dsquery = self._get_query(conn)
      rpc = dsquery.run_async(conn, options)
      while rpc is not None:
        batch = yield rpc
        # Request the next batch before enqueuing the current results.
        rpc = batch.next_batch_async(options)
        for i, result in enumerate(batch.results):
          queue.putq((batch, i, result))
      queue.complete()

    except GeneratorExit:
      # Let generator shutdown propagate untouched.
      raise
    except Exception:
      # Forward the failure through the queue (if still open), then re-raise.
      if not queue.done():
        _, e, tb = sys.exc_info()
        queue.set_exception(e, tb)
      raise
  @tasklets.tasklet
  def _run_to_list(self, results, options=None):
    # Internal version of run_to_queue(), without a queue.
    # Appends each cache-updated result to `results` and returns that
    # same list via tasklets.Return.
    ctx = tasklets.get_context()
    conn = ctx._conn
    dsquery = self._get_query(conn)
    rpc = dsquery.run_async(conn, options)
    while rpc is not None:
      batch = yield rpc
      if (batch.skipped_results and
          datastore_query.FetchOptions.offset(options)):
        # Some results were skipped server-side; shrink the remaining
        # offset before requesting the next batch.
        offset = options.offset - batch.skipped_results
        options = datastore_query.FetchOptions(offset=offset, config=options)
      rpc = batch.next_batch_async(options)
      for result in batch.results:
        result = ctx._update_cache_from_query_result(result, options)
        if result is not None:
          results.append(result)

    raise tasklets.Return(results)
973 def _needs_multi_query(self):
974 filters = self.filters
975 return filters is not None and isinstance(filters, DisjunctionNode)
977 def _maybe_multi_query(self):
978 if not self._needs_multi_query():
979 return None
980 # Switch to a _MultiQuery.
981 filters = self.filters
982 subqueries = []
983 for subfilter in filters:
984 subquery = self.__class__(kind=self.kind, ancestor=self.ancestor,
985 filters=subfilter, orders=self.orders,
986 app=self.app, namespace=self.namespace,
987 default_options=self.default_options,
988 projection=self.projection,
989 group_by=self.group_by)
990 subqueries.append(subquery)
991 return _MultiQuery(subqueries)
  @property
  def kind(self):
    """Accessor for the kind (a string or None)."""
    return self.__kind
  @property
  def ancestor(self):
    """Accessor for the ancestor (a Key or None)."""
    return self.__ancestor
  @property
  def filters(self):
    """Accessor for the filters (a Node or None)."""
    return self.__filters
  @property
  def orders(self):
    """Accessor for the orders (a datastore_query.Order or None)."""
    # NOTE: the original docstring said "filters" -- copy/paste error.
    return self.__orders
  @property
  def app(self):
    """Accessor for the app (a string or None)."""
    return self.__app
  @property
  def namespace(self):
    """Accessor for the namespace (a string or None)."""
    return self.__namespace
  @property
  def default_options(self):
    """Accessor for the default_options (a QueryOptions instance or None)."""
    return self.__default_options
  @property
  def group_by(self):
    """Accessor for the group by properties (a tuple instance or None)."""
    return self.__group_by
  @property
  def projection(self):
    """Accessor for the projected properties (a tuple instance or None)."""
    return self.__projection
  @property
  def is_distinct(self):
    """True if results are guaranteed to contain a unique set of property
    values.

    This happens when every property in the group_by is also in the
    projection.
    """
    return bool(self.__group_by and
                set(self._to_property_names(self.__group_by)) <=
                set(self._to_property_names(self.__projection)))
1049 def filter(self, *args):
1050 """Return a new Query with additional filter(s) applied."""
1051 if not args:
1052 return self
1053 preds = []
1054 f = self.filters
1055 if f:
1056 preds.append(f)
1057 for arg in args:
1058 if not isinstance(arg, Node):
1059 raise TypeError('Cannot filter a non-Node argument; received %r' % arg)
1060 preds.append(arg)
1061 if not preds:
1062 pred = None
1063 elif len(preds) == 1:
1064 pred = preds[0]
1065 else:
1066 pred = ConjunctionNode(*preds)
1067 return self.__class__(kind=self.kind, ancestor=self.ancestor,
1068 filters=pred, orders=self.orders,
1069 app=self.app, namespace=self.namespace,
1070 default_options=self.default_options,
1071 projection=self.projection, group_by=self.group_by)
1073 def order(self, *args):
1074 """Return a new Query with additional sort order(s) applied."""
1075 # q.order(Employee.name, -Employee.age)
1076 if not args:
1077 return self
1078 orders = []
1079 o = self.orders
1080 if o:
1081 orders.append(o)
1082 for arg in args:
1083 if isinstance(arg, model.Property):
1084 orders.append(datastore_query.PropertyOrder(arg._name, _ASC))
1085 elif isinstance(arg, datastore_query.Order):
1086 orders.append(arg)
1087 else:
1088 raise TypeError('order() expects a Property or query Order; '
1089 'received %r' % arg)
1090 if not orders:
1091 orders = None
1092 elif len(orders) == 1:
1093 orders = orders[0]
1094 else:
1095 orders = datastore_query.CompositeOrder(orders)
1096 return self.__class__(kind=self.kind, ancestor=self.ancestor,
1097 filters=self.filters, orders=orders,
1098 app=self.app, namespace=self.namespace,
1099 default_options=self.default_options,
1100 projection=self.projection, group_by=self.group_by)
  # Datastore API using the default context.

  def iter(self, **q_options):
    """Construct an iterator over the query.

    Args:
      **q_options: All query options keyword arguments are supported.

    Returns:
      A QueryIterator object.
    """
    self.bind()  # Raises an exception if there are unbound parameters.
    return QueryIterator(self, **q_options)

  __iter__ = iter
  @utils.positional(2)
  def map(self, callback, pass_batch_into_callback=None,
          merge_future=None, **q_options):
    """Map a callback function or tasklet over the query results.

    Args:
      callback: A function or tasklet to be applied to each result; see below.
      merge_future: Optional Future subclass; see below.
      **q_options: All query options keyword arguments are supported.

    Callback signature: The callback is normally called with an entity
    as argument.  However if keys_only=True is given, it is called
    with a Key.  Also, when pass_batch_into_callback is True, it is
    called with three arguments: the current batch, the index within
    the batch, and the entity or Key at that index.  The callback can
    return whatever it wants.  If the callback is None, a trivial
    callback is assumed that just returns the entity or key passed in
    (ignoring produce_cursors).

    Optional merge future: The merge_future is an advanced argument
    that can be used to override how the callback results are combined
    into the overall map() return value.  By default a list of
    callback return values is produced.  By substituting one of a
    small number of specialized alternatives you can arrange
    otherwise.  See tasklets.MultiFuture for the default
    implementation and a description of the protocol the merge_future
    object must implement the default.  Alternatives from the same
    module include QueueFuture, SerialQueueFuture and ReducingFuture.

    Returns:
      When the query has run to completion and all callbacks have
      returned, map() returns a list of the results of all callbacks.
      (But see 'optional merge future' above.)
    """
    return self.map_async(callback,
                          pass_batch_into_callback=pass_batch_into_callback,
                          merge_future=merge_future,
                          **q_options).get_result()
  @utils.positional(2)
  def map_async(self, callback, pass_batch_into_callback=None,
                merge_future=None, **q_options):
    """Map a callback function or tasklet over the query results.

    This is the asynchronous version of Query.map().

    Returns:
      A Future whose result is what map() would return.
    """
    qry = self._fix_namespace()
    return tasklets.get_context().map_query(
        qry,
        callback,
        pass_batch_into_callback=pass_batch_into_callback,
        options=self._make_options(q_options),
        merge_future=merge_future)
  @utils.positional(2)
  def fetch(self, limit=None, **q_options):
    """Fetch a list of query results, up to a limit.

    Args:
      limit: How many results to retrieve at most.
      **q_options: All query options keyword arguments are supported.

    Returns:
      A list of results.
    """
    return self.fetch_async(limit, **q_options).get_result()
  @utils.positional(2)
  def fetch_async(self, limit=None, **q_options):
    """Fetch a list of query results, up to a limit.

    This is the asynchronous version of Query.fetch().

    Returns:
      A Future whose result is a list of results.
    """
    if limit is None:
      # Fall back to a limit from the (merged) default options, or to
      # the maximum the datastore allows.
      default_options = self._make_options(q_options)
      if default_options is not None and default_options.limit is not None:
        limit = default_options.limit
      else:
        limit = _MAX_LIMIT
    q_options['limit'] = limit
    q_options.setdefault('batch_size', limit)
    if self._needs_multi_query():
      # Multi-queries can't use the direct-batch path; map a trivial
      # (None) callback over the merged results instead.
      return self.map_async(None, **q_options)
    # Optimization using direct batches.
    options = self._make_options(q_options)
    qry = self._fix_namespace()
    return qry._run_to_list([], options=options)
  def get(self, **q_options):
    """Get the first query result, if any.

    This is similar to calling q.fetch(1) and returning the first item
    of the list of results, if any, otherwise None.

    Args:
      **q_options: All query options keyword arguments are supported.

    Returns:
      A single result, or None if there are no results.
    """
    return self.get_async(**q_options).get_result()
  def get_async(self, **q_options):
    """Get the first query result, if any.

    This is the asynchronous version of Query.get().

    Returns:
      A Future whose result is a single result or None.
    """
    qry = self._fix_namespace()
    return qry._get_async(**q_options)
1228 @tasklets.tasklet
1229 def _get_async(self, **q_options):
1230 """Internal version of get_async()."""
1231 res = yield self.fetch_async(1, **q_options)
1232 if not res:
1233 raise tasklets.Return(None)
1234 raise tasklets.Return(res[0])
  @utils.positional(2)
  def count(self, limit=None, **q_options):
    """Count the number of query results, up to a limit.

    This returns the same result as len(q.fetch(limit)) but more
    efficiently.

    Note that you must pass a maximum value to limit the amount of
    work done by the query.

    Args:
      limit: How many results to count at most.
      **q_options: All query options keyword arguments are supported.

    Returns:
      The count of results, as an integer.
    """
    return self.count_async(limit, **q_options).get_result()
  @utils.positional(2)
  def count_async(self, limit=None, **q_options):
    """Count the number of query results, up to a limit.

    This is the asynchronous version of Query.count().

    Returns:
      A Future whose result is the count, as an integer.
    """
    qry = self._fix_namespace()
    return qry._count_async(limit=limit, **q_options)
  @tasklets.tasklet
  def _count_async(self, limit=None, **q_options):
    """Internal version of count_async()."""
    # TODO: Support offset by incorporating it to the limit.
    if 'offset' in q_options:
      raise NotImplementedError('.count() and .count_async() do not support '
                                'offsets at present.')
    if 'limit' in q_options:
      raise TypeError('Cannot specify limit as a non-keyword argument and as a '
                      'keyword argument simultaneously.')
    elif limit is None:
      limit = _MAX_LIMIT
    if self._needs_multi_query():
      # _MultiQuery does not support iterating over result batches,
      # so just fetch results and count them.
      # TODO: Use QueryIterator to avoid materializing the results list.
      q_options.setdefault('batch_size', limit)
      q_options.setdefault('keys_only', True)
      results = yield self.fetch_async(limit, **q_options)
      raise tasklets.Return(len(results))

    # Issue a special query requesting 0 results at a given offset.
    # The skipped_results count will tell us how many hits there were
    # before that offset without fetching the items.
    q_options['offset'] = limit
    q_options['limit'] = 0
    options = self._make_options(q_options)
    conn = tasklets.get_context()._conn
    dsquery = self._get_query(conn)
    rpc = dsquery.run_async(conn, options)
    total = 0
    while rpc is not None:
      batch = yield rpc
      rpc = batch.next_batch_async(options)
      total += batch.skipped_results
    raise tasklets.Return(total)
  @utils.positional(2)
  def fetch_page(self, page_size, **q_options):
    """Fetch a page of results.

    This is a specialized method for use by paging user interfaces.

    Args:
      page_size: The requested page size.  At most this many results
        will be returned.

    In addition, any keyword argument supported by the QueryOptions
    class is supported.  In particular, to fetch the next page, you
    pass the cursor returned by one call to the next call using
    start_cursor=<cursor>.  A common idiom is to pass the cursor to
    the client using <cursor>.to_websafe_string() and to reconstruct
    that cursor on a subsequent request using
    Cursor.from_websafe_string(<string>).

    Returns:
      A tuple (results, cursor, more) where results is a list of query
      results, cursor is a cursor pointing just after the last result
      returned, and more is a bool indicating whether there are
      (likely) more results after that.
    """
    # NOTE: page_size can't be passed as a keyword.
    return self.fetch_page_async(page_size, **q_options).get_result()
  @utils.positional(2)
  def fetch_page_async(self, page_size, **q_options):
    """Fetch a page of results.

    This is the asynchronous version of Query.fetch_page().

    Returns:
      A Future whose result is a (results, cursor, more) tuple.
    """
    qry = self._fix_namespace()
    return qry._fetch_page_async(page_size, **q_options)
  @tasklets.tasklet
  def _fetch_page_async(self, page_size, **q_options):
    """Internal version of fetch_page_async()."""
    q_options.setdefault('batch_size', page_size)
    q_options.setdefault('produce_cursors', True)
    # Ask for one extra result so probably_has_next() has a lookahead
    # item; we stop consuming after page_size results below.
    it = self.iter(limit=page_size + 1, **q_options)
    results = []
    while (yield it.has_next_async()):
      results.append(it.next())
      if len(results) >= page_size:
        break
    try:
      cursor = it.cursor_after()
    except datastore_errors.BadArgumentError:
      # No current item (e.g. no results at all): no cursor.
      cursor = None
    raise tasklets.Return(results, cursor, it.probably_has_next())
  def _make_options(self, q_options):
    """Helper to construct a QueryOptions object from keyword arguments.

    Args:
      q_options: a dict of keyword arguments.

    Note that either 'options' or 'config' can be used to pass another
    QueryOptions object, but not both.  If another QueryOptions object is
    given it provides default values.

    If self.default_options is set, it is used to provide defaults,
    which have a lower precedence than options set in q_options.

    Returns:
      A QueryOptions object, or None if q_options is empty.
    """
    if not q_options:
      return self.default_options
    if 'options' in q_options:
      # Move 'options' to 'config' since that is what QueryOptions() uses.
      if 'config' in q_options:
        raise TypeError('You cannot use config= and options= at the same time')
      q_options['config'] = q_options.pop('options')
    if q_options.get('projection'):
      # Normalize to property-name strings and validate them against
      # the model class (if known).
      try:
        q_options['projection'] = self._to_property_names(
            q_options['projection'])
      except TypeError, e:
        raise datastore_errors.BadArgumentError(e)
      self._check_properties(q_options['projection'])
    options = QueryOptions(**q_options)

    # Populate projection if it hasn't been overridden.
    if (options.keys_only is None and
        options.projection is None and
        self.__projection):
      options = QueryOptions(
          projection=self._to_property_names(self.__projection), config=options)
    # Populate default options
    if self.default_options is not None:
      options = self.default_options.merge(options)

    return options
1397 def _to_property_names(self, properties):
1398 if not isinstance(properties, (list, tuple)):
1399 properties = [properties] # It will be type-checked below.
1400 fixed = []
1401 for proj in properties:
1402 if isinstance(proj, basestring):
1403 fixed.append(proj)
1404 elif isinstance(proj, model.Property):
1405 fixed.append(proj._name)
1406 else:
1407 raise TypeError(
1408 'Unexpected property (%r); should be string or Property' % (proj,))
1409 return fixed
  def _check_properties(self, fixed, **kwargs):
    # Validate property names against the model class for our kind;
    # silently skipped if the kind's class hasn't been imported.
    modelclass = model.Model._kind_map.get(self.__kind)
    if modelclass is not None:
      modelclass._check_properties(fixed, **kwargs)
  def analyze(self):
    """Return a list giving the parameters required by a query."""
    # A dict that pretends to contain every key; resolve() then records
    # in `used` each parameter it actually looked up.
    class MockBindings(dict):
      def __contains__(self, key):
        self[key] = None
        return True
    bindings = MockBindings()
    used = {}
    ancestor = self.ancestor
    if isinstance(ancestor, ParameterizedThing):
      ancestor = ancestor.resolve(bindings, used)
    filters = self.filters
    if filters is not None:
      filters = filters.resolve(bindings, used)
    return sorted(used)  # Returns only the keys.
  def bind(self, *args, **kwds):
    """Bind parameter values.  Returns a new Query object.

    Raises BadArgumentError if a positional argument goes unused.
    """
    return self._bind(args, kwds)
1436 def _bind(self, args, kwds):
1437 """Bind parameter values. Returns a new Query object."""
1438 bindings = dict(kwds)
1439 for i, arg in enumerate(args):
1440 bindings[i + 1] = arg
1441 used = {}
1442 ancestor = self.ancestor
1443 if isinstance(ancestor, ParameterizedThing):
1444 ancestor = ancestor.resolve(bindings, used)
1445 filters = self.filters
1446 if filters is not None:
1447 filters = filters.resolve(bindings, used)
1448 unused = []
1449 for i in xrange(1, 1 + len(args)):
1450 if i not in used:
1451 unused.append(i)
1452 if unused:
1453 raise datastore_errors.BadArgumentError(
1454 'Positional arguments %s were given but not used.' %
1455 ', '.join(str(i) for i in unused))
1456 return self.__class__(kind=self.kind, ancestor=ancestor,
1457 filters=filters, orders=self.orders,
1458 app=self.app, namespace=self.namespace,
1459 default_options=self.default_options,
1460 projection=self.projection, group_by=self.group_by)
def gql(query_string, *args, **kwds):
  """Parse a GQL query string.

  Args:
    query_string: Full GQL query, e.g. 'SELECT * FROM Kind WHERE prop = 1'.
    *args, **kwds: If present, used to call bind().

  Returns:
    An instance of Query (or a subclass chosen by _gql).
  """
  qry = _gql(query_string)
  if args or kwds:
    qry = qry._bind(args, kwds)
  return qry
@utils.positional(1)
def _gql(query_string, query_class=Query):
  """Parse a GQL query string (internal version).

  Args:
    query_string: Full GQL query, e.g. 'SELECT * FROM Kind WHERE prop = 1'.
    query_class: Optional class to use, default Query.

  Returns:
    An instance of query_class.
  """
  from .google_imports import gql  # Late import, to avoid name conflict.
  gql_qry = gql.GQL(query_string)
  kind = gql_qry.kind()
  if kind is None:
    # The query must be lacking a "FROM <kind>" class.  Let Expando
    # stand in for the model class (it won't actually be used to
    # construct the results).
    modelclass = model.Expando
  else:
    modelclass = model.Model._kind_map.get(kind)
    if modelclass is None:
      # If the Adapter has a default model, use it; raise KindError otherwise.
      ctx = tasklets.get_context()
      modelclass = ctx._conn.adapter.default_model
      if modelclass is None:
        raise model.KindError(
            "No model class found for kind %r. Did you forget to import it?" %
            (kind,))
    else:
      # Adjust kind to the model class's kind (for PolyModel).
      kind = modelclass._get_kind()
  ancestor = None
  flt = gql_qry.filters()
  filters = list(modelclass._default_filters())
  for name_op in sorted(flt):
    name, op = name_op
    values = flt[name_op]
    op = op.lower()
    # 'ANCESTOR IS <key>' is a pseudo-filter; capture it separately.
    # (gql.GQL._GQL__ANCESTOR is the name-mangled private constant.)
    if op == 'is' and name == gql.GQL._GQL__ANCESTOR:
      if len(values) != 1:
        raise ValueError('"is" requires exactly one value')
      [(func, args)] = values
      ancestor = _args_to_val(func, args)
      continue
    if op not in _OPS:
      raise NotImplementedError('Operation %r is not supported.' % op)
    for (func, args) in values:
      val = _args_to_val(func, args)
      prop = _get_prop_from_modelclass(modelclass, name)
      if prop._name != name:
        raise RuntimeError('Whoa! _get_prop_from_modelclass(%s, %r) '
                           'returned a property whose name is %r?!' %
                           (modelclass.__name__, name, prop._name))
      # Unresolved parameters become ParameterNodes, to be bound later.
      if isinstance(val, ParameterizedThing):
        node = ParameterNode(prop, op, val)
      elif op == 'in':
        node = prop._IN(val)
      else:
        node = prop._comparison(op, val)
      filters.append(node)
  if filters:
    filters = ConjunctionNode(*filters)
  else:
    filters = None
  orders = _orderings_to_orders(gql_qry.orderings(), modelclass)
  offset = gql_qry.offset()
  limit = gql_qry.limit()
  if limit < 0:
    # GQL uses -1 for "no limit".
    limit = None
  keys_only = gql_qry._keys_only
  if not keys_only:
    # Normalize False to None so it doesn't override other defaults.
    keys_only = None
  options = QueryOptions(offset=offset, limit=limit, keys_only=keys_only)
  projection = gql_qry.projection()
  if gql_qry.is_distinct():
    group_by = projection
  else:
    group_by = None
  qry = query_class(kind=kind,
                    ancestor=ancestor,
                    filters=filters,
                    orders=orders,
                    default_options=options,
                    projection=projection,
                    group_by=group_by)
  return qry
class QueryIterator(object):
  """This iterator works both for synchronous and async callers!

  For synchronous callers, just use:

    for entity in Account.query():
      <use entity>

  Async callers use this idiom:

    it = iter(Account.query())
    while (yield it.has_next_async()):
      entity = it.next()
      <use entity>

  You can also use q.iter([options]) instead of iter(q); this allows
  passing query options such as keys_only or produce_cursors.

  When keys_only is set, it.next() returns a key instead of an entity.

  When produce_cursors is set, the methods it.cursor_before() and
  it.cursor_after() return Cursor objects corresponding to the query
  position just before and after the item returned by it.next().
  Before it.next() is called for the first time, both raise an
  exception.  Once the loop is exhausted, both return the cursor after
  the last item returned.  Calling it.has_next() does not affect the
  cursors; you must call it.next() before the cursors move.  Note that
  sometimes requesting a cursor requires a datastore roundtrip (but
  not if you happen to request a cursor corresponding to a batch
  boundary).  If produce_cursors is not set, both methods always raise
  an exception.

  Note that queries requiring in-memory merging of multiple queries
  (i.e. queries using the IN, != or OR operators) do not support query
  options.
  """

  # When produce_cursors is set, _lookahead collects (batch, index)
  # pairs passed to _extended_callback(), and (_batch, _index)
  # contain the info pertaining to the current item.
  _lookahead = None
  _batch = None
  _index = None

  # Indicate the loop is exhausted.
  _exhausted = False

  @utils.positional(2)
  def __init__(self, query, **q_options):
    """Constructor.  Takes a Query and query options.

    This is normally called by Query.iter() or Query.__iter__().
    """
    ctx = tasklets.get_context()
    callback = None
    # NOTE(review): the assignment above is dead -- callback is
    # unconditionally overwritten two lines below.
    options = query._make_options(q_options)
    callback = self._extended_callback
    self._iter = ctx.iter_query(query,
                                callback=callback,
                                pass_batch_into_callback=True,
                                options=options)
    self._fut = None

  def _extended_callback(self, batch, index, ent):
    # Invoked for each raw result; records its (batch, index) position
    # so cursor_before()/cursor_after() can compute cursors later.
    if self._exhausted:
      raise RuntimeError('QueryIterator is already exhausted')
    # TODO: Make _lookahead a deque.
    if self._lookahead is None:
      self._lookahead = []
    self._lookahead.append((batch, index))
    return ent

  def _consume_item(self):
    # Advance the current-position info to the item just returned.
    if self._lookahead:
      self._batch, self._index = self._lookahead.pop(0)
    else:
      self._batch = self._index = None

  def cursor_before(self):
    """Return the cursor before the current item.

    You must pass a QueryOptions object with produce_cursors=True
    for this to work.

    If there is no cursor or no current item, raise BadArgumentError.
    Before next() has returned there is no cursor.  Once the loop is
    exhausted, this returns the cursor after the last item.
    """
    if self._batch is None:
      raise datastore_errors.BadArgumentError('There is no cursor currently')
    # TODO: if cursor_after() was called for the previous item
    # reuse that result instead of computing it from scratch.
    # (Some cursor() calls make a datastore roundtrip.)
    # TODO: reimplement the cursor() call to use NDB async I/O;
    # perhaps even add async versions of cursor_before/after.
    # _exhausted (a bool, so 0 or 1) shifts the position past the last
    # item once the loop has finished.
    return self._batch.cursor(self._index + self._exhausted)

  def cursor_after(self):
    """Return the cursor after the current item.

    You must pass a QueryOptions object with produce_cursors=True
    for this to work.

    If there is no cursor or no current item, raise BadArgumentError.
    Before next() has returned there is no cursor.  Once the loop is
    exhausted, this returns the cursor after the last item.
    """
    if self._batch is None:
      raise datastore_errors.BadArgumentError('There is no cursor currently')
    return self._batch.cursor(self._index + 1)  # TODO: inline this as async.

  def index_list(self):
    """Return the list of indexes used for this query.

    This returns a list of index representations, where an index
    representation is the same as what is returned by get_indexes().

    Before the first result, the information is unavailable, and then
    None is returned.  This is not the same as an empty list -- the
    empty list means that no index was used to execute the query.  (In
    the dev_appserver, an empty list may also mean that only built-in
    indexes were used; metadata queries also return an empty list
    here.)

    Proper use is as follows:
      q = <modelclass>.query(<filters>)
      i = q.iter()
      try:
        i.next()
      except StopIteration:
        pass
      indexes = i.index_list()
      assert isinstance(indexes, list)

    Notes:
    - Forcing produce_cursors=False makes this always return None.
    - This always returns None for a multi-query.
    """
    # TODO: Technically it is possible to implement this for
    # multi-query by merging all the index lists from each subquery.
    # Return None if the batch has no attribute index_list.
    # This also applies when the batch itself is None.
    return getattr(self._batch, 'index_list', None)

  def __iter__(self):
    """Iterator protocol: get the iterator for this iterator, i.e. self."""
    return self

  def probably_has_next(self):
    """Return whether a next item is (probably) available.

    This is not quite the same as has_next(), because when
    produce_cursors is set, some shortcuts are possible.  However, in
    some cases (e.g. when the query has a post_filter) we can get a
    false positive (returns True but next() will raise StopIteration).
    There are no false negatives, if Batch.more_results doesn't lie.
    """
    if self._lookahead:
      return True
    if self._batch is not None:
      return self._batch.more_results
    return self.has_next()

  def has_next(self):
    """Return whether a next item is available.

    See the module docstring for the usage pattern.
    """
    return self.has_next_async().get_result()

  @tasklets.tasklet
  def has_next_async(self):
    """Return a Future whose result will say whether a next item is available.

    See the module docstring for the usage pattern.
    """
    # Start fetching the next item; next() will reuse self._fut.
    if self._fut is None:
      self._fut = self._iter.getq()
    flag = True
    try:
      yield self._fut
    except EOFError:
      flag = False
    raise tasklets.Return(flag)

  def next(self):
    """Iterator protocol: get next item or raise StopIteration."""
    if self._fut is None:
      self._fut = self._iter.getq()
    try:
      try:
        ent = self._fut.get_result()
        self._consume_item()
        return ent
      except EOFError:
        # EOFError from the queue signals normal end of iteration.
        self._exhausted = True
        raise StopIteration
    finally:
      self._fut = None
class _SubQueryIteratorState(object):
  """Helper class for _MultiQuery.

  Holds the next pending entity from one subquery together with the
  context needed to order it against the other subqueries' heads.
  """

  def __init__(self, batch_i_entity, iterator, dsquery, orders):
    batch, index, entity = batch_i_entity
    self.batch = batch
    self.index = index
    self.entity = entity
    self.iterator = iterator
    self.dsquery = dsquery
    self.orders = orders

  def __cmp__(self, other):
    # Python 2 ordering hook, used by heapq when _MultiQuery merges
    # sorted subquery streams: compares the head entities according to
    # the shared sort orders.
    if not isinstance(other, _SubQueryIteratorState):
      raise NotImplementedError('Can only compare _SubQueryIteratorState '
                                'instances to other _SubQueryIteratorState '
                                'instances; not %r' % other)
    if not self.orders == other.orders:
      raise NotImplementedError('Cannot compare _SubQueryIteratorStates with '
                                'differing orders (%r != %r)' %
                                (self.orders, other.orders))
    lhs = self.entity._orig_pb
    rhs = other.entity._orig_pb
    lhs_filter = self.dsquery._filter_predicate
    rhs_filter = other.dsquery._filter_predicate
    names = self.orders._get_prop_names()
    # TODO: In some future version, there won't be a need to add the
    # filters' names.
    if lhs_filter is not None:
      names |= lhs_filter._get_prop_names()
    if rhs_filter is not None:
      names |= rhs_filter._get_prop_names()
    lhs_value_map = datastore_query._make_key_value_map(lhs, names)
    rhs_value_map = datastore_query._make_key_value_map(rhs, names)
    # Remove filter-only values so they don't perturb the comparison.
    if lhs_filter is not None:
      lhs_filter._prune(lhs_value_map)
    if rhs_filter is not None:
      rhs_filter._prune(rhs_value_map)
    return self.orders._cmp(lhs_value_map, rhs_value_map)
1810 class _MultiQuery(object):
1811 """Helper class to run queries involving !=, IN or OR operators."""
1813 # This is not instantiated by the user directly, but implicitly when
1814 # iterating over a query with at least one filter using an IN, OR or
1815 # != operator. Note that some options must be interpreted by
1816 # _MultiQuery instead of passed to the underlying Queries' methods,
1817 # e.g. offset (though not necessarily limit, and I'm not sure about
1818 # cursors).
1820 # TODO: Need a way to specify the unification of two queries that
1821 # are identical except one has an ancestor and the other doesn't.
1822 # The HR datastore makes that a useful special case.
1824 def __init__(self, subqueries):
1825 if not isinstance(subqueries, list):
1826 raise TypeError('subqueries must be a list; received %r' % subqueries)
1827 for subq in subqueries:
1828 if not isinstance(subq, Query):
1829 raise TypeError('Each subquery must be a Query instances; received %r'
1830 % subq)
1831 first_subquery = subqueries[0]
1832 kind = first_subquery.kind
1833 orders = first_subquery.orders
1834 if not kind:
1835 raise ValueError('Subquery kind cannot be missing')
1836 for subq in subqueries[1:]:
1837 if subq.kind != kind:
1838 raise ValueError('Subqueries must be for a common kind (%s != %s)' %
1839 (subq.kind, kind))
1840 elif subq.orders != orders:
1841 raise ValueError('Subqueries must have the same order(s) (%s != %s)' %
1842 (subq.orders, orders))
1843 # TODO: Ensure that app and namespace match, when we support them.
1844 self.__subqueries = subqueries
1845 self.__orders = orders
1846 self.ancestor = None # Hack for map_query().
  def _make_options(self, q_options):
    # Multi-queries do not support per-call query options (see the class
    # docstring); q_options is deliberately ignored and only the first
    # subquery's defaults are used.
    return self.__subqueries[0].default_options
  @property
  def orders(self):
    """Accessor for the shared orders (a datastore_query.Order or None)."""
    return self.__orders
  @property
  def default_options(self):
    """Accessor for the default options (taken from the first subquery)."""
    return self.__subqueries[0].default_options
  @tasklets.tasklet
  def run_to_queue(self, queue, conn, options=None):
    """Run this query, putting entities into the given queue.

    Offset, limit and (when merging ordered results) keys_only cannot be
    delegated to the individual subqueries, so they are stripped from the
    options passed along and emulated here while combining results.

    Args:
      queue: A queue-like future; (batch, index, result) triples are
        delivered via queue.putq() and queue.complete() is called when
        all results have been produced.
      conn: The datastore connection used to run the subqueries.
      options: Optional QueryOptions.

    Raises:
      datastore_errors.BadArgumentError: If cursor options are requested
        but the query is not ordered by __key__.
    """
    if options is None:
      # Default options.
      offset = None
      limit = None
      keys_only = None
    else:
      # Capture options we need to simulate.
      offset = options.offset
      limit = options.limit
      keys_only = options.keys_only

      # Cursors are supported for certain orders only.
      if (options.start_cursor or options.end_cursor or
          options.produce_cursors):
        names = set()
        if self.__orders is not None:
          names = self.__orders._get_prop_names()
        if '__key__' not in names:
          raise datastore_errors.BadArgumentError(
              '_MultiQuery with cursors requires __key__ order')

    # Decide if we need to modify the options passed to subqueries.
    # NOTE: It would seem we can sometimes let the datastore handle
    # the offset natively, but this would thwart the duplicate key
    # detection, so we always have to emulate the offset here.
    # We can set the limit we pass along to offset + limit though,
    # since that is the maximum number of results from a single
    # subquery we will ever have to consider.
    modifiers = {}
    if offset:
      modifiers['offset'] = None
      if limit is not None:
        modifiers['limit'] = min(_MAX_LIMIT, offset + limit)
    if keys_only and self.__orders is not None:
      # The ordered merge below compares full entities, so keys_only is
      # removed from the subquery options and emulated when yielding.
      modifiers['keys_only'] = None
    if modifiers:
      options = QueryOptions(config=options, **modifiers)

    if offset is None:
      offset = 0

    if limit is None:
      limit = _MAX_LIMIT

    if self.__orders is None:
      # Run the subqueries sequentially; there is no order to keep.
      keys_seen = set()
      for subq in self.__subqueries:
        if limit <= 0:
          break
        subit = tasklets.SerialQueueFuture('_MultiQuery.run_to_queue[ser]')
        subq.run_to_queue(subit, conn, options=options)
        while limit > 0:
          try:
            batch, index, result = yield subit.getq()
          except EOFError:
            # This subquery is exhausted; move on to the next one.
            break
          if keys_only:
            key = result
          else:
            key = result._key
          if key not in keys_seen:
            keys_seen.add(key)
            if offset > 0:
              # Skip this result to emulate the requested offset.
              offset -= 1
            else:
              limit -= 1
              # Batch and index are not propagated here; cursors are not
              # supported without a __key__ order (checked above).
              queue.putq((None, None, result))
      queue.complete()
      return

    # This with-statement causes the adapter to set _orig_pb on all
    # entities it converts from protobuf.
    # TODO: Does this interact properly with the cache?
    with conn.adapter:
      # Start running all the sub-queries.
      todo = []  # List of (subit, dsquery) tuples.
      for subq in self.__subqueries:
        dsquery = subq._get_query(conn)
        subit = tasklets.SerialQueueFuture('_MultiQuery.run_to_queue[par]')
        subq.run_to_queue(subit, conn, options=options, dsquery=dsquery)
        todo.append((subit, dsquery))

      # Create a list of (first-entity, subquery-iterator) tuples.
      state = []  # List of _SubQueryIteratorState instances.
      for subit, dsquery in todo:
        try:
          thing = yield subit.getq()
        except EOFError:
          # Subquery returned no results at all; drop it.
          continue
        else:
          state.append(_SubQueryIteratorState(thing, subit, dsquery,
                                              self.__orders))

      # Now turn it into a sorted heap.  The heapq module claims that
      # calling heapify() is more efficient than calling heappush() for
      # each item.
      heapq.heapify(state)

      # Repeatedly yield the lowest entity from the state vector,
      # filtering duplicates.  This is essentially a multi-way merge
      # sort.  One would think it should be possible to filter
      # duplicates simply by dropping other entities already in the
      # state vector that are equal to the lowest entity, but because of
      # the weird sorting of repeated properties, we have to explicitly
      # keep a set of all keys, so we can remove later occurrences.
      # Note that entities will still be sorted correctly, within the
      # constraints given by the sort order.
      keys_seen = set()
      while state and limit > 0:
        item = heapq.heappop(state)
        batch = item.batch
        index = item.index
        entity = item.entity
        key = entity._key
        if key not in keys_seen:
          keys_seen.add(key)
          if offset > 0:
            # Skip this result to emulate the requested offset.
            offset -= 1
          else:
            limit -= 1
            if keys_only:
              # keys_only was stripped from the subquery options above,
              # so emulate it here by yielding just the key.
              queue.putq((batch, index, key))
            else:
              queue.putq((batch, index, entity))
        # Refill the heap slot from the subquery this item came from.
        subit = item.iterator
        try:
          batch, index, entity = yield subit.getq()
        except EOFError:
          # That subquery is exhausted; do not push the item back.
          pass
        else:
          item.batch = batch
          item.index = index
          item.entity = entity
          heapq.heappush(state, item)
      queue.complete()
1999 # Datastore API using the default context.
2001 def iter(self, **q_options):
2002 return QueryIterator(self, **q_options)
2004 __iter__ = iter
2006 # TODO: Add fetch() etc.?
2009 # Helper functions to convert between orders and orderings. An order
2010 # is a datastore_query.Order instance. An ordering is a
2011 # (property_name, direction) tuple.
2013 def _order_to_ordering(order):
2014 pb = order._to_pb()
2015 return pb.property(), pb.direction() # TODO: What about UTF-8?
2018 def _orders_to_orderings(orders):
2019 if orders is None:
2020 return []
2021 if isinstance(orders, datastore_query.PropertyOrder):
2022 return [_order_to_ordering(orders)]
2023 if isinstance(orders, datastore_query.CompositeOrder):
2024 # TODO: What about UTF-8?
2025 return [(pb.property(), pb.direction()) for pb in orders._to_pbs()]
2026 raise ValueError('Bad order: %r' % (orders,))
def _ordering_to_order(ordering, modelclass):
  """Convert an ordering into a datastore_query.PropertyOrder.

  An ordering is a (property_name, direction) tuple.  The property name
  is resolved against modelclass as a sanity check.
  """
  name, direction = ordering
  resolved_prop = _get_prop_from_modelclass(modelclass, name)
  if resolved_prop._name != name:
    raise RuntimeError('Whoa! _get_prop_from_modelclass(%s, %r) '
                       'returned a property whose name is %r?!' %
                       (modelclass.__name__, name, resolved_prop._name))
  return datastore_query.PropertyOrder(name, direction)
2039 def _orderings_to_orders(orderings, modelclass):
2040 orders = [_ordering_to_order(o, modelclass) for o in orderings]
2041 if not orders:
2042 return None
2043 if len(orders) == 1:
2044 return orders[0]
2045 return datastore_query.CompositeOrder(orders)