App Engine Python SDK version 1.8.9
[gae.git] / python / google / appengine / ext / mapreduce / datastore_range_iterators.py
blobd1980b260c7d71e92e1d00c7445070bf243492c9
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
"""Helper iterators for input_readers.DatastoreInputReader.

RangeIteratorFactory builds RangeIterators that walk either a property range
across namespaces or a collection of key ranges. The per-key-range iterators
(KeyRange*Iterator) yield db/ndb model entities, datastore entities, datastore
keys, or raw entity protos.
"""
22 from google.appengine.datastore import datastore_query
23 from google.appengine.datastore import datastore_rpc
24 from google.appengine.ext import db
25 from google.appengine.ext import key_range
26 from google.appengine.ext.mapreduce import json_util
27 from google.appengine.ext.mapreduce import key_ranges
28 from google.appengine.ext.mapreduce import model
29 from google.appengine.ext.mapreduce import namespace_range
30 from google.appengine.ext.mapreduce import property_range
31 from google.appengine.ext.mapreduce import util
# Public API of this module. The concrete RangeIterator implementations
# (_PropertyRangeModelIterator, _KeyRangesIterator) are private and are
# obtained through RangeIteratorFactory instead.
__all__ = [
    "RangeIteratorFactory",
    "RangeIterator",
    "AbstractKeyRangeIterator",
    "KeyRangeModelIterator",
    "KeyRangeEntityIterator",
    "KeyRangeKeyIterator",
    "KeyRangeEntityProtoIterator",
]
class RangeIteratorFactory(object):
  """Factory to create RangeIterators."""

  @classmethod
  def create_property_range_iterator(cls,
                                     p_range,
                                     ns_range,
                                     query_spec):
    """Create a RangeIterator over a property range.

    Args:
      p_range: a property_range.PropertyRange object that defines the
        conditions entities should satisfy.
      ns_range: a namespace_range.NamespaceRange object that defines the
        namespaces to examine.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.

    Returns:
      a RangeIterator.
    """
    return _PropertyRangeModelIterator(p_range,
                                       ns_range,
                                       query_spec)

  @classmethod
  def create_key_ranges_iterator(cls,
                                 k_ranges,
                                 query_spec,
                                 key_range_iter_cls):
    """Create a RangeIterator over a collection of key ranges.

    Args:
      k_ranges: a key_ranges._KeyRanges object.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
      key_range_iter_cls: the class that iterates over a single key range.
        The value yielded by this class is yielded.

    Returns:
      a RangeIterator.
    """
    return _KeyRangesIterator(k_ranges, query_spec, key_range_iter_cls)

  @classmethod
  def from_json(cls, json):
    """Recreate a RangeIterator from the output of its to_json().

    Args:
      json: a json-compatible map produced by a RangeIterator's to_json().
        Its "name" entry selects the concrete iterator class.

    Returns:
      a RangeIterator.

    Raises:
      KeyError: if json has no "name" entry, or if "name" does not match a
        registered RangeIterator class.
    """
    name = json["name"]
    try:
      iter_cls = _RANGE_ITERATORS[name]
    except KeyError:
      # Still a KeyError, so existing callers are unaffected, but the message
      # now says what went wrong instead of just echoing the key.
      raise KeyError("Unknown RangeIterator class name: %r" % (name,))
    return iter_cls.from_json(json)
class RangeIterator(json_util.JsonMixin):
  """Interface for DatastoreInputReader helper iterators.

  A RangeIterator follows Python's iterator protocol and can additionally be
  marshaled: to_json() captures how far iteration has progressed, and an
  iterator rebuilt by from_json() continues where the old one stopped.
  Marshaling or unmarshaling after the iterator has raised StopIteration is
  NOT defined.
  """

  def __iter__(self):
    """Walk datastore entities.

    Yields:
      one value per entity; what that value is depends on the subclass.
    """
    raise NotImplementedError

  def __repr__(self):
    raise NotImplementedError

  def to_json(self):
    """Serialize the full iteration state.

    Returns:
      a json-compatible map holding all state.
    """
    raise NotImplementedError

  @classmethod
  def from_json(cls, json):
    """Rebuild an iterator from the output of to_json()."""
    raise NotImplementedError
class _PropertyRangeModelIterator(RangeIterator):
  """Yields db/ndb model entities within a property range."""

  def __init__(self, p_range, ns_range, query_spec):
    """Init.

    Args:
      p_range: a property_range.PropertyRange object that defines the
        conditions entities should satisfy.
      ns_range: a namespace_range.NamespaceRange object that defines the
        namespaces to examine.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
    """
    self._property_range = p_range
    self._ns_range = ns_range
    self._query_spec = query_spec
    # Cursor to resume from: None, a websafe string, or a
    # datastore_query.Cursor restored by from_json().
    self._cursor = None
    # The db.Query, or the ndb query iterator, currently being consumed.
    self._query = None

  def __repr__(self):
    return "PropertyRangeIterator for %s" % str(self._property_range)

  def __iter__(self):
    """Iterate over entities.

    Namespaces in the namespace range are visited in order. Once a namespace
    is exhausted, the resume cursor is cleared and the stored namespace range
    is narrowed to start after it, so a serialized iterator will not revisit
    that namespace.

    Yields:
      db model entities or ndb model entities if the model is defined with ndb.
    """
    for ns in self._ns_range:
      self._query = self._property_range.make_query(ns)
      if isinstance(self._query, db.Query):
        if self._cursor:
          self._query.with_cursor(self._cursor)
        for model_instance in self._query.run(
            batch_size=self._query_spec.batch_size,
            keys_only=self._query_spec.keys_only):
          yield model_instance
      else:
        # Non-db (ndb) query: keep the iterator itself in self._query so that
        # to_json() can read cursor_after() from it.
        self._query = self._query.iter(batch_size=self._query_spec.batch_size,
                                       keys_only=self._query_spec.keys_only,
                                       start_cursor=self._cursor,
                                       produce_cursors=True)
        for model_instance in self._query:
          yield model_instance
      self._cursor = None
      if ns != self._ns_range.namespace_end:
        self._ns_range = self._ns_range.with_start_after(ns)

  def to_json(self):
    """Serializes all states into json form.

    Returns:
      a json-compatible map. "cursor" is None, a websafe string, or the
      websafe form of a datastore_query.Cursor; "cursor_object" is True only
      in the last case so from_json() knows to rebuild a Cursor.
    """
    cursor = self._cursor
    if self._query is not None:
      if isinstance(self._query, db.Query):
        cursor = self._query.cursor()
      else:
        cursor = self._query.cursor_after()

    # A fresh iterator (never iterated, no restored cursor) has cursor None.
    # It must be serialized as-is; calling to_websafe_string() on it would
    # raise AttributeError.
    if cursor is None or isinstance(cursor, basestring):
      cursor_object = False
    else:
      cursor_object = True
      cursor = cursor.to_websafe_string()

    return {"property_range": self._property_range.to_json(),
            "query_spec": self._query_spec.to_json(),
            "cursor": cursor,
            "ns_range": self._ns_range.to_json_object(),
            "name": self.__class__.__name__,
            "cursor_object": cursor_object}

  @classmethod
  def from_json(cls, json):
    """Recreate an iterator from the output of to_json().

    Args:
      json: a map produced by to_json().

    Returns:
      a _PropertyRangeModelIterator holding the serialized cursor and the
      (possibly narrowed) namespace range.
    """
    obj = cls(property_range.PropertyRange.from_json(json["property_range"]),
              namespace_range.NamespaceRange.from_json_object(json["ns_range"]),
              model.QuerySpec.from_json(json["query_spec"]))
    cursor = json["cursor"]
    # cursor_object marks a websafe string that must be turned back into a
    # datastore_query.Cursor; any other value is used unchanged.
    if cursor and json["cursor_object"]:
      obj._cursor = datastore_query.Cursor.from_websafe_string(cursor)
    else:
      obj._cursor = cursor
    return obj
class _KeyRangesIterator(RangeIterator):
  """Create an iterator over a key_ranges.KeyRanges object."""

  def __init__(self,
               k_ranges,
               query_spec,
               key_range_iter_cls):
    """Init.

    Args:
      k_ranges: a key_ranges._KeyRanges object.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
      key_range_iter_cls: the class that iterates over a single key range.
        The value yielded by this class is yielded.
    """
    self._key_ranges = k_ranges
    self._query_spec = query_spec
    self._key_range_iter_cls = key_range_iter_cls
    # Iterator over the key range currently being consumed; None before the
    # first key range is taken and after all key ranges are exhausted.
    self._current_iter = None
    # Not read by this class; left in place in case other code relies on it.
    self._current_key_range = None

  def __repr__(self):
    return "KeyRangesIterator for %s" % str(self._key_ranges)

  def __iter__(self):
    """Iterate over every key range in turn.

    A partially consumed key range restored by from_json() is finished first;
    then key ranges are pulled from k_ranges one at a time.

    Yields:
      whatever key_range_iter_cls yields for each key range.
    """
    while True:
      if self._current_iter is not None:
        for o in self._current_iter:
          yield o

      # Only next() signals exhaustion of the key ranges. Keeping the
      # iterator-class constructor outside the try means a StopIteration it
      # might raise is not mistaken for "no more key ranges".
      try:
        k_range = self._key_ranges.next()
      except StopIteration:
        self._current_iter = None
        break
      self._current_iter = self._key_range_iter_cls(k_range,
                                                    self._query_spec)

  def to_json(self):
    """Serializes all states into json form.

    Returns:
      a json-compatible map containing the key ranges' own serialization, the
      query spec, the current key range iterator's state (or None), and the
      key range iterator class name used to look it up on deserialization.
    """
    current_iter = None
    if self._current_iter is not None:
      current_iter = self._current_iter.to_json()

    return {"key_ranges": self._key_ranges.to_json(),
            "query_spec": self._query_spec.to_json(),
            "current_iter": current_iter,
            "key_range_iter_cls": self._key_range_iter_cls.__name__,
            "name": self.__class__.__name__}

  @classmethod
  def from_json(cls, json):
    """Recreate an iterator from the output of to_json().

    Args:
      json: a map produced by to_json().

    Returns:
      a _KeyRangesIterator that resumes the serialized key range iterator,
      if there was one, before moving on to the remaining key ranges.
    """
    key_range_iter_cls = _KEY_RANGE_ITERATORS[json["key_range_iter_cls"]]
    obj = cls(key_ranges.KeyRangesFactory.from_json(json["key_ranges"]),
              model.QuerySpec.from_json(json["query_spec"]),
              key_range_iter_cls)

    current_iter = None
    if json["current_iter"]:
      current_iter = key_range_iter_cls.from_json(json["current_iter"])
    obj._current_iter = current_iter
    return obj
# Registry used by RangeIteratorFactory.from_json(): maps the "name" entry that
# each RangeIterator's to_json() writes (its class name) back to the class
# able to restore it.
_RANGE_ITERATORS = dict(
    (iter_cls.__name__, iter_cls)
    for iter_cls in (_PropertyRangeModelIterator, _KeyRangesIterator))
class AbstractKeyRangeIterator(json_util.JsonMixin):
  """Iterates over a single key_range.KeyRange and yields value for each key.

  Subclasses implement __iter__ and _get_cursor; serialization is shared.
  """

  def __init__(self, k_range, query_spec):
    """Init.

    Args:
      k_range: a key_range.KeyRange object that defines the entity keys to
        operate on. KeyRange object already contains a namespace.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
    """
    self._key_range = k_range
    self._query_spec = query_spec
    # Cursor to resume from, as restored by from_json(); None when starting.
    self._cursor = None
    # Query (or query iterator) created by __iter__; None until iteration.
    self._query = None

  def __iter__(self):
    """Yield one value per entity in the key range."""
    raise NotImplementedError

  def _get_cursor(self):
    """Return the cursor to serialize for the current iteration state."""
    raise NotImplementedError

  def to_json(self):
    """Serialize the iteration state.

    Returns:
      a json-compatible map. A datastore_query.Cursor is stored in websafe
      form with "cursor_object" set to True; any other cursor value is stored
      as-is with "cursor_object" False.
    """
    cursor = self._get_cursor()
    if cursor and isinstance(cursor, datastore_query.Cursor):
      cursor, cursor_object = cursor.to_websafe_string(), True
    else:
      cursor_object = False
    return dict(key_range=self._key_range.to_json(),
                query_spec=self._query_spec.to_json(),
                cursor=cursor,
                cursor_object=cursor_object)

  @classmethod
  def from_json(cls, json):
    """Rebuild an iterator from the output of to_json()."""
    obj = cls(key_range.KeyRange.from_json(json["key_range"]),
              model.QuerySpec.from_json(json["query_spec"]))
    restored = json["cursor"]
    if restored and json["cursor_object"]:
      restored = datastore_query.Cursor.from_websafe_string(restored)
    obj._cursor = restored
    return obj
class KeyRangeModelIterator(AbstractKeyRangeIterator):
  """Yields db/ndb model entities with a key range."""

  def __iter__(self):
    """Yield model instances in ascending key order within the key range."""
    model_cls = util.for_name(self._query_spec.model_class_path)
    self._query = self._key_range.make_ascending_query(
        model_cls, filters=self._query_spec.filters)

    if not isinstance(self._query, db.Query):
      # Non-db (ndb) query: keep the iterator itself in self._query so that
      # _get_cursor() can call cursor_after() on it.
      self._query = self._query.iter(batch_size=self._query_spec.batch_size,
                                     keys_only=self._query_spec.keys_only,
                                     start_cursor=self._cursor,
                                     produce_cursors=True)
      results = self._query
    else:
      if self._cursor:
        self._query.with_cursor(self._cursor)
      results = self._query.run(batch_size=self._query_spec.batch_size,
                                keys_only=self._query_spec.keys_only)

    for model_instance in results:
      yield model_instance

  def _get_cursor(self):
    """Return the live query's cursor, or the restored one before iterating."""
    query = self._query
    if query is None:
      return self._cursor
    if not isinstance(query, db.Query):
      return query.cursor_after()
    return query.cursor()
class KeyRangeEntityIterator(AbstractKeyRangeIterator):
  """Yields datastore.Entity type within a key range."""

  # Passed as QueryOptions.keys_only; subclasses override it to get keys.
  _KEYS_ONLY = False

  def __iter__(self):
    """Yield entities in ascending key order within the key range."""
    self._query = self._key_range.make_ascending_datastore_query(
        self._query_spec.entity_kind, filters=self._query_spec.filters)
    options = datastore_query.QueryOptions(
        batch_size=self._query_spec.batch_size,
        keys_only=self._KEYS_ONLY,
        start_cursor=self._cursor)
    for entity in self._query.Run(config=options):
      yield entity

  def _get_cursor(self):
    """Return the live query's cursor, or the restored one before iterating."""
    if self._query is not None:
      return self._query.GetCursor()
    return self._cursor
class KeyRangeKeyIterator(KeyRangeEntityIterator):
  """Yields datastore.Key type within a key range.

  Same as KeyRangeEntityIterator, but the query runs keys-only.
  """

  _KEYS_ONLY = True
class KeyRangeEntityProtoIterator(AbstractKeyRangeIterator):
  """Yields datastore.Entity's raw proto within a key range."""

  def __iter__(self):
    """Yield raw entity protos in ascending key order within the key range."""
    ds_query = self._key_range.make_ascending_datastore_query(
        self._query_spec.entity_kind, filters=self._query_spec.filters)

    conn = datastore_rpc.Connection()
    options = datastore_query.QueryOptions(
        batch_size=self._query_spec.batch_size,
        start_cursor=self._cursor,
        produce_cursors=True)

    # Run the underlying datastore_query.Query on a plain Connection and wrap
    # the batches in a ResultsIterator, which also exposes cursor() for
    # _get_cursor().
    batches = ds_query.GetQuery().run(conn, options)
    self._query = datastore_query.ResultsIterator(batches)
    for proto in self._query:
      yield proto

  def _get_cursor(self):
    """Return the live iterator's cursor, or the restored one before iterating."""
    if self._query is not None:
      return self._query.cursor()
    return self._cursor
438 _KEY_RANGE_ITERATORS = {
439 KeyRangeModelIterator.__name__: KeyRangeModelIterator,
440 KeyRangeEntityIterator.__name__: KeyRangeEntityIterator,
441 KeyRangeKeyIterator.__name__: KeyRangeKeyIterator,
442 KeyRangeEntityProtoIterator.__name__: KeyRangeEntityProtoIterator