3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Helpers iterators for input_readers.DatastoreInputReader."""
22 from google
.appengine
.datastore
import datastore_query
23 from google
.appengine
.datastore
import datastore_rpc
24 from google
.appengine
.ext
import db
25 from google
.appengine
.ext
import key_range
26 from google
.appengine
.ext
.mapreduce
import json_util
27 from google
.appengine
.ext
.mapreduce
import key_ranges
28 from google
.appengine
.ext
.mapreduce
import model
29 from google
.appengine
.ext
.mapreduce
import namespace_range
30 from google
.appengine
.ext
.mapreduce
import property_range
31 from google
.appengine
.ext
.mapreduce
import util
# Public API of this module. RangeIterator is listed because it is the
# interface implemented by every iterator the factory below can build.
__all__ = [
    "RangeIteratorFactory",
    "RangeIterator",
    "AbstractKeyRangeIterator",
    "KeyRangeModelIterator",
    "KeyRangeEntityIterator",
    "KeyRangeKeyIterator",
    "KeyRangeEntityProtoIterator"]


class RangeIteratorFactory(object):
  """Factory to create RangeIterators.

  All methods are classmethods; the factory is never instantiated.
  """

  @classmethod
  def create_property_range_iterator(cls,
                                     p_range,
                                     ns_range,
                                     query_spec):
    """Create a RangeIterator over a property range across namespaces.

    Args:
      p_range: a property_range.PropertyRange object that defines the
        conditions entities should satisfy.
      ns_range: a namespace_range.NamespaceRange object that defines the
        namespaces to examine.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.

    Returns:
      a RangeIterator.
    """
    return _PropertyRangeModelIterator(p_range,
                                       ns_range,
                                       query_spec)

  @classmethod
  def create_key_ranges_iterator(cls,
                                 k_ranges,
                                 query_spec,
                                 key_range_iter_cls):
    """Create a RangeIterator over a sequence of key ranges.

    Args:
      k_ranges: a key_ranges._KeyRanges object.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
      key_range_iter_cls: the class that iterates over a single key range.
        The value yielded by this class is yielded.

    Returns:
      a RangeIterator.
    """
    return _KeyRangesIterator(k_ranges, query_spec, key_range_iter_cls)

  @classmethod
  def from_json(cls, json):
    """Recreate a RangeIterator from the output of its to_json().

    Args:
      json: a json-compatible dict produced by a RangeIterator's to_json();
        its "name" entry selects the concrete iterator class.

    Returns:
      the RangeIterator built by that class's from_json().

    Raises:
      KeyError: if json["name"] is not a registered iterator class name.
    """
    return _RANGE_ITERATORS[json["name"]].from_json(json)


class RangeIterator(json_util.JsonMixin):
  """Interface for DatastoreInputReader helper iterators.

  RangeIterator defines Python's generator interface and additional
  marshaling functionality. Marshaling saves the state of the generator.
  Unmarshaling guarantees any new generator created can resume where the
  old generator left off. When the produced generator raises StopIteration,
  the behavior of marshaling/unmarshaling is NOT defined.

  Subclasses must implement __iter__, __repr__, to_json and from_json.
  """

  def __iter__(self):
    """Iterate over the range.

    Yields:
      Iterates over datastore entities and yields some kind of value
        for each entity.
    """
    raise NotImplementedError()

  def __repr__(self):
    """Human-readable description; must be implemented by subclasses."""
    raise NotImplementedError()

  def to_json(self):
    """Serializes all states into json form.

    Returns:
      all states in json-compatible map.
    """
    raise NotImplementedError()

  @classmethod
  def from_json(cls, json):
    """Reverse of to_json."""
    raise NotImplementedError()


class _PropertyRangeModelIterator(RangeIterator):
  """Yields db/ndb model entities within a property range."""

  def __init__(self, p_range, ns_range, query_spec):
    """Init.

    Args:
      p_range: a property_range.PropertyRange object that defines the
        conditions entities should satisfy.
      ns_range: a namespace_range.NamespaceRange object that defines the
        namespaces to examine.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
    """
    self._property_range = p_range
    self._ns_range = ns_range
    self._query_spec = query_spec
    # Resume point within the current namespace: a string, a
    # datastore_query.Cursor (rebuilt by from_json), or None.
    self._cursor = None
    # The query (db) or query iterator (ndb) currently being consumed; None
    # when no namespace is in progress.
    self._query = None

  def __repr__(self):
    return "PropertyRangeIterator for %s" % str(self._property_range)

  def __iter__(self):
    """Iterate over entities.

    Yields:
      db model entities or ndb model entities if the model is defined with ndb.
    """
    for ns in self._ns_range:
      self._query = self._property_range.make_query(ns)
      if isinstance(self._query, db.Query):
        if self._cursor:
          self._query.with_cursor(self._cursor)
        for model_instance in self._query.run(
            batch_size=self._query_spec.batch_size,
            keys_only=self._query_spec.keys_only):
          yield model_instance
      else:
        self._query = self._query.iter(batch_size=self._query_spec.batch_size,
                                       keys_only=self._query_spec.keys_only,
                                       start_cursor=self._cursor,
                                       produce_cursors=True)
        for model_instance in self._query:
          yield model_instance
      # This namespace is done. A cursor belongs to the query it came from,
      # so drop both; otherwise the next namespace's query would start from
      # (and to_json would persist) a cursor of the previous namespace.
      self._query = None
      self._cursor = None
      # Advance the saved namespace range so serialized state resumes at the
      # next namespace (not done for the final namespace).
      if ns != self._ns_range.namespace_end:
        self._ns_range = self._ns_range.with_start_after(ns)

  def to_json(self):
    """Serializes all states into json form.

    Returns:
      all states in json-compatible map. "cursor" is None or a string;
      "cursor_object" tells from_json whether that string must be turned
      back into a datastore_query.Cursor.
    """
    cursor = self._cursor
    if self._query is not None:
      if isinstance(self._query, db.Query):
        cursor = self._query.cursor()
      else:
        cursor = self._query.cursor_after()

    # No cursor yet (e.g. serialized before iteration starts) or a string
    # cursor is stored as-is; anything else is converted to a websafe string.
    if cursor is None or isinstance(cursor, basestring):
      cursor_object = False
    else:
      cursor_object = True
      cursor = cursor.to_websafe_string()

    return {"property_range": self._property_range.to_json(),
            "query_spec": self._query_spec.to_json(),
            "cursor": cursor,
            "ns_range": self._ns_range.to_json_object(),
            "name": self.__class__.__name__,
            "cursor_object": cursor_object}

  @classmethod
  def from_json(cls, json):
    """Reverse of to_json."""
    obj = cls(property_range.PropertyRange.from_json(json["property_range"]),
              namespace_range.NamespaceRange.from_json_object(json["ns_range"]),
              model.QuerySpec.from_json(json["query_spec"]))
    cursor = json["cursor"]
    # pylint: disable=protected-access
    if cursor and json["cursor_object"]:
      obj._cursor = datastore_query.Cursor.from_websafe_string(cursor)
    else:
      # A plain string cursor or None is used unchanged.
      obj._cursor = cursor
    return obj


class _KeyRangesIterator(RangeIterator):
  """Iterates over a key_ranges.KeyRanges object, one key range at a time."""

  def __init__(self,
               k_ranges,
               query_spec,
               key_range_iter_cls):
    """Init.

    Args:
      k_ranges: a key_ranges._KeyRanges object.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
      key_range_iter_cls: the class that iterates over a single key range.
        The value yielded by this class is yielded.
    """
    self._key_ranges = k_ranges
    self._query_spec = query_spec
    self._key_range_iter_cls = key_range_iter_cls
    # Iterator over the key range currently being consumed, or None.
    self._current_iter = None
    # NOTE(review): assigned here but never read in this module; kept only
    # for compatibility with any external reader of the attribute.
    self._current_key_range = None

  def __repr__(self):
    return "KeyRangesIterator for %s" % str(self._key_ranges)

  def __iter__(self):
    """Iterate over every key range in turn.

    Yields:
      whatever key_range_iter_cls yields for each key range.
    """
    while True:
      # Drain the range in progress first; after from_json this is the
      # partially consumed range restored from serialized state.
      if self._current_iter:
        for o in self._current_iter:
          yield o

      try:
        k_range = self._key_ranges.next()
        self._current_iter = self._key_range_iter_cls(k_range,
                                                      self._query_spec)
      except StopIteration:
        self._current_iter = None
        break

  def to_json(self):
    """Serializes all states into json form.

    Returns:
      all states in json-compatible map.
    """
    # No range may be in progress (before iteration, or after exhaustion).
    current_iter = None
    if self._current_iter:
      current_iter = self._current_iter.to_json()

    return {"key_ranges": self._key_ranges.to_json(),
            "query_spec": self._query_spec.to_json(),
            "current_iter": current_iter,
            "key_range_iter_cls": self._key_range_iter_cls.__name__,
            "name": self.__class__.__name__}

  @classmethod
  def from_json(cls, json):
    """Reverse of to_json."""
    key_range_iter_cls = _KEY_RANGE_ITERATORS[json["key_range_iter_cls"]]
    obj = cls(key_ranges.KeyRangesFactory.from_json(json["key_ranges"]),
              model.QuerySpec.from_json(json["query_spec"]),
              key_range_iter_cls)

    current_iter = None
    if json["current_iter"]:
      current_iter = key_range_iter_cls.from_json(json["current_iter"])
    # pylint: disable=protected-access
    obj._current_iter = current_iter
    return obj


# Maps each RangeIterator class name (the "name" entry its to_json writes)
# to the class, so RangeIteratorFactory.from_json can dispatch on it.
_RANGE_ITERATORS = {
    _PropertyRangeModelIterator.__name__: _PropertyRangeModelIterator,
    _KeyRangesIterator.__name__: _KeyRangesIterator,
}


class AbstractKeyRangeIterator(json_util.JsonMixin):
  """Iterates over a single key_range.KeyRange and yields value for each key.

  Subclasses implement __iter__ and _get_cursor; serialization of the key
  range, query spec and cursor is shared here.
  """

  def __init__(self, k_range, query_spec):
    """Init.

    Args:
      k_range: a key_range.KeyRange object that defines the entity keys to
        operate on. KeyRange object already contains a namespace.
      query_spec: a model.QuerySpec object that defines how to retrieve
        entities from datastore.
    """
    self._key_range = k_range
    self._query_spec = query_spec
    # Position to resume from (set by from_json), or None.
    self._cursor = None
    # Query/iterator created by the subclass's __iter__; None until then.
    self._query = None

  def __iter__(self):
    """Iterate over the key range; implemented by subclasses."""
    raise NotImplementedError()

  def _get_cursor(self):
    """Get cursor on current query iterator for serialization."""
    raise NotImplementedError()

  def to_json(self):
    """Serializes all states into json form.

    Returns:
      all states in json-compatible map.
    """
    cursor = self._get_cursor()
    cursor_object = False
    # A datastore_query.Cursor is stored as its websafe string and flagged
    # so from_json rebuilds the Cursor; other values are stored unchanged.
    if cursor and isinstance(cursor, datastore_query.Cursor):
      cursor = cursor.to_websafe_string()
      cursor_object = True
    return {"key_range": self._key_range.to_json(),
            "query_spec": self._query_spec.to_json(),
            "cursor": cursor,
            "cursor_object": cursor_object}

  @classmethod
  def from_json(cls, json):
    """Reverse of to_json."""
    obj = cls(key_range.KeyRange.from_json(json["key_range"]),
              model.QuerySpec.from_json(json["query_spec"]))
    cursor = json["cursor"]
    # pylint: disable=protected-access
    if cursor and json["cursor_object"]:
      obj._cursor = datastore_query.Cursor.from_websafe_string(cursor)
    else:
      obj._cursor = cursor
    return obj


class KeyRangeModelIterator(AbstractKeyRangeIterator):
  """Yields db/ndb model entities with a key range."""

  def __iter__(self):
    self._query = self._key_range.make_ascending_query(
        util.for_name(self._query_spec.model_class_path),
        filters=self._query_spec.filters)

    if isinstance(self._query, db.Query):
      # db: the resume cursor is applied to the query object itself.
      if self._cursor:
        self._query.with_cursor(self._cursor)
      for model_instance in self._query.run(
          batch_size=self._query_spec.batch_size,
          keys_only=self._query_spec.keys_only):
        yield model_instance
    else:
      # Otherwise (ndb): replace the query with a cursor-producing iterator
      # so _get_cursor can call cursor_after() on it.
      self._query = self._query.iter(batch_size=self._query_spec.batch_size,
                                     keys_only=self._query_spec.keys_only,
                                     start_cursor=self._cursor,
                                     produce_cursors=True)
      for model_instance in self._query:
        yield model_instance

  def _get_cursor(self):
    """Return the live query's position, or the restored one if none is live."""
    if self._query is None:
      return self._cursor

    if isinstance(self._query, db.Query):
      return self._query.cursor()
    else:
      return self._query.cursor_after()


class KeyRangeEntityIterator(AbstractKeyRangeIterator):
  """Yields datastore.Entity type within a key range."""

  # Passed as QueryOptions.keys_only. Class-level so subclasses can override
  # it (e.g. to fetch keys instead of entities).
  _KEYS_ONLY = False

  def __iter__(self):
    self._query = self._key_range.make_ascending_datastore_query(
        self._query_spec.entity_kind, filters=self._query_spec.filters)
    for entity in self._query.Run(config=datastore_query.QueryOptions(
        batch_size=self._query_spec.batch_size,
        keys_only=self._KEYS_ONLY,
        start_cursor=self._cursor)):
      yield entity

  def _get_cursor(self):
    """Return the live query's position, or the restored one if none is live."""
    if self._query is None:
      return self._cursor
    return self._query.GetCursor()


class KeyRangeKeyIterator(KeyRangeEntityIterator):
  """Yields datastore.Key type within a key range."""

  # Run the inherited query as keys-only so keys, not entities, are yielded.
  _KEYS_ONLY = True


class KeyRangeEntityProtoIterator(AbstractKeyRangeIterator):
  """Yields datastore.Entity's raw proto within a key range."""

  def __iter__(self):
    query = self._key_range.make_ascending_datastore_query(
        self._query_spec.entity_kind, filters=self._query_spec.filters)
    # Connection() is created with no adapter argument; the contract of this
    # class is to yield raw entity protos rather than datastore.Entity.
    connection = datastore_rpc.Connection()
    query_options = datastore_query.QueryOptions(
        batch_size=self._query_spec.batch_size,
        start_cursor=self._cursor,
        produce_cursors=True)

    # datastore.Query -> datastore_query.Query (GetQuery) -> run() batches
    # -> ResultsIterator, whose cursor() is read by _get_cursor.
    self._query = datastore_query.ResultsIterator(
        query.GetQuery().run(connection, query_options))
    for entity_proto in self._query:
      yield entity_proto

  def _get_cursor(self):
    """Return the live iterator's position, or the restored one if none."""
    if self._query is None:
      return self._cursor
    return self._query.cursor()


438 _KEY_RANGE_ITERATORS
= {
439 KeyRangeModelIterator
.__name
__: KeyRangeModelIterator
,
440 KeyRangeEntityIterator
.__name
__: KeyRangeEntityIterator
,
441 KeyRangeKeyIterator
.__name
__: KeyRangeKeyIterator
,
442 KeyRangeEntityProtoIterator
.__name
__: KeyRangeEntityProtoIterator