# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
33 """Defines input readers for MapReduce."""
37 "AbstractDatastoreInputReader",
39 "BadReaderParamsError",
40 "BlobstoreLineInputReader",
41 "BlobstoreZipInputReader",
42 "BlobstoreZipLineInputReader",
43 "COUNTER_IO_READ_BYTES",
44 "COUNTER_IO_READ_MSEC",
45 "ConsistentKeyReader",
46 "DatastoreEntityInputReader",
47 "DatastoreInputReader",
48 "DatastoreKeyInputReader",
50 "RandomStringInputReader",
54 "NamespaceInputReader",

import base64
import copy
import logging
import random
import string
import StringIO
import time
import zipfile

from google.net.proto import ProtocolBuffer
from google.appengine.ext import ndb

from google.appengine.api import datastore
from google.appengine.api import files
from google.appengine.api import logservice
from google.appengine.api.files import records
from google.appengine.api.logservice import log_service_pb
from google.appengine.datastore import datastore_query
from google.appengine.datastore import datastore_rpc
from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import key_range
from google.appengine.ext.db import metadata
from google.appengine.ext.mapreduce import context
from google.appengine.ext.mapreduce import errors
from google.appengine.ext.mapreduce import file_format_parser
from google.appengine.ext.mapreduce import file_format_root
from google.appengine.ext.mapreduce import model
from google.appengine.ext.mapreduce import namespace_range
from google.appengine.ext.mapreduce import operation
from google.appengine.ext.mapreduce import util


# Alias for compatibility; the canonical definition lives in the errors module.
BadReaderParamsError = errors.BadReaderParamsError


# Counter name for the number of bytes read.
COUNTER_IO_READ_BYTES = "io-read-bytes"

# Counter name for the milliseconds spent reading data.
COUNTER_IO_READ_MSEC = "io-read-msec"


# Sentinel that an InputReader can yield to give the framework a chance to
# checkpoint the mapreduce state without passing a value to the handler.
ALLOW_CHECKPOINT = object()


class InputReader(model.JsonMixin):
  """Abstract base class for input readers.

  InputReaders have the following properties:
   * They are created by using the split_input method to generate a set of
     InputReaders from a MapperSpec.
   * They generate inputs to the mapper via the iterator interface.
   * After creation, they can be serialized and resumed using the JsonMixin
     interface.
   * They are cast to string for a user-readable description; it may be
     valuable to implement __str__.
  """

  # When True, the values yielded by this reader are expanded into positional
  # arguments for the handler instead of being passed as a single argument.
  expand_parameters = False

  # Mapreduce parameters.
  NAMESPACE_PARAM = "namespace"
  NAMESPACES_PARAM = "namespaces"

  def next(self):
    """Returns the next input from this input reader as a key, value pair.

    Returns:
      The next input from this input reader.
    """
    raise NotImplementedError("next() not implemented in %s" % self.__class__)

  @classmethod
  def from_json(cls, input_shard_state):
    """Creates an instance of the InputReader for the given input shard state.

    Args:
      input_shard_state: The InputReader state as a dict-like object.

    Returns:
      An instance of the InputReader configured using the values of json.
    """
    raise NotImplementedError("from_json() not implemented in %s" % cls)

  def to_json(self):
    """Returns an input shard state for the remaining inputs.

    Returns:
      A json-izable version of the remaining InputReader.
    """
    raise NotImplementedError("to_json() not implemented in %s" %
                              self.__class__)

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers for the input spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Returns:
      A list of InputReaders.
    """
    raise NotImplementedError("split_input() not implemented in %s" % cls)

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Input reader parameters are expected to be passed as an "input_reader"
    subdictionary of mapper_spec.params. To be compatible with the previous
    API, an input reader is advised to check mapper_spec.params and issue
    a warning if the "input_reader" subdictionary is not present.
    The _get_params helper can be used to simplify the implementation.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    raise NotImplementedError("validate() not implemented in %s" % cls)


def _get_params(mapper_spec, allowed_keys=None):
  """Obtain input reader parameters.

  Utility function for input readers implementation. Fetches parameters
  from mapreduce specification giving appropriate usage warnings.

  Args:
    mapper_spec: The MapperSpec for the job
    allowed_keys: set of all allowed keys in parameters as strings. If it is
      not None, then parameters are expected to be in a separate "input_reader"
      subdictionary of mapper_spec parameters.

  Returns:
    mapper parameters as dict

  Raises:
    BadReaderParamsError: if parameters are invalid/missing or not allowed.
  """
  if "input_reader" not in mapper_spec.params:
    message = ("Input reader's parameters should be specified in "
               "input_reader subdictionary.")
    if allowed_keys:
      raise errors.BadReaderParamsError(message)
    params = mapper_spec.params
    params = dict((str(n), v) for n, v in params.iteritems())
  else:
    if not isinstance(mapper_spec.params.get("input_reader"), dict):
      raise errors.BadReaderParamsError(
          "Input reader parameters should be a dictionary")
    params = mapper_spec.params.get("input_reader")
    params = dict((str(n), v) for n, v in params.iteritems())
    if allowed_keys:
      params_diff = set(params.keys()) - allowed_keys
      if params_diff:
        raise errors.BadReaderParamsError(
            "Invalid input_reader parameters: %s" % ",".join(params_diff))
  return params
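

# Illustrative sketch, not part of the original module: the two mapper
# parameter layouts that _get_params() accepts. Reader-specific options belong
# in an "input_reader" subdictionary; the flat layout is only tolerated for
# backwards compatibility when allowed_keys is None. The kind name
# "main.MyEntity" below is a hypothetical example.
_EXAMPLE_PREFERRED_MAPPER_PARAMS = {
    "input_reader": {
        "entity_kind": "main.MyEntity",
        "batch_size": 50,
    },
    "done_callback": "/done",  # non-reader parameters stay at the top level
}
_EXAMPLE_LEGACY_MAPPER_PARAMS = {
    "entity_kind": "main.MyEntity",  # flat layout, kept for older callers
    "batch_size": 50,
}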


class FileInputReader(InputReader):
  """Reader to read Files API files of user specified format.

  This class currently only supports Google Storage files. It will be extended
  to support blobstore files in the future.

  Reader parameters:
    files: a list of filenames or filename patterns.
      filename must be of format '/gs/bucket/filename'.
      filename pattern has format '/gs/bucket/prefix*'.
      filename pattern will be expanded to filenames with the given prefix.
      Please see parseGlob in the file api.files.gs.py which is included in the
      App Engine SDK for supported patterns.

      Example:
        ["/gs/bucket1/file1", "/gs/bucket2/*", "/gs/bucket3/p*"]
        includes "file1", all files under bucket2, and files under bucket3 with
        a prefix "p" in its name.

    format: format string determines what your map function gets as its input.
      format string can be "lines", "bytes", "zip", or a cascade of them plus
      optional parameters. See file_formats.FORMATS for all supported formats.
      See file_format_parser._FileFormatParser for format string syntax.

      Example:
        "lines": your map function gets files' contents line by line.
        "bytes": your map function gets files' contents entirely.
        "zip": InputReader unzips files and feeds your map function each of
          the archive's member files as a whole.
        "zip[bytes]": same as above.
        "zip[lines]": InputReader unzips files and feeds your map function
          files' contents line by line.
        "zip[lines(encoding=utf32)]": InputReader unzips files, reads each
          file with utf32 encoding and feeds your map function line by line.
        "base64[zip[lines(encoding=utf32)]]": InputReader decodes files with
          base64 encoding, unzips each file, reads each of them with utf32
          encoding and feeds your map function line by line.

      Note that "encoding" only teaches InputReader how to interpret files.
      The input your map function gets is always a Python str.
  """

  # Reader parameters.
  FILES_PARAM = "files"
  FORMAT_PARAM = "format"

  def __init__(self, format_root):
    """Initialize input reader.

    Args:
      format_root: a FileFormatRoot instance.
    """
    self._file_format_root = format_root

  def __iter__(self):
    """Returns an iterator over the inputs (this reader itself)."""
    return self

  def next(self):
    """Returns the next file content read via the format root."""
    ctx = context.get()
    start_time = time.time()

    content = self._file_format_root.next().read()

    if ctx:
      operation.counters.Increment(
          COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
      operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)

    return content

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers for the input spec."""
    params = _get_params(mapper_spec)

    # Expand patterns to a flat list of filenames.
    filenames = []
    for f in params[cls.FILES_PARAM]:
      parsedName = files.gs.parseGlob(f)
      if isinstance(parsedName, tuple):
        filenames.extend(files.gs.listdir(parsedName[0],
                                          {"prefix": parsedName[1]}))
      else:
        filenames.append(parsedName)

    file_format_roots = file_format_root.split(filenames,
                                               params[cls.FORMAT_PARAM],
                                               mapper_spec.shard_count)

    return [cls(root) for root in file_format_roots]

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters."""
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")

    params = _get_params(mapper_spec)
    if cls.FILES_PARAM not in params:
      raise BadReaderParamsError("Must specify %s" % cls.FILES_PARAM)
    if cls.FORMAT_PARAM not in params:
      raise BadReaderParamsError("Must specify %s" % cls.FORMAT_PARAM)

    format_string = params[cls.FORMAT_PARAM]
    if not isinstance(format_string, basestring):
      raise BadReaderParamsError("format should be string but is %s" %
                                 type(format_string))
    try:
      file_format_parser.parse(format_string)
    except ValueError, e:
      raise BadReaderParamsError(e)

    paths = params[cls.FILES_PARAM]
    if not (paths and isinstance(paths, list)):
      raise BadReaderParamsError("files should be a list of filenames.")

    # Validate filenames by parsing each of them.
    for path in paths:
      try:
        files.gs.parseGlob(path)
      except files.InvalidFileNameError:
        raise BadReaderParamsError("Invalid filename %s." % path)

  @classmethod
  def from_json(cls, json):
    """Creates an instance of the InputReader from the given json state."""
    return cls(
        file_format_root.FileFormatRoot.from_json(json["file_format_root"]))

  def to_json(self):
    """Returns an input shard state for the remaining inputs."""
    return {"file_format_root": self._file_format_root.to_json()}
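

# Illustrative sketch, not part of the original module: "input_reader"
# parameters that FileInputReader.validate()/split_input() expect. The bucket
# and file names are hypothetical.
_EXAMPLE_FILE_READER_PARAMS = {
    "input_reader": {
        "files": ["/gs/my_bucket/logs-*", "/gs/my_bucket/extra.zip"],
        "format": "zip[lines]",  # unzip, then feed member files line by line
    },
}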


class AbstractDatastoreInputReader(InputReader):
  """Abstract base class for classes that iterate over datastore entities.

  Concrete subclasses must implement _iter_key_range(self, k_range). See the
  docstring for that method for details.
  """

  # Number of entities to fetch at once while doing scanning.
  _BATCH_SIZE = 50

  # Maximum number of shards we'll create.
  _MAX_SHARD_COUNT = 256

  # __scatter__ oversampling factor.
  _OVERSAMPLING_FACTOR = 32

  # The maximum number of namespaces that will be sharded by datastore key
  # before falling back to sharding by namespace instead.
  MAX_NAMESPACES_FOR_KEY_SHARD = 10

  # Mapreduce parameters.
  ENTITY_KIND_PARAM = "entity_kind"
  KEYS_ONLY_PARAM = "keys_only"
  BATCH_SIZE_PARAM = "batch_size"
  KEY_RANGE_PARAM = "key_range"
  NAMESPACE_RANGE_PARAM = "namespace_range"
  CURRENT_KEY_RANGE_PARAM = "current_key_range"
  FILTERS_PARAM = "filters"
  _APP_PARAM = "_app"

  def __init__(self,
               entity_kind,
               key_ranges=None,
               ns_range=None,
               batch_size=_BATCH_SIZE,
               current_key_range=None,
               filters=None):
    """Create new AbstractDatastoreInputReader object.

    This is internal constructor. Use split_query in a concrete class instead.

    Args:
      entity_kind: entity kind as string.
      key_ranges: a sequence of key_range.KeyRange instances to process. Only
          one of key_ranges or ns_range can be non-None.
      ns_range: a namespace_range.NamespaceRange to process. Only one of
          key_ranges or ns_range can be non-None.
      batch_size: size of read batch as int.
      current_key_range: the current key_range.KeyRange being processed.
      filters: optional list of filters to apply to the query. Each filter is
        a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
        User filters are applied first.
    """
    assert key_ranges is not None or ns_range is not None, (
        "must specify one of 'key_ranges' or 'ns_range'")
    assert key_ranges is None or ns_range is None, (
        "can't specify both 'key_ranges' and 'ns_range'")

    self._entity_kind = entity_kind
    # Reverse the KeyRanges so they can be processed in order as a stack of
    # work items.
    self._key_ranges = key_ranges and list(reversed(key_ranges))

    self._ns_range = ns_range
    self._batch_size = int(batch_size)
    self._current_key_range = current_key_range
    self._filters = filters

  @classmethod
  def _get_raw_entity_kind(cls, entity_kind):
    if "." in entity_kind:
      logging.warning(
          ". detected in entity kind %s specified for reader %s. "
          "Assuming entity kind contains the dot.",
          entity_kind, cls.__name__)
    return entity_kind

  def __iter__(self):
    """Iterates over the given KeyRanges or NamespaceRange.

    This method iterates over the given KeyRanges or NamespaceRange and sets
    the self._current_key_range to the KeyRange currently being processed. It
    then delegates to the _iter_key_range method to yield the actual
    results.

    Yields:
      Forwards the objects yielded by the subclasses concrete _iter_key_range()
      method. The caller must consume the result yielded because self.to_json()
      reflects the already-consumed inputs.
    """
    if self._key_ranges is not None:
      for o in self._iter_key_ranges():
        yield o
    elif self._ns_range is not None:
      for o in self._iter_ns_range():
        yield o
    else:
      assert False, "self._key_ranges and self._ns_range are both None"

  def _iter_key_ranges(self):
    """Iterates over self._key_ranges, delegating to self._iter_key_range()."""
    while True:
      if self._current_key_range is None:
        if self._key_ranges:
          self._current_key_range = self._key_ranges.pop()
          # The most recently popped key_range may be None, so continue to
          # find the next valid KeyRange.
          continue
        else:
          break

      for key, o in self._iter_key_range(
          copy.deepcopy(self._current_key_range)):
        # The caller must consume the yielded value, so it is safe to advance
        # the KeyRange before yielding.
        self._current_key_range.advance(key)
        yield o
      self._current_key_range = None

  def _iter_ns_range(self):
    """Iterates over self._ns_range, delegating to self._iter_key_range()."""
    while True:
      if self._current_key_range is None:
        query = self._ns_range.make_datastore_query()
        namespace_result = query.Get(1)
        if not namespace_result:
          break

        namespace = namespace_result[0].name() or ""
        self._current_key_range = key_range.KeyRange(
            namespace=namespace, _app=self._ns_range.app)
        yield ALLOW_CHECKPOINT

      for key, o in self._iter_key_range(
          copy.deepcopy(self._current_key_range)):
        # The caller must consume the yielded value, so it is safe to advance
        # the KeyRange before yielding.
        self._current_key_range.advance(key)
        yield o

      if (self._ns_range.is_single_namespace or
          self._current_key_range.namespace == self._ns_range.namespace_end):
        break
      self._ns_range = self._ns_range.with_start_after(
          self._current_key_range.namespace)
      self._current_key_range = None

  def _iter_key_range(self, k_range):
    """Yields a db.Key and the value that should be yielded by self.__iter__().

    Args:
      k_range: The key_range.KeyRange to iterate over.

    Yields:
      A 2-tuple containing the last db.Key processed and the value that should
      be yielded by __iter__. The returned db.Key will be used to determine the
      InputReader's current position in self._current_key_range.
    """
    raise NotImplementedError("_iter_key_range() not implemented in %s" %
                              self.__class__)

  def __str__(self):
    """Returns the string representation of this InputReader."""
    if self._ns_range is None:
      return repr(self._key_ranges)
    else:
      return repr(self._ns_range)

  @classmethod
  def _choose_split_points(cls, sorted_keys, shard_count):
    """Returns the best split points given a random set of db.Keys."""
    assert len(sorted_keys) >= shard_count
    index_stride = len(sorted_keys) / float(shard_count)
    return [sorted_keys[int(round(index_stride * i))]
            for i in range(1, shard_count)]

  @classmethod
  def _split_input_from_namespace(cls, app, namespace, entity_kind,
                                  shard_count):
    """Return KeyRange objects. Helper for _split_input_from_params.

    If there are not enough Entities to make all of the given shards, the
    returned list of KeyRanges will include Nones. The returned list will
    contain KeyRanges ordered lexographically with any Nones appearing at the
    end.
    """
    raw_entity_kind = cls._get_raw_entity_kind(entity_kind)
    if shard_count == 1:
      # With one shard there is no need to calculate any split points.
      return [key_range.KeyRange(namespace=namespace, _app=app)]

    ds_query = datastore.Query(kind=raw_entity_kind,
                               namespace=namespace,
                               _app=app,
                               keys_only=True)
    ds_query.Order("__scatter__")
    random_keys = ds_query.Get(shard_count * cls._OVERSAMPLING_FACTOR)

    if not random_keys:
      # There are no entities with a scatter property, so there is no basis
      # for splitting.
      return ([key_range.KeyRange(namespace=namespace, _app=app)] +
              [None] * (shard_count - 1))

    random_keys.sort()

    if len(random_keys) >= shard_count:
      # There are enough scatter values; sample them down.
      random_keys = cls._choose_split_points(random_keys, shard_count)

    key_ranges = []

    key_ranges.append(key_range.KeyRange(
        key_start=None,
        key_end=random_keys[0],
        direction=key_range.KeyRange.ASC,
        include_start=False,
        include_end=False,
        namespace=namespace,
        _app=app))

    for i in range(0, len(random_keys) - 1):
      key_ranges.append(key_range.KeyRange(
          key_start=random_keys[i],
          key_end=random_keys[i+1],
          direction=key_range.KeyRange.ASC,
          include_start=True,
          include_end=False,
          namespace=namespace,
          _app=app))

    key_ranges.append(key_range.KeyRange(
        key_start=random_keys[-1],
        key_end=None,
        direction=key_range.KeyRange.ASC,
        include_start=True,
        include_end=False,
        namespace=namespace,
        _app=app))

    if len(key_ranges) < shard_count:
      # Pad with Nones so the result always has shard_count entries.
      key_ranges = key_ranges + [None] * (shard_count - len(key_ranges))

    return key_ranges

  @classmethod
  def _split_input_from_params(cls, app, namespaces, entity_kind_name,
                               params, shard_count):
    """Return input reader objects. Helper for split_input."""
    key_ranges = []
    for namespace in namespaces:
      key_ranges.extend(
          cls._split_input_from_namespace(app,
                                          namespace,
                                          entity_kind_name,
                                          shard_count))

    # Assign KeyRanges to shards round-robin; KeyRanges from different
    # namespaces may differ significantly in size.
    shared_ranges = [[] for _ in range(shard_count)]
    for i, k_range in enumerate(key_ranges):
      shared_ranges[i % shard_count].append(k_range)
    batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))

    return [cls(entity_kind_name,
                key_ranges=key_ranges,
                ns_range=None,
                batch_size=batch_size)
            for key_ranges in shared_ranges if key_ranges]

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.ENTITY_KIND_PARAM not in params:
      raise BadReaderParamsError("Missing mapper parameter 'entity_kind'")
    if cls.BATCH_SIZE_PARAM in params:
      try:
        batch_size = int(params[cls.BATCH_SIZE_PARAM])
        if batch_size < 1:
          raise BadReaderParamsError("Bad batch size: %s" % batch_size)
      except ValueError, e:
        raise BadReaderParamsError("Bad batch size: %s" % e)
    if cls.NAMESPACE_PARAM in params:
      if not isinstance(params[cls.NAMESPACE_PARAM],
                        (str, unicode, type(None))):
        raise BadReaderParamsError(
            "Expected a single namespace string")
    if cls.NAMESPACES_PARAM in params:
      raise BadReaderParamsError("Multiple namespaces are no longer supported")
    if cls.FILTERS_PARAM in params:
      filters = params[cls.FILTERS_PARAM]
      if not isinstance(filters, list):
        raise BadReaderParamsError("Expected list for filters parameter")
      for f in filters:
        if not isinstance(f, (tuple, list)):
          raise BadReaderParamsError("Filter should be a tuple or list: %s", f)
        if len(f) != 3:
          raise BadReaderParamsError("Filter should be a 3-tuple: %s", f)
        if not isinstance(f[0], basestring):
          raise BadReaderParamsError("First element should be string: %s", f)
        if f[1] != "=":
          raise BadReaderParamsError(
              "Only equality filters are supported: %s", f)

  @classmethod
  def split_input(cls, mapper_spec):
    """Splits query into shards without fetching query results.

    Tries as best as it can to split the whole query result set into equal
    shards. Due to difficulty of making the perfect split, resulting shards'
    sizes might differ significantly from each other.

    Args:
      mapper_spec: MapperSpec with params containing 'entity_kind'.
        May have 'namespace' in the params as a string containing a single
        namespace. If specified then the input reader will only yield values
        in the given namespace. If 'namespace' is not given then values from
        all namespaces will be yielded. May also have 'batch_size' in the
        params to specify the number of entities to process in each batch.

    Returns:
      A list of InputReader objects. If the query results are empty then the
      empty list will be returned. Otherwise, the list will always have a
      length equal to number_of_shards but may be padded with Nones if there
      are too few results for effective sharding.
    """
    params = _get_params(mapper_spec)
    entity_kind_name = params[cls.ENTITY_KIND_PARAM]
    batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
    shard_count = mapper_spec.shard_count
    namespace = params.get(cls.NAMESPACE_PARAM)
    app = params.get(cls._APP_PARAM)
    filters = params.get(cls.FILTERS_PARAM)

    if namespace is None:
      # With a small number of namespaces, each shard gets a KeyRange per
      # namespace; with many namespaces, shard over namespace ranges instead.
      namespace_query = datastore.Query("__namespace__",
                                        keys_only=True,
                                        _app=app)
      namespace_keys = namespace_query.Get(
          limit=cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)

      if len(namespace_keys) > cls.MAX_NAMESPACES_FOR_KEY_SHARD:
        ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
                                                         contiguous=True,
                                                         _app=app)
        return [cls(entity_kind_name,
                    key_ranges=None,
                    ns_range=ns_range,
                    batch_size=batch_size,
                    filters=filters)
                for ns_range in ns_ranges]
      elif not namespace_keys:
        return [cls(entity_kind_name,
                    key_ranges=None,
                    ns_range=namespace_range.NamespaceRange(),
                    batch_size=shard_count,
                    filters=filters)]
      else:
        namespaces = [namespace_key.name() or ""
                      for namespace_key in namespace_keys]
    else:
      namespaces = [namespace]

    readers = cls._split_input_from_params(
        app, namespaces, entity_kind_name, params, shard_count)
    if filters:
      for reader in readers:
        reader._filters = filters
    return readers

  def to_json(self):
    """Serializes all the data in this query range into json form.

    Returns:
      all the data in json-compatible map.
    """
    if self._key_ranges is None:
      key_ranges_json = None
    else:
      key_ranges_json = []
      for k in self._key_ranges:
        if k:
          key_ranges_json.append(k.to_json())
        else:
          key_ranges_json.append(None)

    if self._ns_range is None:
      namespace_range_json = None
    else:
      namespace_range_json = self._ns_range.to_json_object()

    if self._current_key_range is None:
      current_key_range_json = None
    else:
      current_key_range_json = self._current_key_range.to_json()

    json_dict = {self.KEY_RANGE_PARAM: key_ranges_json,
                 self.NAMESPACE_RANGE_PARAM: namespace_range_json,
                 self.CURRENT_KEY_RANGE_PARAM: current_key_range_json,
                 self.ENTITY_KIND_PARAM: self._entity_kind,
                 self.BATCH_SIZE_PARAM: self._batch_size,
                 self.FILTERS_PARAM: self._filters}
    return json_dict

  @classmethod
  def from_json(cls, json):
    """Create new DatastoreInputReader from the json, encoded by to_json.

    Args:
      json: json map representation of DatastoreInputReader.

    Returns:
      an instance of DatastoreInputReader with all data deserialized from json.
    """
    if json[cls.KEY_RANGE_PARAM] is None:
      key_ranges = None
    else:
      key_ranges = []
      for k in json[cls.KEY_RANGE_PARAM]:
        if k:
          key_ranges.append(key_range.KeyRange.from_json(k))
        else:
          key_ranges.append(None)

    if json[cls.NAMESPACE_RANGE_PARAM] is None:
      ns_range = None
    else:
      ns_range = namespace_range.NamespaceRange.from_json_object(
          json[cls.NAMESPACE_RANGE_PARAM])

    if json[cls.CURRENT_KEY_RANGE_PARAM] is None:
      current_key_range = None
    else:
      current_key_range = key_range.KeyRange.from_json(
          json[cls.CURRENT_KEY_RANGE_PARAM])

    return cls(
        json[cls.ENTITY_KIND_PARAM],
        key_ranges,
        ns_range,
        json[cls.BATCH_SIZE_PARAM],
        current_key_range,
        filters=json.get(cls.FILTERS_PARAM))


class DatastoreInputReader(AbstractDatastoreInputReader):
  """Represents a range in query results.

  DatastoreInputReader yields model instances from the entities in a given key
  range. Iterating over DatastoreInputReader changes its range past consumed
  entries.

  The class shouldn't be instantiated directly. Use the split_input class
  method instead.
  """

  def _iter_key_range(self, k_range):
    cursor = None
    while True:
      query = k_range.make_ascending_query(
          util.for_name(self._entity_kind),
          filters=self._filters)
      if isinstance(query, db.Query):
        # db.Query path: use query cursors explicitly.
        if cursor:
          query.with_cursor(cursor)

        results = query.fetch(limit=self._batch_size)
        if not results:
          break

        for model_instance in results:
          key = model_instance.key()
          yield key, model_instance
        cursor = query.cursor()
      else:
        # ndb.Query path: page through with fetch_page().
        results, cursor, more = query.fetch_page(self._batch_size,
                                                 start_cursor=cursor)
        for model_instance in results:
          key = model_instance.key
          yield key, model_instance
        if not more:
          break

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    super(DatastoreInputReader, cls).validate(mapper_spec)
    params = _get_params(mapper_spec)
    keys_only = util.parse_bool(params.get(cls.KEYS_ONLY_PARAM, False))
    if keys_only:
      raise BadReaderParamsError("The keys_only parameter is obsolete. "
                                 "Use DatastoreKeyInputReader instead.")

    entity_kind_name = params[cls.ENTITY_KIND_PARAM]
    # Fail fast if the model class cannot be located.
    try:
      util.for_name(entity_kind_name)
    except ImportError, e:
      raise BadReaderParamsError("Bad entity kind: %s" % e)

  @classmethod
  def _get_raw_entity_kind(cls, entity_kind):
    """Returns an entity kind to use with datastore calls."""
    entity_type = util.for_name(entity_kind)
    if isinstance(entity_type, db.Model):
      return entity_type.kind()
    elif ndb and isinstance(entity_type, (ndb.Model, ndb.MetaModel)):
      return entity_type._get_kind()
    else:
      return util.get_short_name(entity_kind)
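

# Illustrative sketch, not part of the original module: a DatastoreInputReader
# configuration. "main.MyEntity" and the property name are hypothetical; note
# that validate() accepts only equality filters expressed as 3-tuples.
_EXAMPLE_DATASTORE_READER_PARAMS = {
    "input_reader": {
        "entity_kind": "main.MyEntity",
        "namespace": None,          # a single namespace, or None for all
        "batch_size": 50,
        "filters": [("status", "=", "active")],
    },
}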


class DatastoreKeyInputReader(AbstractDatastoreInputReader):
  """An input reader which takes a Kind and yields Keys for that kind."""

  def _iter_key_range(self, k_range):
    raw_entity_kind = self._get_raw_entity_kind(self._entity_kind)
    query = k_range.make_ascending_datastore_query(
        raw_entity_kind, keys_only=True, filters=self._filters)
    for key in query.Run(
        config=datastore_query.QueryOptions(batch_size=self._batch_size)):
      yield key, key


class DatastoreEntityInputReader(AbstractDatastoreInputReader):
  """An input reader which yields low level datastore entities for a kind."""

  def _iter_key_range(self, k_range):
    raw_entity_kind = self._get_raw_entity_kind(self._entity_kind)
    query = k_range.make_ascending_datastore_query(
        raw_entity_kind, self._filters)
    for entity in query.Run(
        config=datastore_query.QueryOptions(batch_size=self._batch_size)):
      yield entity.key(), entity


class BlobstoreLineInputReader(InputReader):
  """Input reader for a newline delimited blob in Blobstore."""

  # Size of the read buffer for the underlying BlobReader.
  _BLOB_BUFFER_SIZE = 64000

  # Maximum number of shards to allow.
  _MAX_SHARD_COUNT = 256

  # Maximum number of blobs to allow.
  _MAX_BLOB_KEYS_COUNT = 246

  # Mapreduce parameters.
  BLOB_KEYS_PARAM = "blob_keys"

  # Serialization parameters.
  INITIAL_POSITION_PARAM = "initial_position"
  END_POSITION_PARAM = "end_position"
  BLOB_KEY_PARAM = "blob_key"

  def __init__(self, blob_key, start_position, end_position):
    """Initializes this instance with the given blob key and character range.

    This BlobstoreInputReader will read from the first record starting
    strictly after start_position until the first record ending at or after
    end_position (exclusive). As an exception, if start_position is 0, then
    this InputReader starts reading at the first record.

    Args:
      blob_key: the BlobKey that this input reader is processing.
      start_position: the position to start reading at.
      end_position: a position in the last record to read.
    """
    self._blob_key = blob_key
    self._blob_reader = blobstore.BlobReader(blob_key,
                                             self._BLOB_BUFFER_SIZE,
                                             start_position)
    self._end_position = end_position
    self._has_iterated = False
    self._read_before_start = bool(start_position)

  def next(self):
    """Returns the next input from as an (offset, line) tuple."""
    self._has_iterated = True

    if self._read_before_start:
      self._blob_reader.readline()
      self._read_before_start = False
    start_position = self._blob_reader.tell()

    if start_position > self._end_position:
      raise StopIteration()

    line = self._blob_reader.readline()

    if not line:
      raise StopIteration()

    return start_position, line.rstrip("\n")

  def to_json(self):
    """Returns an json-compatible input shard spec for remaining inputs."""
    new_pos = self._blob_reader.tell()
    if self._has_iterated:
      new_pos -= 1
    return {self.BLOB_KEY_PARAM: self._blob_key,
            self.INITIAL_POSITION_PARAM: new_pos,
            self.END_POSITION_PARAM: self._end_position}

  def __str__(self):
    """Returns the string representation of this BlobstoreLineInputReader."""
    return "blobstore.BlobKey(%r):[%d, %d]" % (
        self._blob_key, self._blob_reader.tell(), self._end_position)

  @classmethod
  def from_json(cls, json):
    """Instantiates an instance of this InputReader for the given shard spec."""
    return cls(json[cls.BLOB_KEY_PARAM],
               json[cls.INITIAL_POSITION_PARAM],
               json[cls.END_POSITION_PARAM])

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.BLOB_KEYS_PARAM not in params:
      raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
    blob_keys = params[cls.BLOB_KEYS_PARAM]
    if isinstance(blob_keys, basestring):
      # A single string is allowed to hold multiple comma-separated blob keys.
      blob_keys = blob_keys.split(",")
    if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
      raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
    if not blob_keys:
      raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
    for blob_key in blob_keys:
      blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
      if not blob_info:
        raise BadReaderParamsError("Could not find blobinfo for key %s" %
                                   blob_key)

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of shard_count input_spec_shards for input_spec.

    Args:
      mapper_spec: The mapper specification to split from. Must contain
          'blob_keys' parameter with one or more blob keys.

    Returns:
      A list of BlobstoreInputReaders corresponding to the specified shards.
    """
    params = _get_params(mapper_spec)
    blob_keys = params[cls.BLOB_KEYS_PARAM]
    if isinstance(blob_keys, basestring):
      # A single string is allowed to hold multiple comma-separated blob keys.
      blob_keys = blob_keys.split(",")

    blob_sizes = {}
    for blob_key in blob_keys:
      blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
      blob_sizes[blob_key] = blob_info.size

    shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
    shards_per_blob = shard_count // len(blob_keys)
    if shards_per_blob == 0:
      shards_per_blob = 1

    chunks = []
    for blob_key, blob_size in blob_sizes.items():
      blob_chunk_size = blob_size // shards_per_blob
      for i in xrange(shards_per_blob - 1):
        chunks.append(BlobstoreLineInputReader.from_json(
            {cls.BLOB_KEY_PARAM: blob_key,
             cls.INITIAL_POSITION_PARAM: blob_chunk_size * i,
             cls.END_POSITION_PARAM: blob_chunk_size * (i + 1)}))
      chunks.append(BlobstoreLineInputReader.from_json(
          {cls.BLOB_KEY_PARAM: blob_key,
           cls.INITIAL_POSITION_PARAM: blob_chunk_size * (shards_per_blob - 1),
           cls.END_POSITION_PARAM: blob_size}))

    return chunks
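

# Illustrative sketch, not part of the original module: BlobstoreLineInputReader
# parameters. The blob keys are hypothetical; a single comma-separated string
# is also accepted. Each mapper call receives a (byte_offset, line) tuple.
_EXAMPLE_BLOBSTORE_LINE_READER_PARAMS = {
    "input_reader": {
        "blob_keys": ["blob-key-1", "blob-key-2"],
    },
}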


class BlobstoreZipInputReader(InputReader):
  """Input reader for files from a zip archive stored in the Blobstore.

  Each instance of the reader will read the TOC, from the end of the zip file,
  and then only the contained files which it is responsible for.
  """

  # Maximum number of shards to allow.
  _MAX_SHARD_COUNT = 256

  # Mapreduce parameters.
  BLOB_KEY_PARAM = "blob_key"
  START_INDEX_PARAM = "start_index"
  END_INDEX_PARAM = "end_index"

  def __init__(self, blob_key, start_index, end_index,
               _reader=blobstore.BlobReader):
    """Initializes this instance with the given blob key and file range.

    This BlobstoreZipInputReader will read from the file with index start_index
    up to but not including the file with index end_index.

    Args:
      blob_key: the BlobKey that this input reader is processing.
      start_index: the index of the first file to read.
      end_index: the index of the first file that will not be read.
      _reader: a callable that returns a file-like object for reading blobs.
          Used for dependency injection.
    """
    self._blob_key = blob_key
    self._start_index = start_index
    self._end_index = end_index
    self._reader = _reader
    self._zip = None
    self._entries = None

  def next(self):
    """Returns the next input from this input reader as (ZipInfo, opener) tuple.

    Returns:
      The next input from this input reader, in the form of a 2-tuple.
      The first element of the tuple is a zipfile.ZipInfo object.
      The second element of the tuple is a zero-argument function that, when
      called, returns the complete body of the file.
    """
    if not self._zip:
      self._zip = zipfile.ZipFile(self._reader(self._blob_key))
      # Get a list of entries, reversed so we can pop entries off in order.
      self._entries = self._zip.infolist()[self._start_index:self._end_index]
      self._entries.reverse()
    if not self._entries:
      raise StopIteration()
    entry = self._entries.pop()
    self._start_index += 1
    return (entry, lambda: self._read(entry))

  def _read(self, entry):
    """Read entry content.

    Args:
      entry: zip file entry as zipfile.ZipInfo.

    Returns:
      Entry content as string.
    """
    start_time = time.time()
    content = self._zip.read(entry.filename)

    ctx = context.get()
    if ctx:
      operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
      operation.counters.Increment(
          COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)

    return content

  @classmethod
  def from_json(cls, json):
    """Creates an instance of the InputReader for the given input shard state.

    Args:
      json: The InputReader state as a dict-like object.

    Returns:
      An instance of the InputReader configured using the values of json.
    """
    return cls(json[cls.BLOB_KEY_PARAM],
               json[cls.START_INDEX_PARAM],
               json[cls.END_INDEX_PARAM])

  def to_json(self):
    """Returns an input shard state for the remaining inputs.

    Returns:
      A json-izable version of the remaining InputReader.
    """
    return {self.BLOB_KEY_PARAM: self._blob_key,
            self.START_INDEX_PARAM: self._start_index,
            self.END_INDEX_PARAM: self._end_index}

  def __str__(self):
    """Returns the string representation of this BlobstoreZipInputReader."""
    return "blobstore.BlobKey(%r):[%d, %d]" % (
        self._blob_key, self._start_index, self._end_index)

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.BLOB_KEY_PARAM not in params:
      raise BadReaderParamsError("Must specify 'blob_key' for mapper input")
    blob_key = params[cls.BLOB_KEY_PARAM]
    blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
    if not blob_info:
      raise BadReaderParamsError("Could not find blobinfo for key %s" %
                                 blob_key)

  @classmethod
  def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
    """Returns a list of input shard states for the input spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader. Must contain
          'blob_key' parameter with one blob key.
      _reader: a callable that returns a file-like object for reading blobs.
          Used for dependency injection.

    Returns:
      A list of InputReaders spanning files within the zip.
    """
    params = _get_params(mapper_spec)
    blob_key = params[cls.BLOB_KEY_PARAM]
    zip_input = zipfile.ZipFile(_reader(blob_key))
    files = zip_input.infolist()
    total_size = sum(x.file_size for x in files)
    num_shards = min(mapper_spec.shard_count, cls._MAX_SHARD_COUNT)
    size_per_shard = total_size // num_shards

    # Break the list of files into sublists, each of approximately
    # size_per_shard bytes.
    shard_start_indexes = [0]
    current_shard_size = 0
    for i, fileinfo in enumerate(files):
      current_shard_size += fileinfo.file_size
      if current_shard_size >= size_per_shard:
        shard_start_indexes.append(i + 1)
        current_shard_size = 0

    if shard_start_indexes[-1] != len(files):
      shard_start_indexes.append(len(files))

    return [cls(blob_key, start_index, end_index, _reader)
            for start_index, end_index
            in zip(shard_start_indexes, shard_start_indexes[1:])]
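

# Illustrative sketch, not part of the original module: BlobstoreZipInputReader
# takes a single (hypothetical) blob key naming a zip archive. Each mapper call
# receives a (zipfile.ZipInfo, callable) pair; calling the callable returns the
# member file's body as a str.
_EXAMPLE_BLOBSTORE_ZIP_READER_PARAMS = {
    "input_reader": {
        "blob_key": "zip-archive-blob-key",
    },
}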


class BlobstoreZipLineInputReader(InputReader):
  """Input reader for newline delimited files in zip archives from Blobstore.

  This has the same external interface as the BlobstoreLineInputReader, in that
  it takes a list of blobs as its input and yields lines to the reader.
  However the blobs themselves are expected to be zip archives of line
  delimited files instead of the files themselves.

  This is useful as many line delimited files gain greatly from compression.
  """

  # Maximum number of shards to allow.
  _MAX_SHARD_COUNT = 256

  # Maximum number of blobs to allow.
  _MAX_BLOB_KEYS_COUNT = 246

  # Mapreduce parameters.
  BLOB_KEYS_PARAM = "blob_keys"

  # Serialization parameters.
  BLOB_KEY_PARAM = "blob_key"
  START_FILE_INDEX_PARAM = "start_file_index"
  END_FILE_INDEX_PARAM = "end_file_index"
  OFFSET_PARAM = "offset"

  def __init__(self, blob_key, start_file_index, end_file_index, offset,
               _reader=blobstore.BlobReader):
    """Initializes this instance with the given blob key and file range.

    This BlobstoreZipLineInputReader will read from the file with index
    start_file_index up to but not including the file with index
    end_file_index. It will return lines starting at offset within
    file[start_file_index].

    Args:
      blob_key: the BlobKey that this input reader is processing.
      start_file_index: the index of the first file to read within the zip.
      end_file_index: the index of the first file that will not be read.
      offset: the byte offset within blob_key.zip[start_file_index] to start
        reading. The reader will continue to the end of the file.
      _reader: a callable that returns a file-like object for reading blobs.
        Used for dependency injection.
    """
    self._blob_key = blob_key
    self._start_file_index = start_file_index
    self._end_file_index = end_file_index
    self._initial_offset = offset
    self._reader = _reader
    self._zip = None
    self._entries = None
    self._filestream = None

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.BLOB_KEYS_PARAM not in params:
      raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")

    blob_keys = params[cls.BLOB_KEYS_PARAM]
    if isinstance(blob_keys, basestring):
      # A single string is allowed to hold multiple comma-separated blob keys.
      blob_keys = blob_keys.split(",")
    if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
      raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
    if not blob_keys:
      raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
    for blob_key in blob_keys:
      blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
      if not blob_info:
        raise BadReaderParamsError("Could not find blobinfo for key %s" %
                                   blob_key)

  @classmethod
  def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
    """Returns a list of input readers for the input spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader. Must contain
          'blob_keys' parameter with one or more blob keys.
      _reader: a callable that returns a file-like object for reading blobs.
          Used for dependency injection.

    Returns:
      A list of InputReaders spanning the subfiles within the blobs.
      There will be at least one reader per blob, but it will otherwise
      attempt to keep the expanded size even.
    """
    params = _get_params(mapper_spec)
    blob_keys = params[cls.BLOB_KEYS_PARAM]
    if isinstance(blob_keys, basestring):
      # A single string is allowed to hold multiple comma-separated blob keys.
      blob_keys = blob_keys.split(",")

    blob_files = {}
    total_size = 0
    for blob_key in blob_keys:
      zip_input = zipfile.ZipFile(_reader(blob_key))
      blob_files[blob_key] = zip_input.infolist()
      total_size += sum(x.file_size for x in blob_files[blob_key])

    shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)

    # Shards are broken on both blob key and file-within-zip boundaries, so a
    # shard spans at minimum a single file and the resulting number of shards
    # may be smaller than requested.
    size_per_shard = total_size // shard_count

    readers = []
    for blob_key in blob_keys:
      files = blob_files[blob_key]
      current_shard_size = 0
      start_file_index = 0
      next_file_index = 0
      for fileinfo in files:
        next_file_index += 1
        current_shard_size += fileinfo.file_size
        if current_shard_size >= size_per_shard:
          readers.append(cls(blob_key, start_file_index, next_file_index, 0,
                             _reader))
          current_shard_size = 0
          start_file_index = next_file_index
      if current_shard_size != 0:
        readers.append(cls(blob_key, start_file_index, next_file_index, 0,
                           _reader))

    return readers

  def next(self):
    """Returns the next line from this input reader as (lineinfo, line) tuple.

    Returns:
      The next input from this input reader, in the form of a 2-tuple.
      The first element of the tuple describes the source, it is itself
        a tuple (blobkey, filenumber, byteoffset).
      The second element of the tuple is the line found at that offset.
    """
    if not self._filestream:
      if not self._zip:
        self._zip = zipfile.ZipFile(self._reader(self._blob_key))
        # Get a list of entries, reversed so we can pop entries off in order.
        self._entries = self._zip.infolist()[self._start_file_index:
                                             self._end_file_index]
        self._entries.reverse()
      if not self._entries:
        raise StopIteration()
      entry = self._entries.pop()
      value = self._zip.read(entry.filename)
      self._filestream = StringIO.StringIO(value)
      if self._initial_offset:
        self._filestream.seek(self._initial_offset)
        self._filestream.readline()

    start_position = self._filestream.tell()
    line = self._filestream.readline()

    if not line:
      # Done with this file in the zip. Move on to the next file.
      self._filestream.close()
      self._filestream = None
      self._start_file_index += 1
      self._initial_offset = 0
      return self.next()

    return ((self._blob_key, self._start_file_index, start_position),
            line.rstrip("\n"))

  def _next_offset(self):
    """Return the offset of the next line to read."""
    if self._filestream:
      offset = self._filestream.tell()
      if offset:
        offset -= 1
    else:
      offset = self._initial_offset

    return offset

  def to_json(self):
    """Returns an input shard state for the remaining inputs.

    Returns:
      A json-izable version of the remaining InputReader.
    """
    return {self.BLOB_KEY_PARAM: self._blob_key,
            self.START_FILE_INDEX_PARAM: self._start_file_index,
            self.END_FILE_INDEX_PARAM: self._end_file_index,
            self.OFFSET_PARAM: self._next_offset()}

  @classmethod
  def from_json(cls, json, _reader=blobstore.BlobReader):
    """Creates an instance of the InputReader for the given input shard state.

    Args:
      json: The InputReader state as a dict-like object.
      _reader: For dependency injection.

    Returns:
      An instance of the InputReader configured using the values of json.
    """
    return cls(json[cls.BLOB_KEY_PARAM],
               json[cls.START_FILE_INDEX_PARAM],
               json[cls.END_FILE_INDEX_PARAM],
               json[cls.OFFSET_PARAM],
               _reader)

  def __str__(self):
    """Returns the string representation of this reader.

    Returns:
      string blobkey:[start file num, end file num]:current offset.
    """
    return "blobstore.BlobKey(%r):[%d, %d]:%d" % (
        self._blob_key, self._start_file_index, self._end_file_index,
        self._next_offset())
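

# Illustrative sketch, not part of the original module: BlobstoreZipLineInputReader
# takes the same "blob_keys" parameter as BlobstoreLineInputReader, but each
# blob must be a zip archive of newline-delimited files. The mapper receives
# ((blob_key, file_index, byte_offset), line) tuples. The key below is
# hypothetical.
_EXAMPLE_BLOBSTORE_ZIP_LINE_READER_PARAMS = {
    "input_reader": {
        "blob_keys": ["zipped-logs-blob-key"],
    },
}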


class RandomStringInputReader(InputReader):
  """RandomStringInputReader generates random strings as output.

  Primary usage is to populate output with testing entries.
  """

  # Total number of entries this reader should generate.
  COUNT = "count"
  # Length of the generated strings.
  STRING_LENGTH = "string_length"

  DEFAULT_STRING_LENGTH = 10

  def __init__(self, count, string_length):
    """Initialize input reader.

    Args:
      count: number of entries this shard should generate.
      string_length: the length of generated random strings.
    """
    self._count = count
    self._string_length = string_length

  def __iter__(self):
    ctx = context.get()

    while self._count:
      self._count -= 1
      start_time = time.time()
      content = "".join(random.choice(string.ascii_lowercase)
                        for _ in range(self._string_length))
      if ctx:
        operation.counters.Increment(
            COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
        operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
      yield content

  @classmethod
  def split_input(cls, mapper_spec):
    params = _get_params(mapper_spec)
    count = params[cls.COUNT]
    string_length = cls.DEFAULT_STRING_LENGTH
    if cls.STRING_LENGTH in params:
      string_length = params[cls.STRING_LENGTH]

    shard_count = mapper_spec.shard_count
    count_per_shard = count // shard_count

    mr_input_readers = [
        cls(count_per_shard, string_length) for _ in range(shard_count)]

    left = count - count_per_shard*shard_count
    if left > 0:
      mr_input_readers.append(cls(left, string_length))

    return mr_input_readers

  @classmethod
  def validate(cls, mapper_spec):
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")

    params = _get_params(mapper_spec)
    if cls.COUNT not in params:
      raise BadReaderParamsError("Must specify %s" % cls.COUNT)
    if not isinstance(params[cls.COUNT], int):
      raise BadReaderParamsError("%s should be an int but is %s" %
                                 (cls.COUNT, type(params[cls.COUNT])))
    if params[cls.COUNT] <= 0:
      raise BadReaderParamsError("%s should be a positive int" % cls.COUNT)
    if cls.STRING_LENGTH in params and not (
        isinstance(params[cls.STRING_LENGTH], int) and
        params[cls.STRING_LENGTH] > 0):
      raise BadReaderParamsError("%s should be a positive int but is %s" %
                                 (cls.STRING_LENGTH, params[cls.STRING_LENGTH]))
    if (not isinstance(mapper_spec.shard_count, int) or
        mapper_spec.shard_count <= 0):
      raise BadReaderParamsError(
          "shard_count should be a positive int but is %s" %
          mapper_spec.shard_count)

  @classmethod
  def from_json(cls, json):
    return cls(json[cls.COUNT], json[cls.STRING_LENGTH])

  def to_json(self):
    return {self.COUNT: self._count, self.STRING_LENGTH: self._string_length}
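

# Illustrative sketch, not part of the original module: RandomStringInputReader
# parameters. "count" is required and is split roughly evenly across shards;
# "string_length" is optional and defaults to DEFAULT_STRING_LENGTH (10).
_EXAMPLE_RANDOM_STRING_READER_PARAMS = {
    "input_reader": {
        "count": 10000,
        "string_length": 16,
    },
}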


class ConsistentKeyReader(DatastoreKeyInputReader):
  """A key reader which reads consistent data from datastore.

  Datastore might have entities which were written, but not visible through
  queries for some time. Typically these entities can be only read inside
  transaction until they are 'applied'.

  This reader reads all keys even if they are not visible. It might take
  significant time to start yielding some data because it has to apply all
  modifications created before its start.
  """
  START_TIME_US_PARAM = "start_time_us"
  UNAPPLIED_LOG_FILTER = "__unapplied_log_timestamp_us__ <"
  DUMMY_KIND = "DUMMY_KIND"
  DUMMY_ID = 106275677020293L
  UNAPPLIED_QUERY_DEADLINE = 270

  def _get_unapplied_jobs_accross_namespaces(self,
                                             namespace_start,
                                             namespace_end,
                                             app):
    filters = {"__key__ >=": db.Key.from_path("__namespace__",
                                              namespace_start or 1,
                                              _app=app),
               "__key__ <=": db.Key.from_path("__namespace__",
                                              namespace_end or 1,
                                              _app=app),
               self.UNAPPLIED_LOG_FILTER: self.start_time_us}
    unapplied_query = datastore.Query(filters=filters, keys_only=True, _app=app)
    return unapplied_query.Get(
        limit=self._batch_size,
        config=datastore_rpc.Configuration(
            deadline=self.UNAPPLIED_QUERY_DEADLINE))

  def _iter_ns_range(self):
    while True:
      unapplied_jobs = self._get_unapplied_jobs_accross_namespaces(
          self._ns_range.namespace_start,
          self._ns_range.namespace_end,
          self._ns_range.app)

      if not unapplied_jobs:
        break

      self._apply_jobs(unapplied_jobs)

    for o in super(ConsistentKeyReader, self)._iter_ns_range():
      yield o

  def _iter_key_range(self, k_range):
    assert hasattr(self, "start_time_us"), "start_time_us property was not set"
    if self._ns_range is None:
      # _iter_ns_range() applies unapplied jobs itself, so only apply them
      # here when iterating over explicit key ranges.
      self._apply_key_range(k_range)

    for o in super(ConsistentKeyReader, self)._iter_key_range(k_range):
      yield o

  def _apply_key_range(self, k_range):
    """Apply all jobs in the given KeyRange."""
    apply_range = copy.deepcopy(k_range)
    while True:
      # Fetch unapplied jobs in the remaining part of the range and roll them
      # forward, advancing the range past the jobs already applied.
      unapplied_query = self._make_unapplied_query(apply_range)
      unapplied_jobs = unapplied_query.Get(
          limit=self._batch_size,
          config=datastore_rpc.Configuration(
              deadline=self.UNAPPLIED_QUERY_DEADLINE))
      if not unapplied_jobs:
        break
      self._apply_jobs(unapplied_jobs)
      apply_range.advance(unapplied_jobs[-1])

  def _make_unapplied_query(self, k_range):
    """Returns a datastore.Query that finds the unapplied keys in k_range."""
    unapplied_query = k_range.make_ascending_datastore_query(
        kind=None, keys_only=True)
    unapplied_query[
        ConsistentKeyReader.UNAPPLIED_LOG_FILTER] = self.start_time_us
    return unapplied_query

  def _apply_jobs(self, unapplied_jobs):
    """Apply all jobs implied by the given keys."""
    keys_to_apply = []
    for key in unapplied_jobs:
      # To apply the entity group we need to read something from it.
      # A dummy key is used because it is known not to exist.
      path = key.to_path() + [ConsistentKeyReader.DUMMY_KIND,
                              ConsistentKeyReader.DUMMY_ID]
      keys_to_apply.append(
          db.Key.from_path(_app=key.app(), namespace=key.namespace(), *path))
    db.get(keys_to_apply, config=datastore_rpc.Configuration(
        deadline=self.UNAPPLIED_QUERY_DEADLINE,
        read_policy=datastore_rpc.Configuration.APPLY_ALL_JOBS_CONSISTENCY))

  @classmethod
  def _split_input_from_namespace(cls,
                                  app,
                                  namespace,
                                  entity_kind_name,
                                  shard_count):
    key_ranges = super(ConsistentKeyReader, cls)._split_input_from_namespace(
        app, namespace, entity_kind_name, shard_count)
    assert len(key_ranges) == shard_count

    # The base class may not produce KeyRanges that cover keys belonging to
    # entities with unapplied jobs, so open up the first and last ranges.
    try:
      last_key_range_index = key_ranges.index(None) - 1
    except ValueError:
      last_key_range_index = shard_count - 1

    if last_key_range_index != -1:
      key_ranges[0].key_start = None
      key_ranges[0].include_start = False
      key_ranges[last_key_range_index].key_end = None
      key_ranges[last_key_range_index].include_end = False
    return key_ranges

  @classmethod
  def _split_input_from_params(cls, app, namespaces, entity_kind_name,
                               params, shard_count):
    readers = super(ConsistentKeyReader, cls)._split_input_from_params(
        app,
        namespaces,
        entity_kind_name,
        params,
        shard_count)

    # Always produce at least one reader so that unapplied jobs for the
    # namespace still get applied.
    if not readers:
      readers = [cls(entity_kind_name,
                     key_ranges=None,
                     ns_range=namespace_range.NamespaceRange(),
                     batch_size=shard_count)]

    return readers

  @classmethod
  def split_input(cls, mapper_spec):
    """Splits input into key ranges."""
    readers = super(ConsistentKeyReader, cls).split_input(mapper_spec)
    start_time_us = _get_params(mapper_spec).get(
        cls.START_TIME_US_PARAM, long(time.time() * 1e6))
    for reader in readers:
      reader.start_time_us = start_time_us
    return readers

  def to_json(self):
    """Serializes all the data in this reader into json form.

    Returns:
      all the data in json-compatible map.
    """
    json_dict = super(DatastoreKeyInputReader, self).to_json()
    json_dict[self.START_TIME_US_PARAM] = self.start_time_us
    return json_dict

  @classmethod
  def from_json(cls, json):
    """Create new ConsistentKeyReader from the json, encoded by to_json.

    Args:
      json: json map representation of ConsistentKeyReader.

    Returns:
      an instance of ConsistentKeyReader with all data deserialized from json.
    """
    reader = super(ConsistentKeyReader, cls).from_json(json)
    reader.start_time_us = json[cls.START_TIME_US_PARAM]
    return reader


class NamespaceInputReader(InputReader):
  """An input reader to iterate over namespaces.

  This reader yields namespace names as string.
  It will always produce only one shard.
  """

  NAMESPACE_RANGE_PARAM = "namespace_range"
  BATCH_SIZE_PARAM = "batch_size"
  _BATCH_SIZE = 10

  def __init__(self, ns_range, batch_size=_BATCH_SIZE):
    self.ns_range = ns_range
    self._batch_size = batch_size

  def to_json(self):
    """Serializes all the data in this query range into json form.

    Returns:
      all the data in json-compatible map.
    """
    return {self.NAMESPACE_RANGE_PARAM: self.ns_range.to_json_object(),
            self.BATCH_SIZE_PARAM: self._batch_size}

  @classmethod
  def from_json(cls, json):
    """Create new NamespaceInputReader from the json, encoded by to_json.

    Args:
      json: json map representation of NamespaceInputReader.

    Returns:
      an instance of NamespaceInputReader with all data deserialized from json.
    """
    return cls(
        namespace_range.NamespaceRange.from_json_object(
            json[cls.NAMESPACE_RANGE_PARAM]),
        json[cls.BATCH_SIZE_PARAM])

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.BATCH_SIZE_PARAM in params:
      try:
        batch_size = int(params[cls.BATCH_SIZE_PARAM])
        if batch_size < 1:
          raise BadReaderParamsError("Bad batch size: %s" % batch_size)
      except ValueError, e:
        raise BadReaderParamsError("Bad batch size: %s" % e)

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers for the input spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Returns:
      A list of InputReaders.
    """
    batch_size = int(_get_params(mapper_spec).get(
        cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
    shard_count = mapper_spec.shard_count
    namespace_ranges = namespace_range.NamespaceRange.split(shard_count,
                                                            contiguous=True)
    return [NamespaceInputReader(ns_range, batch_size)
            for ns_range in namespace_ranges]

  def __iter__(self):
    while True:
      keys = self.ns_range.make_datastore_query().Get(limit=self._batch_size)
      if not keys:
        break

      for key in keys:
        namespace = metadata.Namespace.key_to_namespace(key)
        self.ns_range = self.ns_range.with_start_after(namespace)
        yield namespace

  def __str__(self):
    return repr(self.ns_range)
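

# Illustrative sketch, not part of the original module: NamespaceInputReader
# yields namespace names as strings; only "batch_size" is configurable.
_EXAMPLE_NAMESPACE_READER_PARAMS = {
    "input_reader": {
        "batch_size": 10,
    },
}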


class RecordsReader(InputReader):
  """Reader to read a list of Files API files in records format.

  The number of input shards can be specified by the SHARDS_PARAM
  mapper parameter. Input files cannot be split, so there will be at most
  one shard per file. Also the number of shards will not be reduced based on
  the number of input files, so shards in always equals shards out.
  """

  FILE_PARAM = "file"
  FILES_PARAM = "files"

  def __init__(self, filenames, position):
    """Constructor.

    Args:
      filenames: list of filenames.
      position: file position to start reading from as int.
    """
    self._filenames = filenames
    if self._filenames:
      self._reader = records.RecordsReader(
          files.BufferedFile(self._filenames[0]))
      self._reader.seek(position)
    else:
      self._reader = None

  def __iter__(self):
    """Iterate over records in file.

    Yields records as strings.
    """
    ctx = context.get()

    while self._reader:
      try:
        start_time = time.time()
        record = self._reader.read()
        if ctx:
          operation.counters.Increment(
              COUNTER_IO_READ_MSEC,
              int((time.time() - start_time) * 1000))(ctx)
          operation.counters.Increment(COUNTER_IO_READ_BYTES, len(record))(ctx)
        yield record
      except (files.ExistenceError), e:
        raise errors.FailJobError("ExistenceError: %s" % e)
      except (files.UnknownError), e:
        raise errors.RetrySliceError("UnknownError: %s" % e)
      except EOFError:
        self._filenames.pop(0)
        if not self._filenames:
          self._reader = None
        else:
          self._reader = records.RecordsReader(
              files.BufferedFile(self._filenames[0]))

  @classmethod
  def from_json(cls, json):
    """Creates an instance of the InputReader for the given input shard state.

    Args:
      json: The InputReader state as a dict-like object.

    Returns:
      An instance of the InputReader configured using the values of json.
    """
    return cls(json["filenames"], json["position"])

  def to_json(self):
    """Returns an input shard state for the remaining inputs.

    Returns:
      A json-izable version of the remaining InputReader.
    """
    result = {
        "filenames": self._filenames,
        "position": 0,
        }
    if self._reader:
      result["position"] = self._reader.tell()
    return result

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers for the input spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Returns:
      A list of InputReaders.
    """
    params = _get_params(mapper_spec)
    shard_count = mapper_spec.shard_count

    if cls.FILES_PARAM in params:
      filenames = params[cls.FILES_PARAM]
      if isinstance(filenames, basestring):
        filenames = filenames.split(",")
    else:
      filenames = [params[cls.FILE_PARAM]]

    batch_list = [[] for _ in xrange(shard_count)]
    for index, filename in enumerate(filenames):
      # Simplest round robin so we don't have any short shards.
      batch_list[index % shard_count].append(filenames[index])

    # Sort from most shards to least shards so the short shard is last.
    batch_list.sort(reverse=True, key=lambda x: len(x))
    return [cls(batch, 0) for batch in batch_list]

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise errors.BadReaderParamsError("Input reader class mismatch")
    params = _get_params(mapper_spec)
    if (cls.FILES_PARAM not in params and
        cls.FILE_PARAM not in params):
      raise BadReaderParamsError(
          "Must specify '%s' or '%s' parameter for mapper input" %
          (cls.FILES_PARAM, cls.FILE_PARAM))

  def __str__(self):
    position = 0
    if self._reader:
      position = self._reader.tell()
    return "%s:%s" % (self._filenames, position)
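

# Illustrative sketch, not part of the original module: RecordsReader accepts
# either a "file" parameter naming a single Files API file or a "files"
# parameter with a list (or comma-separated string). The filenames below are
# hypothetical placeholders, not a required naming scheme.
_EXAMPLE_RECORDS_READER_PARAMS = {
    "input_reader": {
        "files": ["/blobstore/writable:part-0", "/blobstore/writable:part-1"],
    },
}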


class LogInputReader(InputReader):
  """Input reader for a time range of logs via the Logs Reader API.

  The number of input shards may be specified by the SHARDS_PARAM mapper
  parameter. A starting and ending time (in seconds since the Unix epoch) are
  required to generate time ranges over which to shard the input.
  """

  # Parameters directly mapping to those available via logservice.fetch().
  START_TIME_PARAM = "start_time"
  END_TIME_PARAM = "end_time"
  MINIMUM_LOG_LEVEL_PARAM = "minimum_log_level"
  INCLUDE_INCOMPLETE_PARAM = "include_incomplete"
  INCLUDE_APP_LOGS_PARAM = "include_app_logs"
  VERSION_IDS_PARAM = "version_ids"

  # Semi-hidden parameters used only internally.
  _OFFSET_PARAM = "offset"
  _PROTOTYPE_REQUEST_PARAM = "prototype_request"

  _PARAMS = frozenset([START_TIME_PARAM, END_TIME_PARAM, _OFFSET_PARAM,
                       MINIMUM_LOG_LEVEL_PARAM, INCLUDE_INCOMPLETE_PARAM,
                       INCLUDE_APP_LOGS_PARAM, VERSION_IDS_PARAM,
                       _PROTOTYPE_REQUEST_PARAM])
  _KWARGS = frozenset([_OFFSET_PARAM, _PROTOTYPE_REQUEST_PARAM])

  def __init__(self,
               start_time=None,
               end_time=None,
               minimum_log_level=None,
               include_incomplete=False,
               include_app_logs=False,
               version_ids=None,
               **kwargs):
    """Constructor.

    Args:
      start_time: The earliest request completion or last-update time of logs
        that should be mapped over, in seconds since the Unix epoch.
      end_time: The latest request completion or last-update time that logs
        should be mapped over, in seconds since the Unix epoch.
      minimum_log_level: An application log level which serves as a filter on
        the requests mapped over--requests with no application log at or above
        the specified level will be omitted, even if include_app_logs is False.
      include_incomplete: Whether or not to include requests that have started
        but not yet finished, as a boolean. Defaults to False.
      include_app_logs: Whether or not to include application level logs in the
        mapped logs, as a boolean. Defaults to False.
      version_ids: A list of version ids whose logs should be mapped against.
    """
    InputReader.__init__(self)

    # The contents of __params are kept suitable for passing directly to
    # logservice.fetch().
    self.__params = dict(kwargs)

    if start_time is not None:
      self.__params[self.START_TIME_PARAM] = start_time
    if end_time is not None:
      self.__params[self.END_TIME_PARAM] = end_time
    if minimum_log_level is not None:
      self.__params[self.MINIMUM_LOG_LEVEL_PARAM] = minimum_log_level
    if include_incomplete is not None:
      self.__params[self.INCLUDE_INCOMPLETE_PARAM] = include_incomplete
    if include_app_logs is not None:
      self.__params[self.INCLUDE_APP_LOGS_PARAM] = include_app_logs
    if version_ids:
      self.__params[self.VERSION_IDS_PARAM] = version_ids

    # Any submitted prototype_request arrives in encoded form.
    if self._PROTOTYPE_REQUEST_PARAM in self.__params:
      prototype_request = log_service_pb.LogReadRequest(
          self.__params[self._PROTOTYPE_REQUEST_PARAM])
      self.__params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request

  def __iter__(self):
    """Iterates over logs in a given range of time.

    Yields:
      A RequestLog containing all the information for a single request.
    """
    for log in logservice.fetch(**self.__params):
      self.__params[self._OFFSET_PARAM] = log.offset
      yield log

  @classmethod
  def from_json(cls, json):
    """Creates an instance of the InputReader for the given input shard's state.

    Args:
      json: The InputReader state as a dict-like object.

    Returns:
      An instance of the InputReader configured using the given JSON parameters.
    """
    # Strip out unrecognized parameters.
    params = dict((str(k), v) for k, v in json.iteritems()
                  if k in cls._PARAMS)

    if cls._OFFSET_PARAM in params:
      params[cls._OFFSET_PARAM] = base64.b64decode(params[cls._OFFSET_PARAM])
    return cls(**params)

  def to_json(self):
    """Returns an input shard state for the remaining inputs.

    Returns:
      A JSON serializable version of the remaining input to read.
    """

    params = dict(self.__params)  # Shallow copy.
    if self._PROTOTYPE_REQUEST_PARAM in params:
      prototype_request = params[self._PROTOTYPE_REQUEST_PARAM]
      params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request.Encode()
    if self._OFFSET_PARAM in params:
      params[self._OFFSET_PARAM] = base64.b64encode(params[self._OFFSET_PARAM])
    return params

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers for the given input specification.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Returns:
      A list of InputReaders.
    """
    params = _get_params(mapper_spec)
    shard_count = mapper_spec.shard_count

    # Pick out the overall start and end times and the time step per shard.
    start_time = params[cls.START_TIME_PARAM]
    end_time = params[cls.END_TIME_PARAM]
    seconds_per_shard = (end_time - start_time) / shard_count

    # Create a LogInputReader for each shard, modulating the params as we go.
    shards = []
    for _ in xrange(shard_count - 1):
      params[cls.END_TIME_PARAM] = (params[cls.START_TIME_PARAM] +
                                    seconds_per_shard)
      shards.append(LogInputReader(**params))
      params[cls.START_TIME_PARAM] = params[cls.END_TIME_PARAM]

    # The final shard completes the time range.
    params[cls.END_TIME_PARAM] = end_time
    return shards + [LogInputReader(**params)]

  @classmethod
  def validate(cls, mapper_spec):
    """Validates the mapper's specification and all necessary parameters.

    Args:
      mapper_spec: The MapperSpec to be used with this InputReader.

    Raises:
      BadReaderParamsError: If the user fails to specify both a starting time
        and an ending time, or if the starting time is later than the ending
        time.
    """
    if mapper_spec.input_reader_class() != cls:
      raise errors.BadReaderParamsError("Input reader class mismatch")

    params = _get_params(mapper_spec, allowed_keys=cls._PARAMS)
    if cls.VERSION_IDS_PARAM not in params:
      raise errors.BadReaderParamsError("Must specify a list of version ids "
                                        "for mapper input")
    if (cls.START_TIME_PARAM not in params or
        params[cls.START_TIME_PARAM] is None):
      raise errors.BadReaderParamsError("Must specify a starting time for "
                                        "mapper input")
    if cls.END_TIME_PARAM not in params or params[cls.END_TIME_PARAM] is None:
      params[cls.END_TIME_PARAM] = time.time()

    if params[cls.START_TIME_PARAM] >= params[cls.END_TIME_PARAM]:
      raise errors.BadReaderParamsError("The starting time cannot be later "
                                        "than or the same as the ending time.")

    if cls._PROTOTYPE_REQUEST_PARAM in params:
      try:
        params[cls._PROTOTYPE_REQUEST_PARAM] = log_service_pb.LogReadRequest(
            params[cls._PROTOTYPE_REQUEST_PARAM])
      except (TypeError, ProtocolBuffer.ProtocolBufferDecodeError):
        raise errors.BadReaderParamsError("The prototype request must be "
                                          "parseable as a LogReadRequest.")

    # Pass the parameters to logservice.fetch() to verify any underlying
    # constraints on types or values. This only constructs an iterator; it
    # doesn't trigger any requests for actual log records.
    try:
      logservice.fetch(**params)
    except logservice.InvalidArgumentError, e:
      raise errors.BadReaderParamsError("One or more parameters are not valid "
                                        "inputs to logservice.fetch(): %s" % e)

  def __str__(self):
    """Returns the string representation of this LogInputReader."""
    params = []
    for key in sorted(self.__params.keys()):
      value = self.__params[key]
      if key is self._PROTOTYPE_REQUEST_PARAM:
        params.append("%s='%s'" % (key, value))
      elif key is self._OFFSET_PARAM:
        params.append("%s='%s'" % (key, value))
      else:
        params.append("%s=%s" % (key, value))

    return "LogInputReader(%s)" % ", ".join(params)
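

# Illustrative sketch, not part of the original module: LogInputReader
# parameters. Times are seconds since the Unix epoch; validate() fills in
# end_time with the current time when it is omitted. The version id below is
# a hypothetical example.
_EXAMPLE_LOG_READER_PARAMS = {
    "input_reader": {
        "start_time": 1325376000,
        "end_time": 1325462400,
        "version_ids": ["1"],
        "include_app_logs": True,
    },
}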