3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
31 """Defines input readers for MapReduce."""
36 "AbstractDatastoreInputReader",
38 "BadReaderParamsError",
39 "BlobstoreLineInputReader",
40 "BlobstoreZipInputReader",
41 "BlobstoreZipLineInputReader",
42 "COUNTER_IO_READ_BYTES",
43 "COUNTER_IO_READ_MSEC",
44 "DatastoreEntityInputReader",
45 "DatastoreInputReader",
46 "DatastoreKeyInputReader",
48 "RandomStringInputReader",
49 "RawDatastoreInputReader",
53 "NamespaceInputReader",
70 from google
.net
.proto
import ProtocolBuffer
71 from google
.appengine
.ext
import ndb
73 from google
.appengine
.api
import datastore
74 from google
.appengine
.api
import files
75 from google
.appengine
.api
import logservice
76 from google
.appengine
.api
.files
import file_service_pb
77 from google
.appengine
.api
.logservice
import log_service_pb
78 from google
.appengine
.ext
import blobstore
79 from google
.appengine
.ext
import db
80 from google
.appengine
.ext
import key_range
81 from google
.appengine
.ext
.db
import metadata
82 from google
.appengine
.ext
.mapreduce
import context
83 from google
.appengine
.ext
.mapreduce
import datastore_range_iterators
as db_iters
84 from google
.appengine
.ext
.mapreduce
import errors
85 from google
.appengine
.ext
.mapreduce
import file_format_parser
86 from google
.appengine
.ext
.mapreduce
import file_format_root
87 from google
.appengine
.ext
.mapreduce
import json_util
88 from google
.appengine
.ext
.mapreduce
import key_ranges
89 from google
.appengine
.ext
.mapreduce
import model
90 from google
.appengine
.ext
.mapreduce
import namespace_range
91 from google
.appengine
.ext
.mapreduce
import operation
92 from google
.appengine
.ext
.mapreduce
import property_range
93 from google
.appengine
.ext
.mapreduce
import records
94 from google
.appengine
.ext
.mapreduce
import util
100 from google
.appengine
.ext
import cloudstorage
101 if hasattr(cloudstorage
, "_STUB"):
# Classes moved to errors module. Copied here for compatibility.
BadReaderParamsError = errors.BadReaderParamsError


# Counter name for the number of bytes read by input readers.
COUNTER_IO_READ_BYTES = "io-read-bytes"

# Counter name for the milliseconds spent reading data by input readers.
COUNTER_IO_READ_MSEC = "io-read-msec"

# Sentinel that an InputReader may yield instead of a real value to give the
# framework an opportunity to checkpoint the mapreduce state without the
# handler seeing an input. Compared by identity, hence a unique object().
ALLOW_CHECKPOINT = object()
class InputReader(json_util.JsonMixin):
  """Abstract base class for input readers.

  InputReaders have the following properties:
   * They are created by using the split_input method to generate a set of
     InputReaders from a MapperSpec.
   * They generate inputs to the mapper via the iterator interface.
   * After creation, they can be serialized and resumed using the JsonMixin
     interface.
   * They are cast to string for a user-readable description; it may be
     valuable to implement __str__.
  """

  # When False, the value yielded by this reader is passed to the handler
  # as-is; when True it is expanded with *value into multiple arguments.
  expand_parameters = False

  # Mapreduce parameter names shared by all input readers.
  NAMESPACE_PARAM = "namespace"
  NAMESPACES_PARAM = "namespaces"

  def __iter__(self):
    return self

  def next(self):
    """Returns the next input from this input reader as a key, value pair.

    Returns:
      The next input from this input reader.

    Raises:
      NotImplementedError: always; subclasses must override.
    """
    raise NotImplementedError("next() not implemented in %s" % self.__class__)

  @classmethod
  def from_json(cls, input_shard_state):
    """Creates an instance of the InputReader for the given input shard state.

    Args:
      input_shard_state: The InputReader state as a dict-like object.

    Returns:
      An instance of the InputReader configured using the values of json.

    Raises:
      NotImplementedError: always; subclasses must override.
    """
    raise NotImplementedError("from_json() not implemented in %s" % cls)

  def to_json(self):
    """Returns an input shard state for the remaining inputs.

    Returns:
      A json-izable version of the remaining InputReader.

    Raises:
      NotImplementedError: always; subclasses must override.
    """
    raise NotImplementedError("to_json() not implemented in %s" %
                              self.__class__)

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers.

    This method creates a list of input readers, each for one shard.
    It attempts to split inputs among readers evenly.

    Args:
      mapper_spec: model.MapperSpec specifies the inputs and additional
        parameters to define the behavior of input readers.

    Returns:
      A list of InputReaders. None or [] when no input data can be found.

    Raises:
      NotImplementedError: always; subclasses must override.
    """
    raise NotImplementedError("split_input() not implemented in %s" % cls)

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Input reader parameters are expected to be passed as "input_reader"
    subdictionary in mapper_spec.params.

    Pre 1.6.4 API mixes input reader parameters with all other parameters. Thus
    to be compatible, input reader check mapper_spec.params as well and
    issue a warning if "input_reader" subdicationary is not present.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Input reader class mismatch")
def _get_params(mapper_spec, allowed_keys=None, allow_old=True):
  """Obtain input reader parameters.

  Utility function for input readers implementation. Fetches parameters
  from mapreduce specification giving appropriate usage warnings.

  Args:
    mapper_spec: The MapperSpec for the job
    allowed_keys: set of all allowed keys in parameters as strings. If it is
      not None, then parameters are expected to be in a separate "input_reader"
      subdictionary of mapper_spec parameters.
    allow_old: Allow parameters to exist outside of the input_reader
      subdictionary for compatability.

  Returns:
    mapper parameters as dict

  Raises:
    BadReaderParamsError: if parameters are invalid/missing or not allowed.
  """
  if "input_reader" not in mapper_spec.params:
    message = ("Input reader's parameters should be specified in "
               "input_reader subdictionary.")
    # Old-style (pre 1.6.4) flat parameters are only tolerated when the
    # caller did not restrict the key set and allow_old is on.
    if not allow_old or allowed_keys:
      raise errors.BadReaderParamsError(message)
    params = mapper_spec.params
    # Normalize keys to str; json deserialization may produce unicode keys.
    params = dict((str(n), v) for n, v in params.iteritems())
  else:
    if not isinstance(mapper_spec.params.get("input_reader"), dict):
      raise errors.BadReaderParamsError(
          "Input reader parameters should be a dictionary")
    params = mapper_spec.params.get("input_reader")
    params = dict((str(n), v) for n, v in params.iteritems())
    if allowed_keys:
      params_diff = set(params.keys()) - allowed_keys
      if params_diff:
        raise errors.BadReaderParamsError(
            "Invalid input_reader parameters: %s" % ",".join(params_diff))
  return params
257 class FileInputReader(InputReader
):
258 """Reader to read Files API files of user specified format.
260 This class currently only supports Google Storage files. It will be extended
261 to support blobstore files in the future.
264 files: a list of filenames or filename patterns.
265 filename must be of format '/gs/bucket/filename'.
266 filename pattern has format '/gs/bucket/prefix*'.
267 filename pattern will be expanded to filenames with the given prefix.
268 Please see parseGlob in the file api.files.gs.py which is included in the
269 App Engine SDK for supported patterns.
272 ["/gs/bucket1/file1", "/gs/bucket2/*", "/gs/bucket3/p*"]
273 includes "file1", all files under bucket2, and files under bucket3 with
274 a prefix "p" in its name.
276 format: format string determines what your map function gets as its input.
277 format string can be "lines", "bytes", "zip", or a cascade of them plus
278 optional parameters. See file_formats.FORMATS for all supported formats.
279 See file_format_parser._FileFormatParser for format string syntax.
282 "lines": your map function gets files' contents line by line.
283 "bytes": your map function gets files' contents entirely.
284 "zip": InputReader unzips files and feeds your map function each of
285 the archive's member files as a whole.
286 "zip[bytes]: same as above.
287 "zip[lines]": InputReader unzips files and feeds your map function
288 files' contents line by line.
289 "zip[lines(encoding=utf32)]": InputReader unzips files, reads each
290 file with utf32 encoding and feeds your map function line by line.
291 "base64[zip[lines(encoding=utf32)]]: InputReader decodes files with
292 base64 encoding, unzips each file, reads each of them with utf32
293 encoding and feeds your map function line by line.
295 Note that "encoding" only teaches InputReader how to interpret files.
296 The input your map function gets is always a Python str.
300 FILES_PARAM
= "files"
301 FORMAT_PARAM
= "format"
303 def __init__(self
, format_root
):
304 """Initialize input reader.
307 format_root: a FileFormatRoot instance.
309 self
._file
_format
_root
= format_root
318 start_time
= time
.time()
320 content
= self
._file
_format
_root
.next().read()
323 operation
.counters
.Increment(
324 COUNTER_IO_READ_MSEC
, int((time
.time() - start_time
) * 1000))(ctx
)
325 operation
.counters
.Increment(COUNTER_IO_READ_BYTES
, len(content
))(ctx
)
330 def split_input(cls
, mapper_spec
):
332 params
= _get_params(mapper_spec
)
336 for f
in params
[cls
.FILES_PARAM
]:
337 parsedName
= files
.gs
.parseGlob(f
)
338 if isinstance(parsedName
, tuple):
339 filenames
.extend(files
.gs
.listdir(parsedName
[0],
340 {"prefix": parsedName
[1]}))
342 filenames
.append(parsedName
)
344 file_format_roots
= file_format_root
.split(filenames
,
345 params
[cls
.FORMAT_PARAM
],
346 mapper_spec
.shard_count
)
348 if file_format_roots
is None:
350 return [cls(root
) for root
in file_format_roots
]
353 def validate(cls
, mapper_spec
):
355 if mapper_spec
.input_reader_class() != cls
:
356 raise BadReaderParamsError("Mapper input reader class mismatch")
359 params
= _get_params(mapper_spec
)
360 if cls
.FILES_PARAM
not in params
:
361 raise BadReaderParamsError("Must specify %s" % cls
.FILES_PARAM
)
362 if cls
.FORMAT_PARAM
not in params
:
363 raise BadReaderParamsError("Must specify %s" % cls
.FORMAT_PARAM
)
365 format_string
= params
[cls
.FORMAT_PARAM
]
366 if not isinstance(format_string
, basestring
):
367 raise BadReaderParamsError("format should be string but is %s" %
370 file_format_parser
.parse(format_string
)
371 except ValueError, e
:
372 raise BadReaderParamsError(e
)
374 paths
= params
[cls
.FILES_PARAM
]
375 if not (paths
and isinstance(paths
, list)):
376 raise BadReaderParamsError("files should be a list of filenames.")
381 files
.gs
.parseGlob(path
)
382 except files
.InvalidFileNameError
:
383 raise BadReaderParamsError("Invalid filename %s." % path
)
386 def from_json(cls
, json
):
389 file_format_root
.FileFormatRoot
.from_json(json
["file_format_root"]))
393 return {"file_format_root": self
._file
_format
_root
.to_json()}
396 class AbstractDatastoreInputReader(InputReader
):
397 """Abstract class for datastore input readers."""
403 _MAX_SHARD_COUNT
= 256
408 MAX_NAMESPACES_FOR_KEY_SHARD
= 10
411 ENTITY_KIND_PARAM
= "entity_kind"
412 KEYS_ONLY_PARAM
= "keys_only"
413 BATCH_SIZE_PARAM
= "batch_size"
414 KEY_RANGE_PARAM
= "key_range"
415 FILTERS_PARAM
= "filters"
417 _KEY_RANGE_ITER_CLS
= db_iters
.AbstractKeyRangeIterator
419 def __init__(self
, iterator
):
420 """Create new DatastoreInputReader object.
422 This is internal constructor. Use split_input to create readers instead.
425 iterator: an iterator that generates objects for this input reader.
427 self
._iter
= iterator
430 """Yields whatever internal iterator yields."""
435 """Returns the string representation of this InputReader."""
436 return repr(self
._iter
)
439 """Serializes input reader to json compatible format.
442 all the data in json-compatible map.
444 return self
._iter
.to_json()
447 def from_json(cls
, json
):
448 """Create new DatastoreInputReader from json, encoded by to_json.
451 json: json representation of DatastoreInputReader.
454 an instance of DatastoreInputReader with all data deserialized from json.
456 return cls(db_iters
.RangeIteratorFactory
.from_json(json
))
459 def _get_query_spec(cls
, mapper_spec
):
460 """Construct a model.QuerySpec from model.MapperSpec."""
461 params
= _get_params(mapper_spec
)
462 entity_kind
= params
[cls
.ENTITY_KIND_PARAM
]
463 filters
= params
.get(cls
.FILTERS_PARAM
)
464 app
= params
.get(cls
._APP
_PARAM
)
465 ns
= params
.get(cls
.NAMESPACE_PARAM
)
467 return model
.QuerySpec(
468 entity_kind
=cls
._get
_raw
_entity
_kind
(entity_kind
),
469 keys_only
=bool(params
.get(cls
.KEYS_ONLY_PARAM
, False)),
471 batch_size
=int(params
.get(cls
.BATCH_SIZE_PARAM
, cls
._BATCH
_SIZE
)),
472 model_class_path
=entity_kind
,
477 def split_input(cls
, mapper_spec
):
479 shard_count
= mapper_spec
.shard_count
480 query_spec
= cls
._get
_query
_spec
(mapper_spec
)
483 if query_spec
.ns
is not None:
484 k_ranges
= cls
._to
_key
_ranges
_by
_shard
(
485 query_spec
.app
, [query_spec
.ns
], shard_count
, query_spec
)
487 ns_keys
= namespace_range
.get_namespace_keys(
488 query_spec
.app
, cls
.MAX_NAMESPACES_FOR_KEY_SHARD
+1)
495 elif len(ns_keys
) <= cls
.MAX_NAMESPACES_FOR_KEY_SHARD
:
496 namespaces
= [ns_key
.name() or "" for ns_key
in ns_keys
]
497 k_ranges
= cls
._to
_key
_ranges
_by
_shard
(
498 query_spec
.app
, namespaces
, shard_count
, query_spec
)
501 ns_ranges
= namespace_range
.NamespaceRange
.split(n
=shard_count
,
503 can_query
=lambda: True,
505 k_ranges
= [key_ranges
.KeyRangesFactory
.create_from_ns_range(ns_range
)
506 for ns_range
in ns_ranges
]
508 iters
= [db_iters
.RangeIteratorFactory
.create_key_ranges_iterator(
509 r
, query_spec
, cls
._KEY
_RANGE
_ITER
_CLS
) for r
in k_ranges
]
511 return [cls(i
) for i
in iters
]
514 def _to_key_ranges_by_shard(cls
, app
, namespaces
, shard_count
, query_spec
):
515 """Get a list of key_ranges.KeyRanges objects, one for each shard.
517 This method uses scatter index to split each namespace into pieces
518 and assign those pieces to shards.
522 namespaces: a list of namespaces in str.
523 shard_count: number of shards to split.
524 query_spec: model.QuerySpec.
527 a list of key_ranges.KeyRanges objects.
529 key_ranges_by_ns
= []
532 for namespace
in namespaces
:
533 ranges
= cls
._split
_ns
_by
_scatter
(
536 query_spec
.entity_kind
,
540 random
.shuffle(ranges
)
541 key_ranges_by_ns
.append(ranges
)
546 ranges_by_shard
= [[] for _
in range(shard_count
)]
547 for ranges
in key_ranges_by_ns
:
548 for i
, k_range
in enumerate(ranges
):
550 ranges_by_shard
[i
].append(k_range
)
552 key_ranges_by_shard
= []
553 for ranges
in ranges_by_shard
:
555 key_ranges_by_shard
.append(key_ranges
.KeyRangesFactory
.create_from_list(
557 return key_ranges_by_shard
560 def _split_ns_by_scatter(cls
,
565 """Split a namespace by scatter index into key_range.KeyRange.
567 TODO: Power this with key_range.KeyRange.compute_split_points.
570 shard_count: number of shards.
571 namespace: namespace name to split. str.
572 raw_entity_kind: low level datastore API entity kind.
576 A list of key_range.KeyRange objects. If there are not enough entities to
577 splits into requested shards, the returned list will contain KeyRanges
578 ordered lexicographically with any Nones appearing at the end.
582 return [key_range
.KeyRange(namespace
=namespace
, _app
=app
)]
584 ds_query
= datastore
.Query(kind
=raw_entity_kind
,
588 ds_query
.Order("__scatter__")
589 oversampling_factor
= 32
590 random_keys
= ds_query
.Get(shard_count
* oversampling_factor
)
595 return ([key_range
.KeyRange(namespace
=namespace
, _app
=app
)] +
596 [None] * (shard_count
- 1))
600 if len(random_keys
) >= shard_count
:
602 random_keys
= cls
._choose
_split
_points
(random_keys
, shard_count
)
606 k_ranges
.append(key_range
.KeyRange(
608 key_end
=random_keys
[0],
609 direction
=key_range
.KeyRange
.ASC
,
615 for i
in range(0, len(random_keys
) - 1):
616 k_ranges
.append(key_range
.KeyRange(
617 key_start
=random_keys
[i
],
618 key_end
=random_keys
[i
+1],
619 direction
=key_range
.KeyRange
.ASC
,
625 k_ranges
.append(key_range
.KeyRange(
626 key_start
=random_keys
[-1],
628 direction
=key_range
.KeyRange
.ASC
,
634 if len(k_ranges
) < shard_count
:
636 k_ranges
+= [None] * (shard_count
- len(k_ranges
))
640 def _choose_split_points(cls
, sorted_keys
, shard_count
):
641 """Returns the best split points given a random set of datastore.Keys."""
642 assert len(sorted_keys
) >= shard_count
643 index_stride
= len(sorted_keys
) / float(shard_count
)
644 return [sorted_keys
[int(round(index_stride
* i
))]
645 for i
in range(1, shard_count
)]
648 def validate(cls
, mapper_spec
):
650 params
= _get_params(mapper_spec
)
651 if cls
.ENTITY_KIND_PARAM
not in params
:
652 raise BadReaderParamsError("Missing input reader parameter 'entity_kind'")
653 if cls
.BATCH_SIZE_PARAM
in params
:
655 batch_size
= int(params
[cls
.BATCH_SIZE_PARAM
])
657 raise BadReaderParamsError("Bad batch size: %s" % batch_size
)
658 except ValueError, e
:
659 raise BadReaderParamsError("Bad batch size: %s" % e
)
661 bool(params
.get(cls
.KEYS_ONLY_PARAM
, False))
663 raise BadReaderParamsError("keys_only expects a boolean value but got %s",
664 params
[cls
.KEYS_ONLY_PARAM
])
665 if cls
.NAMESPACE_PARAM
in params
:
666 if not isinstance(params
[cls
.NAMESPACE_PARAM
],
667 (str, unicode, type(None))):
668 raise BadReaderParamsError(
669 "Expected a single namespace string")
670 if cls
.NAMESPACES_PARAM
in params
:
671 raise BadReaderParamsError("Multiple namespaces are no longer supported")
672 if cls
.FILTERS_PARAM
in params
:
673 filters
= params
[cls
.FILTERS_PARAM
]
674 if not isinstance(filters
, list):
675 raise BadReaderParamsError("Expected list for filters parameter")
677 if not isinstance(f
, (tuple, list)):
678 raise BadReaderParamsError("Filter should be a tuple or list: %s", f
)
680 raise BadReaderParamsError("Filter should be a 3-tuple: %s", f
)
682 if not isinstance(prop
, basestring
):
683 raise BadReaderParamsError("Property should be string: %s", prop
)
684 if not isinstance(op
, basestring
):
685 raise BadReaderParamsError("Operator should be string: %s", op
)
688 def _get_raw_entity_kind(cls
, entity_kind_or_model_classpath
):
689 """Returns the entity kind to use with low level datastore calls.
692 entity_kind_or_model_classpath: user specified entity kind or model
696 the entity kind in str to use with low level datastore calls.
698 return entity_kind_or_model_classpath
class RawDatastoreInputReader(AbstractDatastoreInputReader):
  """Iterates over an entity kind and yields datastore.Entity."""

  _KEY_RANGE_ITER_CLS = db_iters.KeyRangeEntityIterator

  @classmethod
  def validate(cls, mapper_spec):
    """Inherit docs."""
    super(RawDatastoreInputReader, cls).validate(mapper_spec)
    params = _get_params(mapper_spec)
    entity_kind = params[cls.ENTITY_KIND_PARAM]
    # Unlike DatastoreInputReader, a dotted name here is not a model
    # classpath; warn and use it verbatim as the datastore kind.
    if "." in entity_kind:
      logging.warning(
          ". detected in entity kind %s specified for reader %s."
          "Assuming entity kind contains the dot.",
          entity_kind, cls.__name__)
    if cls.FILTERS_PARAM in params:
      filters = params[cls.FILTERS_PARAM]
      for f in filters:
        if f[1] != "=":
          raise BadReaderParamsError(
              "Only equality filters are supported: %s", f)
725 class DatastoreInputReader(AbstractDatastoreInputReader
):
726 """Iterates over a Model and yields model instances.
728 Supports both db.model and ndb.model.
731 _KEY_RANGE_ITER_CLS
= db_iters
.KeyRangeModelIterator
734 def _get_raw_entity_kind(cls
, model_classpath
):
735 entity_type
= util
.for_name(model_classpath
)
736 if isinstance(entity_type
, db
.Model
):
737 return entity_type
.kind()
738 elif isinstance(entity_type
, (ndb
.Model
, ndb
.MetaModel
)):
740 return entity_type
._get
_kind
()
742 return util
.get_short_name(model_classpath
)
745 def validate(cls
, mapper_spec
):
747 super(DatastoreInputReader
, cls
).validate(mapper_spec
)
748 params
= _get_params(mapper_spec
)
749 entity_kind
= params
[cls
.ENTITY_KIND_PARAM
]
752 model_class
= util
.for_name(entity_kind
)
753 except ImportError, e
:
754 raise BadReaderParamsError("Bad entity kind: %s" % e
)
755 if cls
.FILTERS_PARAM
in params
:
756 filters
= params
[cls
.FILTERS_PARAM
]
757 if issubclass(model_class
, db
.Model
):
758 cls
._validate
_filters
(filters
, model_class
)
760 cls
._validate
_filters
_ndb
(filters
, model_class
)
761 property_range
.PropertyRange(filters
, entity_kind
)
764 def _validate_filters(cls
, filters
, model_class
):
765 """Validate user supplied filters.
767 Validate filters are on existing properties and filter values
768 have valid semantics.
771 filters: user supplied filters. Each filter should be a list or tuple of
772 format (<property_name_as_str>, <query_operator_as_str>,
773 <value_of_certain_type>). Value type is up to the property's type.
774 model_class: the db.Model class for the entity type to apply filters on.
777 BadReaderParamsError: if any filter is invalid in any way.
782 properties
= model_class
.properties()
786 if prop
not in properties
:
787 raise errors
.BadReaderParamsError(
788 "Property %s is not defined for entity type %s",
789 prop
, model_class
.kind())
794 properties
[prop
].validate(val
)
795 except db
.BadValueError
, e
:
796 raise errors
.BadReaderParamsError(e
)
800 def _validate_filters_ndb(cls
, filters
, model_class
):
801 """Validate ndb.Model filters."""
805 properties
= model_class
._properties
809 if prop
not in properties
:
810 raise errors
.BadReaderParamsError(
811 "Property %s is not defined for entity type %s",
812 prop
, model_class
._get
_kind
())
817 properties
[prop
]._do
_validate
(val
)
818 except db
.BadValueError
, e
:
819 raise errors
.BadReaderParamsError(e
)
822 def split_input(cls
, mapper_spec
):
824 shard_count
= mapper_spec
.shard_count
825 query_spec
= cls
._get
_query
_spec
(mapper_spec
)
827 if not property_range
.should_shard_by_property_range(query_spec
.filters
):
828 return super(DatastoreInputReader
, cls
).split_input(mapper_spec
)
830 p_range
= property_range
.PropertyRange(query_spec
.filters
,
831 query_spec
.model_class_path
)
832 p_ranges
= p_range
.split(shard_count
)
836 ns_range
= namespace_range
.NamespaceRange(
837 namespace_start
=query_spec
.ns
,
838 namespace_end
=query_spec
.ns
,
840 ns_ranges
= [copy
.copy(ns_range
) for _
in p_ranges
]
842 ns_keys
= namespace_range
.get_namespace_keys(
843 query_spec
.app
, cls
.MAX_NAMESPACES_FOR_KEY_SHARD
+1)
848 if len(ns_keys
) <= cls
.MAX_NAMESPACES_FOR_KEY_SHARD
:
849 ns_ranges
= [namespace_range
.NamespaceRange(_app
=query_spec
.app
)
853 ns_ranges
= namespace_range
.NamespaceRange
.split(n
=shard_count
,
855 can_query
=lambda: True,
857 p_ranges
= [copy
.copy(p_range
) for _
in ns_ranges
]
859 assert len(p_ranges
) == len(ns_ranges
)
862 db_iters
.RangeIteratorFactory
.create_property_range_iterator(
863 p
, ns
, query_spec
) for p
, ns
in zip(p_ranges
, ns_ranges
)]
864 return [cls(i
) for i
in iters
]
class DatastoreKeyInputReader(RawDatastoreInputReader):
  """Iterate over an entity kind and yields datastore.Key."""

  # Yields keys rather than full entities.
  _KEY_RANGE_ITER_CLS = db_iters.KeyRangeKeyIterator
# Backward-compatible alias: older configs refer to the entity reader by
# this name.
DatastoreEntityInputReader = RawDatastoreInputReader
879 class _OldAbstractDatastoreInputReader(InputReader
):
880 """Abstract base class for classes that iterate over datastore entities.
882 Concrete subclasses must implement _iter_key_range(self, k_range). See the
883 docstring for that method for details.
890 _MAX_SHARD_COUNT
= 256
893 _OVERSAMPLING_FACTOR
= 32
898 MAX_NAMESPACES_FOR_KEY_SHARD
= 10
901 ENTITY_KIND_PARAM
= "entity_kind"
902 KEYS_ONLY_PARAM
= "keys_only"
903 BATCH_SIZE_PARAM
= "batch_size"
904 KEY_RANGE_PARAM
= "key_range"
905 NAMESPACE_RANGE_PARAM
= "namespace_range"
906 CURRENT_KEY_RANGE_PARAM
= "current_key_range"
907 FILTERS_PARAM
= "filters"
917 batch_size
=_BATCH_SIZE
,
918 current_key_range
=None,
920 """Create new AbstractDatastoreInputReader object.
922 This is internal constructor. Use split_query in a concrete class instead.
925 entity_kind: entity kind as string.
926 key_ranges: a sequence of key_range.KeyRange instances to process. Only
927 one of key_ranges or ns_range can be non-None.
928 ns_range: a namespace_range.NamespaceRange to process. Only one of
929 key_ranges or ns_range can be non-None.
930 batch_size: size of read batch as int.
931 current_key_range: the current key_range.KeyRange being processed.
932 filters: optional list of filters to apply to the query. Each filter is
933 a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
934 User filters are applied first.
936 assert key_ranges
is not None or ns_range
is not None, (
937 "must specify one of 'key_ranges' or 'ns_range'")
938 assert key_ranges
is None or ns_range
is None, (
939 "can't specify both 'key_ranges ' and 'ns_range'")
941 self
._entity
_kind
= entity_kind
944 self
._key
_ranges
= key_ranges
and list(reversed(key_ranges
))
946 self
._ns
_range
= ns_range
947 self
._batch
_size
= int(batch_size
)
948 self
._current
_key
_range
= current_key_range
949 self
._filters
= filters
952 def _get_raw_entity_kind(cls
, entity_kind
):
953 if "." in entity_kind
:
955 ". detected in entity kind %s specified for reader %s."
956 "Assuming entity kind contains the dot.",
957 entity_kind
, cls
.__name
__)
961 """Iterates over the given KeyRanges or NamespaceRange.
963 This method iterates over the given KeyRanges or NamespaceRange and sets
964 the self._current_key_range to the KeyRange currently being processed. It
965 then delegates to the _iter_key_range method to yield that actual
969 Forwards the objects yielded by the subclasses concrete _iter_key_range()
970 method. The caller must consume the result yielded because self.to_json()
973 if self
._key
_ranges
is not None:
974 for o
in self
._iter
_key
_ranges
():
976 elif self
._ns
_range
is not None:
977 for o
in self
._iter
_ns
_range
():
980 assert False, "self._key_ranges and self._ns_range are both None"
982 def _iter_key_ranges(self
):
983 """Iterates over self._key_ranges, delegating to self._iter_key_range()."""
985 if self
._current
_key
_range
is None:
987 self
._current
_key
_range
= self
._key
_ranges
.pop()
994 for key
, o
in self
._iter
_key
_range
(
995 copy
.deepcopy(self
._current
_key
_range
)):
998 self
._current
_key
_range
.advance(key
)
1000 self
._current
_key
_range
= None
1002 def _iter_ns_range(self
):
1003 """Iterates over self._ns_range, delegating to self._iter_key_range()."""
1005 if self
._current
_key
_range
is None:
1006 query
= self
._ns
_range
.make_datastore_query()
1007 namespace_result
= query
.Get(1)
1008 if not namespace_result
:
1011 namespace
= namespace_result
[0].name() or ""
1012 self
._current
_key
_range
= key_range
.KeyRange(
1013 namespace
=namespace
, _app
=self
._ns
_range
.app
)
1014 yield ALLOW_CHECKPOINT
1016 for key
, o
in self
._iter
_key
_range
(
1017 copy
.deepcopy(self
._current
_key
_range
)):
1020 self
._current
_key
_range
.advance(key
)
1023 if (self
._ns
_range
.is_single_namespace
or
1024 self
._current
_key
_range
.namespace
== self
._ns
_range
.namespace_end
):
1026 self
._ns
_range
= self
._ns
_range
.with_start_after(
1027 self
._current
_key
_range
.namespace
)
1028 self
._current
_key
_range
= None
1030 def _iter_key_range(self
, k_range
):
1031 """Yields a db.Key and the value that should be yielded by self.__iter__().
1034 k_range: The key_range.KeyRange to iterate over.
1037 A 2-tuple containing the last db.Key processed and the value that should
1038 be yielded by __iter__. The returned db.Key will be used to determine the
1039 InputReader's current position in self._current_key_range.
1041 raise NotImplementedError("_iter_key_range() not implemented in %s" %
1045 """Returns the string representation of this InputReader."""
1046 if self
._ns
_range
is None:
1047 return repr(self
._key
_ranges
)
1049 return repr(self
._ns
_range
)
1052 def _choose_split_points(cls
, sorted_keys
, shard_count
):
1053 """Returns the best split points given a random set of db.Keys."""
1054 assert len(sorted_keys
) >= shard_count
1055 index_stride
= len(sorted_keys
) / float(shard_count
)
1056 return [sorted_keys
[int(round(index_stride
* i
))]
1057 for i
in range(1, shard_count
)]
1062 def _split_input_from_namespace(cls
, app
, namespace
, entity_kind
,
1064 """Helper for _split_input_from_params.
1066 If there are not enough Entities to make all of the given shards, the
1067 returned list of KeyRanges will include Nones. The returned list will
1068 contain KeyRanges ordered lexographically with any Nones appearing at the
1073 namespace: the namespace.
1074 entity_kind: entity kind as string.
1075 shard_count: the number of shards.
1081 raw_entity_kind
= cls
._get
_raw
_entity
_kind
(entity_kind
)
1082 if shard_count
== 1:
1084 return [key_range
.KeyRange(namespace
=namespace
, _app
=app
)]
1086 ds_query
= datastore
.Query(kind
=raw_entity_kind
,
1087 namespace
=namespace
,
1090 ds_query
.Order("__scatter__")
1091 random_keys
= ds_query
.Get(shard_count
* cls
._OVERSAMPLING
_FACTOR
)
1096 return ([key_range
.KeyRange(namespace
=namespace
, _app
=app
)] +
1097 [None] * (shard_count
- 1))
1101 if len(random_keys
) >= shard_count
:
1103 random_keys
= cls
._choose
_split
_points
(random_keys
, shard_count
)
1108 key_ranges
.append(key_range
.KeyRange(
1110 key_end
=random_keys
[0],
1111 direction
=key_range
.KeyRange
.ASC
,
1112 include_start
=False,
1114 namespace
=namespace
,
1117 for i
in range(0, len(random_keys
) - 1):
1118 key_ranges
.append(key_range
.KeyRange(
1119 key_start
=random_keys
[i
],
1120 key_end
=random_keys
[i
+1],
1121 direction
=key_range
.KeyRange
.ASC
,
1124 namespace
=namespace
,
1127 key_ranges
.append(key_range
.KeyRange(
1128 key_start
=random_keys
[-1],
1130 direction
=key_range
.KeyRange
.ASC
,
1133 namespace
=namespace
,
1136 if len(key_ranges
) < shard_count
:
1138 key_ranges
+= [None] * (shard_count
- len(key_ranges
))
1143 def _split_input_from_params(cls
, app
, namespaces
, entity_kind_name
,
1144 params
, shard_count
):
1145 """Return input reader objects. Helper for split_input."""
1148 for namespace
in namespaces
:
1150 cls
._split
_input
_from
_namespace
(app
,
1158 shared_ranges
= [[] for _
in range(shard_count
)]
1159 for i
, k_range
in enumerate(key_ranges
):
1160 shared_ranges
[i
% shard_count
].append(k_range
)
1161 batch_size
= int(params
.get(cls
.BATCH_SIZE_PARAM
, cls
._BATCH
_SIZE
))
1163 return [cls(entity_kind_name
,
1164 key_ranges
=key_ranges
,
1166 batch_size
=batch_size
)
1167 for key_ranges
in shared_ranges
if key_ranges
]
1170 def validate(cls
, mapper_spec
):
1171 """Validates mapper spec and all mapper parameters.
1174 mapper_spec: The MapperSpec for this InputReader.
1177 BadReaderParamsError: required parameters are missing or invalid.
1179 if mapper_spec
.input_reader_class() != cls
:
1180 raise BadReaderParamsError("Input reader class mismatch")
1181 params
= _get_params(mapper_spec
)
1182 if cls
.ENTITY_KIND_PARAM
not in params
:
1183 raise BadReaderParamsError("Missing mapper parameter 'entity_kind'")
1184 if cls
.BATCH_SIZE_PARAM
in params
:
1186 batch_size
= int(params
[cls
.BATCH_SIZE_PARAM
])
1188 raise BadReaderParamsError("Bad batch size: %s" % batch_size
)
1189 except ValueError, e
:
1190 raise BadReaderParamsError("Bad batch size: %s" % e
)
1191 if cls
.NAMESPACE_PARAM
in params
:
1192 if not isinstance(params
[cls
.NAMESPACE_PARAM
],
1193 (str, unicode, type(None))):
1194 raise BadReaderParamsError(
1195 "Expected a single namespace string")
1196 if cls
.NAMESPACES_PARAM
in params
:
1197 raise BadReaderParamsError("Multiple namespaces are no longer supported")
1198 if cls
.FILTERS_PARAM
in params
:
1199 filters
= params
[cls
.FILTERS_PARAM
]
1200 if not isinstance(filters
, list):
1201 raise BadReaderParamsError("Expected list for filters parameter")
1203 if not isinstance(f
, (tuple, list)):
1204 raise BadReaderParamsError("Filter should be a tuple or list: %s", f
)
1206 raise BadReaderParamsError("Filter should be a 3-tuple: %s", f
)
1207 if not isinstance(f
[0], basestring
):
1208 raise BadReaderParamsError("First element should be string: %s", f
)
1210 raise BadReaderParamsError(
1211 "Only equality filters are supported: %s", f
)
@classmethod
def split_input(cls, mapper_spec):
  """Splits query into shards without fetching query results.

  Tries as best as it can to split the whole query result set into equal
  shards. Due to difficulty of making the perfect split, resulting shards'
  sizes might differ significantly from each other.

  Args:
    mapper_spec: MapperSpec with params containing 'entity_kind'.
      May have 'namespace' in the params as a string containing a single
      namespace. If specified then the input reader will only yield values
      in the given namespace. If 'namespace' is not given then values from
      all namespaces will be yielded. May also have 'batch_size' in the params
      to specify the number of entities to process in each batch.

  Returns:
    A list of InputReader objects. If the query results are empty then the
    empty list will be returned. Otherwise, the list will always have a length
    equal to number_of_shards but may be padded with Nones if there are too
    few results for effective sharding.
  """
  params = _get_params(mapper_spec)
  entity_kind_name = params[cls.ENTITY_KIND_PARAM]
  batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
  shard_count = mapper_spec.shard_count
  namespace = params.get(cls.NAMESPACE_PARAM)
  app = params.get(cls._APP_PARAM)
  filters = params.get(cls.FILTERS_PARAM)

  if namespace is None:
    # No namespace specified: peek at the namespaces present. If there are
    # few, shard by key within them; if many, shard by namespace range.
    namespace_query = datastore.Query("__namespace__",
                                      keys_only=True,
                                      _app=app)
    namespace_keys = namespace_query.Get(
        limit=cls.MAX_NAMESPACES_FOR_KEY_SHARD + 1)

    if len(namespace_keys) > cls.MAX_NAMESPACES_FOR_KEY_SHARD:
      # Too many namespaces to key-shard each one; shard the namespace
      # space itself instead.
      ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
                                                       contiguous=True,
                                                       _app=app)
      return [cls(entity_kind_name,
                  key_ranges=None,
                  ns_range=ns_range,
                  batch_size=batch_size,
                  filters=filters)
              for ns_range in ns_ranges]
    elif not namespace_keys:
      # Datastore is empty of namespaces: return a single reader over the
      # full (empty) namespace range.
      # Bug fix: was batch_size=shard_count, which silently used the shard
      # count as the fetch batch size.
      return [cls(entity_kind_name,
                  key_ranges=None,
                  ns_range=namespace_range.NamespaceRange(_app=app),
                  batch_size=batch_size,
                  filters=filters)]
    else:
      namespaces = [namespace_key.name() or ""
                    for namespace_key in namespace_keys]
  else:
    namespaces = [namespace]

  readers = cls._split_input_from_params(
      app, namespaces, entity_kind_name, params, shard_count)
  if filters:
    for reader in readers:
      reader._filters = filters
  return readers
1291 """Serializes all the data in this query range into json form.
1294 all the data in json-compatible map.
1296 if self
._key
_ranges
is None:
1297 key_ranges_json
= None
1299 key_ranges_json
= []
1300 for k
in self
._key
_ranges
:
1302 key_ranges_json
.append(k
.to_json())
1304 key_ranges_json
.append(None)
1306 if self
._ns
_range
is None:
1307 namespace_range_json
= None
1309 namespace_range_json
= self
._ns
_range
.to_json_object()
1311 if self
._current
_key
_range
is None:
1312 current_key_range_json
= None
1314 current_key_range_json
= self
._current
_key
_range
.to_json()
1316 json_dict
= {self
.KEY_RANGE_PARAM
: key_ranges_json
,
1317 self
.NAMESPACE_RANGE_PARAM
: namespace_range_json
,
1318 self
.CURRENT_KEY_RANGE_PARAM
: current_key_range_json
,
1319 self
.ENTITY_KIND_PARAM
: self
._entity
_kind
,
1320 self
.BATCH_SIZE_PARAM
: self
._batch
_size
,
1321 self
.FILTERS_PARAM
: self
._filters
}
@classmethod
def from_json(cls, json):
  """Create new DatastoreInputReader from the json, encoded by to_json.

  Args:
    json: json map representation of DatastoreInputReader.

  Returns:
    an instance of DatastoreInputReader with all data deserialized from json.
  """
  # Mirror of to_json: each range attribute may be serialized as None.
  if json[cls.KEY_RANGE_PARAM] is None:
    key_ranges = None
  else:
    key_ranges = []
    for k in json[cls.KEY_RANGE_PARAM]:
      if k:
        key_ranges.append(key_range.KeyRange.from_json(k))
      else:
        key_ranges.append(None)

  if json[cls.NAMESPACE_RANGE_PARAM] is None:
    ns_range = None
  else:
    ns_range = namespace_range.NamespaceRange.from_json_object(
        json[cls.NAMESPACE_RANGE_PARAM])

  if json[cls.CURRENT_KEY_RANGE_PARAM] is None:
    current_key_range = None
  else:
    current_key_range = key_range.KeyRange.from_json(
        json[cls.CURRENT_KEY_RANGE_PARAM])

  # .get() for filters: older serialized states may predate the filters key.
  return cls(
      json[cls.ENTITY_KIND_PARAM],
      key_ranges,
      ns_range,
      json[cls.BATCH_SIZE_PARAM],
      current_key_range,
      filters=json.get(cls.FILTERS_PARAM))
class BlobstoreLineInputReader(InputReader):
  """Input reader for a newline delimited blob in Blobstore."""

  # Maximum number of bytes that's buffered from the blob at a time.
  _BLOB_BUFFER_SIZE = 64000

  # Maximum number of shards to allow.
  _MAX_SHARD_COUNT = 256

  # Maximum number of blobs to allow.
  _MAX_BLOB_KEYS_COUNT = 246

  # Mapreduce parameters.
  BLOB_KEYS_PARAM = "blob_keys"

  # Serialization parameters.
  INITIAL_POSITION_PARAM = "initial_position"
  END_POSITION_PARAM = "end_position"
  BLOB_KEY_PARAM = "blob_key"

  def __init__(self, blob_key, start_position, end_position):
    """Initializes this instance with the given blob key and character range.

    This BlobstoreInputReader will read from the first record starting
    strictly after start_position until the first record ending at or after
    end_position (exclusive). As an exception, if start_position is 0, then
    this InputReader starts reading at the first record.

    Args:
      blob_key: the BlobKey that this input reader is processing.
      start_position: the position to start reading at.
      end_position: a position in the last record to read.
    """
    self._blob_key = blob_key
    self._blob_reader = blobstore.BlobReader(blob_key,
                                             self._BLOB_BUFFER_SIZE,
                                             start_position)
    self._end_position = end_position
    self._has_iterated = False
    # A non-zero start means we may be mid-line: skip the partial line, the
    # previous shard owns it.
    self._read_before_start = bool(start_position)

  def next(self):
    """Returns the next input from this reader as an (offset, line) tuple.

    Raises:
      StopIteration: when the shard's byte range or the blob is exhausted.
    """
    self._has_iterated = True

    if self._read_before_start:
      self._blob_reader.readline()
      self._read_before_start = False
    start_position = self._blob_reader.tell()

    if start_position > self._end_position:
      raise StopIteration()

    line = self._blob_reader.readline()

    if not line:
      raise StopIteration()

    return start_position, line.rstrip("\n")

  def to_json(self):
    """Returns an json-compatible input shard spec for remaining inputs."""
    new_pos = self._blob_reader.tell()
    if self._has_iterated:
      # Back up one byte so the restored reader's skip-partial-line logic
      # re-anchors inside the already-consumed line.
      new_pos -= 1
    return {self.BLOB_KEY_PARAM: self._blob_key,
            self.INITIAL_POSITION_PARAM: new_pos,
            self.END_POSITION_PARAM: self._end_position}

  def __str__(self):
    """Returns the string representation of this BlobstoreLineInputReader."""
    return "blobstore.BlobKey(%r):[%d, %d]" % (
        self._blob_key, self._blob_reader.tell(), self._end_position)

  @classmethod
  def from_json(cls, json):
    """Instantiates an instance of this InputReader for the given shard spec."""
    return cls(json[cls.BLOB_KEY_PARAM],
               json[cls.INITIAL_POSITION_PARAM],
               json[cls.END_POSITION_PARAM])

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.BLOB_KEYS_PARAM not in params:
      raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
    blob_keys = params[cls.BLOB_KEYS_PARAM]
    if isinstance(blob_keys, basestring):
      # A single string is treated as a comma-separated list of keys.
      blob_keys = blob_keys.split(",")
    if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
      raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
    if not blob_keys:
      raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
    for blob_key in blob_keys:
      blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
      if not blob_info:
        raise BadReaderParamsError("Could not find blobinfo for key %s" %
                                   blob_key)

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of shard_count input_spec_shards for input_spec.

    Args:
      mapper_spec: The mapper specification to split from. Must contain
          'blob_keys' parameter with one or more blob keys.

    Returns:
      A list of BlobstoreInputReaders corresponding to the specified shards.
    """
    params = _get_params(mapper_spec)
    blob_keys = params[cls.BLOB_KEYS_PARAM]
    if isinstance(blob_keys, basestring):
      blob_keys = blob_keys.split(",")

    blob_sizes = {}
    for blob_key in blob_keys:
      blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
      blob_sizes[blob_key] = blob_info.size

    shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
    shards_per_blob = shard_count // len(blob_keys)
    if shards_per_blob == 0:
      # More blobs than shards: at least one shard per blob.
      shards_per_blob = 1

    chunks = []
    for blob_key, blob_size in blob_sizes.items():
      blob_chunk_size = blob_size // shards_per_blob
      for i in xrange(shards_per_blob - 1):
        chunks.append(BlobstoreLineInputReader.from_json(
            {cls.BLOB_KEY_PARAM: blob_key,
             cls.INITIAL_POSITION_PARAM: blob_chunk_size * i,
             cls.END_POSITION_PARAM: blob_chunk_size * (i + 1)}))
      # Last chunk absorbs the division remainder up to the true blob size.
      chunks.append(BlobstoreLineInputReader.from_json(
          {cls.BLOB_KEY_PARAM: blob_key,
           cls.INITIAL_POSITION_PARAM: blob_chunk_size * (shards_per_blob - 1),
           cls.END_POSITION_PARAM: blob_size}))
    return chunks
class BlobstoreZipInputReader(InputReader):
  """Input reader for files from a zip archive stored in the Blobstore.

  Each instance of the reader will read the TOC, from the end of the zip file,
  and then only the contained files which it is responsible for.
  """

  # Maximum number of shards to allow.
  _MAX_SHARD_COUNT = 256

  # Mapreduce parameters.
  BLOB_KEY_PARAM = "blob_key"
  START_INDEX_PARAM = "start_index"
  END_INDEX_PARAM = "end_index"

  def __init__(self, blob_key, start_index, end_index,
               _reader=blobstore.BlobReader):
    """Initializes this instance with the given blob key and file range.

    This BlobstoreZipInputReader will read from the file with index start_index
    up to but not including the file with index end_index.

    Args:
      blob_key: the BlobKey that this input reader is processing.
      start_index: the index of the first file to read.
      end_index: the index of the first file that will not be read.
      _reader: a callable that returns a file-like object for reading blobs.
          Used for dependency injection.
    """
    self._blob_key = blob_key
    self._start_index = start_index
    self._end_index = end_index
    self._reader = _reader
    # Lazily opened on first next() call.
    self._zip = None
    self._entries = None

  def next(self):
    """Returns the next input from this input reader as (ZipInfo, opener) tuple.

    Returns:
      The next input from this input reader, in the form of a 2-tuple.
      The first element of the tuple is a zipfile.ZipInfo object.
      The second element of the tuple is a zero-argument function that, when
      called, returns the complete body of the file.
    """
    if not self._zip:
      self._zip = zipfile.ZipFile(self._reader(self._blob_key))
      # Reversed so entries can be popped off the end in order.
      self._entries = self._zip.infolist()[self._start_index:self._end_index]
      self._entries.reverse()
    if not self._entries:
      raise StopIteration()
    entry = self._entries.pop()
    self._start_index += 1
    return (entry, lambda: self._read(entry))

  def _read(self, entry):
    """Read entry content.

    Args:
      entry: zip file entry as zipfile.ZipInfo.

    Returns:
      Entry content as string.
    """
    start_time = time.time()
    content = self._zip.read(entry.filename)

    ctx = context.get()
    if ctx:
      operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
      operation.counters.Increment(
          COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)

    return content

  @classmethod
  def from_json(cls, json):
    """Creates an instance of the InputReader for the given input shard state.

    Args:
      json: The InputReader state as a dict-like object.

    Returns:
      An instance of the InputReader configured using the values of json.
    """
    return cls(json[cls.BLOB_KEY_PARAM],
               json[cls.START_INDEX_PARAM],
               json[cls.END_INDEX_PARAM])

  def to_json(self):
    """Returns an input shard state for the remaining inputs.

    Returns:
      A json-izable version of the remaining InputReader.
    """
    return {self.BLOB_KEY_PARAM: self._blob_key,
            self.START_INDEX_PARAM: self._start_index,
            self.END_INDEX_PARAM: self._end_index}

  def __str__(self):
    """Returns the string representation of this BlobstoreZipInputReader."""
    return "blobstore.BlobKey(%r):[%d, %d]" % (
        self._blob_key, self._start_index, self._end_index)

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.BLOB_KEY_PARAM not in params:
      raise BadReaderParamsError("Must specify 'blob_key' for mapper input")
    blob_key = params[cls.BLOB_KEY_PARAM]
    blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
    if not blob_info:
      raise BadReaderParamsError("Could not find blobinfo for key %s" %
                                 blob_key)

  @classmethod
  def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
    """Returns a list of input shard states for the input spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader. Must contain
          'blob_key' parameter with one blob key.
      _reader: a callable that returns a file-like object for reading blobs.
          Used for dependency injection.

    Returns:
      A list of InputReaders spanning files within the zip.
    """
    params = _get_params(mapper_spec)
    blob_key = params[cls.BLOB_KEY_PARAM]
    zip_input = zipfile.ZipFile(_reader(blob_key))
    zfiles = zip_input.infolist()
    total_size = sum(x.file_size for x in zfiles)
    num_shards = min(mapper_spec.shard_count, cls._MAX_SHARD_COUNT)
    size_per_shard = total_size // num_shards

    # Break the list of files into sublists, each of approximately
    # size_per_shard bytes. A file is never split across shards.
    shard_start_indexes = [0]
    current_shard_size = 0
    for i, fileinfo in enumerate(zfiles):
      current_shard_size += fileinfo.file_size
      if current_shard_size >= size_per_shard:
        shard_start_indexes.append(i + 1)
        current_shard_size = 0

    if shard_start_indexes[-1] != len(zfiles):
      shard_start_indexes.append(len(zfiles))

    return [cls(blob_key, start_index, end_index, _reader)
            for start_index, end_index
            in zip(shard_start_indexes, shard_start_indexes[1:])]
class BlobstoreZipLineInputReader(InputReader):
  """Input reader for newline delimited files in zip archives from Blobstore.

  This has the same external interface as the BlobstoreLineInputReader, in that
  it takes a list of blobs as its input and yields lines to the reader.
  However the blobs themselves are expected to be zip archives of line delimited
  files instead of the files themselves.

  This is useful as many line delimited files gain greatly from compression.
  """

  # Maximum number of shards to allow.
  _MAX_SHARD_COUNT = 256

  # Maximum number of blobs to allow.
  _MAX_BLOB_KEYS_COUNT = 246

  # Mapreduce parameters.
  BLOB_KEYS_PARAM = "blob_keys"

  # Serialization parameters.
  BLOB_KEY_PARAM = "blob_key"
  START_FILE_INDEX_PARAM = "start_file_index"
  END_FILE_INDEX_PARAM = "end_file_index"
  OFFSET_PARAM = "offset"

  def __init__(self, blob_key, start_file_index, end_file_index, offset,
               _reader=blobstore.BlobReader):
    """Initializes this instance with the given blob key and file range.

    This BlobstoreZipLineInputReader will read from the file with index
    start_file_index up to but not including the file with index
    end_file_index. It will return lines starting at offset within
    file[start_file_index].

    Args:
      blob_key: the BlobKey that this input reader is processing.
      start_file_index: the index of the first file to read within the zip.
      end_file_index: the index of the first file that will not be read.
      offset: the byte offset within blob_key.zip[start_file_index] to start
          reading. The reader will continue to the end of the file.
      _reader: a callable that returns a file-like object for reading blobs.
          Used for dependency injection.
    """
    self._blob_key = blob_key
    self._start_file_index = start_file_index
    self._end_file_index = end_file_index
    self._initial_offset = offset
    self._reader = _reader
    # Lazily initialized on first next() call.
    self._zip = None
    self._entries = None
    self._filestream = None

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.BLOB_KEYS_PARAM not in params:
      raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")

    blob_keys = params[cls.BLOB_KEYS_PARAM]
    if isinstance(blob_keys, basestring):
      # A single string is treated as a comma-separated list of keys.
      blob_keys = blob_keys.split(",")
    if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
      raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
    if not blob_keys:
      raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
    for blob_key in blob_keys:
      blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
      if not blob_info:
        raise BadReaderParamsError("Could not find blobinfo for key %s" %
                                   blob_key)

  @classmethod
  def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
    """Returns a list of input readers for the input spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader. Must contain
          'blob_keys' parameter with one or more blob keys.
      _reader: a callable that returns a file-like object for reading blobs.
          Used for dependency injection.

    Returns:
      A list of InputReaders spanning the subfiles within the blobs.
      There will be at least one reader per blob, but it will otherwise
      attempt to keep the expanded size even.
    """
    params = _get_params(mapper_spec)
    blob_keys = params[cls.BLOB_KEYS_PARAM]
    if isinstance(blob_keys, basestring):
      blob_keys = blob_keys.split(",")

    blob_files = {}
    total_size = 0
    for blob_key in blob_keys:
      zip_input = zipfile.ZipFile(_reader(blob_key))
      blob_files[blob_key] = zip_input.infolist()
      total_size += sum(x.file_size for x in blob_files[blob_key])

    shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)

    # Shards may break on both blob key and file-within-zip boundaries, but
    # a single zipped file is never split across shards.
    size_per_shard = total_size // shard_count

    readers = []
    for blob_key in blob_keys:
      bfiles = blob_files[blob_key]
      current_shard_size = 0
      start_file_index = 0
      next_file_index = 0
      for fileinfo in bfiles:
        next_file_index += 1
        current_shard_size += fileinfo.file_size
        if current_shard_size >= size_per_shard:
          readers.append(cls(blob_key, start_file_index, next_file_index, 0,
                             _reader))
          current_shard_size = 0
          start_file_index = next_file_index
      if current_shard_size != 0:
        # Remainder of this blob becomes a final (smaller) shard.
        readers.append(cls(blob_key, start_file_index, next_file_index, 0,
                           _reader))

    return readers

  def next(self):
    """Returns the next line from this input reader as (lineinfo, line) tuple.

    Returns:
      The next input from this input reader, in the form of a 2-tuple.
      The first element of the tuple describes the source, it is itself
        a tuple (blobkey, filenumber, byteoffset).
      The second element of the tuple is the line found at that offset.
    """
    if not self._filestream:
      if not self._zip:
        self._zip = zipfile.ZipFile(self._reader(self._blob_key))
        # Reversed so entries can be popped off the end in order.
        self._entries = self._zip.infolist()[self._start_file_index:
                                             self._end_file_index]
        self._entries.reverse()
      if not self._entries:
        raise StopIteration()
      entry = self._entries.pop()
      value = self._zip.read(entry.filename)
      self._filestream = StringIO.StringIO(value)
      if self._initial_offset:
        # A non-zero offset may be mid-line: skip the partial line, the
        # previous shard owns it.
        self._filestream.seek(self._initial_offset)
        self._filestream.readline()

    start_position = self._filestream.tell()
    line = self._filestream.readline()

    if not line:
      # Done with this file in the zip. Move on to the next file.
      self._filestream.close()
      self._filestream = None
      self._start_file_index += 1
      self._initial_offset = 0
      return self.next()

    return ((self._blob_key, self._start_file_index, start_position),
            line.rstrip("\n"))

  def _next_offset(self):
    """Return the offset of the next line to read."""
    if self._filestream:
      offset = self._filestream.tell()
      if offset:
        # Back up one byte so resuming re-anchors inside the consumed line.
        offset -= 1
    else:
      offset = self._initial_offset

    return offset

  def to_json(self):
    """Returns an input shard state for the remaining inputs.

    Returns:
      A json-izable version of the remaining InputReader.
    """
    return {self.BLOB_KEY_PARAM: self._blob_key,
            self.START_FILE_INDEX_PARAM: self._start_file_index,
            self.END_FILE_INDEX_PARAM: self._end_file_index,
            self.OFFSET_PARAM: self._next_offset()}

  @classmethod
  def from_json(cls, json, _reader=blobstore.BlobReader):
    """Creates an instance of the InputReader for the given input shard state.

    Args:
      json: The InputReader state as a dict-like object.
      _reader: For dependency injection.

    Returns:
      An instance of the InputReader configured using the values of json.
    """
    return cls(json[cls.BLOB_KEY_PARAM],
               json[cls.START_FILE_INDEX_PARAM],
               json[cls.END_FILE_INDEX_PARAM],
               json[cls.OFFSET_PARAM],
               _reader)

  def __str__(self):
    """Returns the string representation of this reader.

    Returns:
      string blobkey:[start file num, end file num]:current offset.
    """
    return "blobstore.BlobKey(%r):[%d, %d]:%d" % (
        self._blob_key, self._start_file_index, self._end_file_index,
        self._next_offset())
class RandomStringInputReader(InputReader):
  """RandomStringInputReader generates random strings as output.

  Primary usage is to populate output with testing entries.
  """

  # Total number of entries this reader should generate.
  COUNT = "count"
  # Length of the generated strings.
  STRING_LENGTH = "string_length"

  DEFAULT_STRING_LENGTH = 10

  def __init__(self, count, string_length):
    """Initialize input reader.

    Args:
      count: number of entries this shard should generate.
      string_length: the length of generated random strings.
    """
    self._count = count
    self._string_length = string_length

  def __iter__(self):
    ctx = context.get()

    while self._count:
      self._count -= 1
      start_time = time.time()
      content = "".join(random.choice(string.ascii_lowercase)
                        for _ in range(self._string_length))
      if ctx:
        operation.counters.Increment(
            COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
        operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
      yield content

  @classmethod
  def split_input(cls, mapper_spec):
    params = _get_params(mapper_spec)
    count = params[cls.COUNT]
    string_length = cls.DEFAULT_STRING_LENGTH
    if cls.STRING_LENGTH in params:
      string_length = params[cls.STRING_LENGTH]

    shard_count = mapper_spec.shard_count
    count_per_shard = count // shard_count

    mr_input_readers = [
        cls(count_per_shard, string_length) for _ in range(shard_count)]

    # Leftover entries that didn't divide evenly become one extra reader.
    left = count - count_per_shard * shard_count
    if left > 0:
      mr_input_readers.append(cls(left, string_length))

    return mr_input_readers

  @classmethod
  def validate(cls, mapper_spec):
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")

    params = _get_params(mapper_spec)
    if cls.COUNT not in params:
      raise BadReaderParamsError("Must specify %s" % cls.COUNT)
    if not isinstance(params[cls.COUNT], int):
      raise BadReaderParamsError("%s should be an int but is %s" %
                                 (cls.COUNT, type(params[cls.COUNT])))
    if params[cls.COUNT] <= 0:
      # Bug fix: the format argument was missing, so the message contained
      # a literal "%s".
      raise BadReaderParamsError("%s should be a positive int" % cls.COUNT)
    if cls.STRING_LENGTH in params and not (
        isinstance(params[cls.STRING_LENGTH], int) and
        params[cls.STRING_LENGTH] > 0):
      raise BadReaderParamsError("%s should be a positive int but is %s" %
                                 (cls.STRING_LENGTH, params[cls.STRING_LENGTH]))
    if (not isinstance(mapper_spec.shard_count, int) or
        mapper_spec.shard_count <= 0):
      raise BadReaderParamsError(
          "shard_count should be a positive int but is %s" %
          mapper_spec.shard_count)

  @classmethod
  def from_json(cls, json):
    return cls(json[cls.COUNT], json[cls.STRING_LENGTH])

  def to_json(self):
    return {self.COUNT: self._count, self.STRING_LENGTH: self._string_length}
class NamespaceInputReader(InputReader):
  """An input reader to iterate over namespaces.

  This reader yields namespace names as string.
  It will always produce only one shard.
  """

  NAMESPACE_RANGE_PARAM = "namespace_range"
  BATCH_SIZE_PARAM = "batch_size"
  # Default number of namespace keys fetched per datastore query.
  _BATCH_SIZE = 10

  def __init__(self, ns_range, batch_size=_BATCH_SIZE):
    self.ns_range = ns_range
    self._batch_size = batch_size

  def to_json(self):
    """Serializes all the data in this query range into json form.

    Returns:
      all the data in json-compatible map.
    """
    return {self.NAMESPACE_RANGE_PARAM: self.ns_range.to_json_object(),
            self.BATCH_SIZE_PARAM: self._batch_size}

  @classmethod
  def from_json(cls, json):
    """Create new NamespaceInputReader from the json, encoded by to_json.

    Args:
      json: json map representation of NamespaceInputReader.

    Returns:
      an instance of NamespaceInputReader with all data deserialized from json.
    """
    return cls(
        namespace_range.NamespaceRange.from_json_object(
            json[cls.NAMESPACE_RANGE_PARAM]),
        json[cls.BATCH_SIZE_PARAM])

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.BATCH_SIZE_PARAM in params:
      try:
        batch_size = int(params[cls.BATCH_SIZE_PARAM])
        if batch_size < 1:
          raise BadReaderParamsError("Bad batch size: %s" % batch_size)
      except ValueError as e:
        raise BadReaderParamsError("Bad batch size: %s" % e)

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers for the input spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Returns:
      A list of InputReaders.
    """
    batch_size = int(_get_params(mapper_spec).get(
        cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
    shard_count = mapper_spec.shard_count
    namespace_ranges = namespace_range.NamespaceRange.split(shard_count,
                                                            contiguous=True)
    return [NamespaceInputReader(ns_range, batch_size)
            for ns_range in namespace_ranges]

  def __iter__(self):
    while True:
      keys = self.ns_range.make_datastore_query().Get(limit=self._batch_size)
      if not keys:
        break

      for key in keys:
        namespace = metadata.Namespace.key_to_namespace(key)
        # Advance the range past what was yielded so a resumed reader
        # (after to_json/from_json) does not repeat namespaces.
        self.ns_range = self.ns_range.with_start_after(namespace)
        yield namespace

  def __str__(self):
    return repr(self.ns_range)
class RecordsReader(InputReader):
  """Reader to read a list of Files API file in records format.

  The number of input shards can be specified by the SHARDS_PARAM
  mapper parameter. Input files cannot be split, so there will be at most
  one shard per file. Also the number of shards will not be reduced based on
  the number of input files, so shards in always equals shards out.
  """

  FILE_PARAM = "file"
  FILES_PARAM = "files"

  def __init__(self, filenames, position):
    """Initializes a files reader.

    Args:
      filenames: list of filenames.
      position: file position to start reading from as int.
    """
    self._filenames = filenames
    if self._filenames:
      self._reader = records.RecordsReader(
          files.BufferedFile(self._filenames[0]))
      self._reader.seek(position)
    else:
      self._reader = None

  def __iter__(self):
    """Iterate over records in file.

    Yields:
      Records as strings.
    """
    ctx = context.get()

    while self._reader:
      try:
        start_time = time.time()
        record = self._reader.read()
        if ctx:
          operation.counters.Increment(
              COUNTER_IO_READ_MSEC,
              int((time.time() - start_time) * 1000))(ctx)
          operation.counters.Increment(COUNTER_IO_READ_BYTES,
                                       len(record))(ctx)
        yield record
      except files.ExistenceError as e:
        # Input file disappeared: the job cannot succeed, fail it outright.
        raise errors.FailJobError("ExistenceError: %s" % e)
      except files.UnknownError as e:
        # Transient Files API error: retry this slice.
        raise errors.RetrySliceError("UnknownError: %s" % e)
      except EOFError:
        # Finished current file; advance to the next one, if any.
        self._filenames.pop(0)
        if not self._filenames:
          self._reader = None
        else:
          self._reader = records.RecordsReader(
              files.BufferedFile(self._filenames[0]))

  @classmethod
  def from_json(cls, json):
    """Creates an instance of the InputReader for the given input shard state.

    Args:
      json: The InputReader state as a dict-like object.

    Returns:
      An instance of the InputReader configured using the values of json.
    """
    return cls(json["filenames"], json["position"])

  def to_json(self):
    """Returns an input shard state for the remaining inputs.

    Returns:
      A json-izable version of the remaining InputReader.
    """
    result = {
        "filenames": self._filenames,
        "position": 0,
        }
    if self._reader:
      result["position"] = self._reader.tell()
    return result

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers for the input spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Returns:
      A list of InputReaders.
    """
    params = _get_params(mapper_spec)
    shard_count = mapper_spec.shard_count

    if cls.FILES_PARAM in params:
      filenames = params[cls.FILES_PARAM]
      if isinstance(filenames, basestring):
        filenames = filenames.split(",")
    else:
      filenames = [params[cls.FILE_PARAM]]

    # Round-robin files over shards so no shard is starved.
    batch_list = [[] for _ in xrange(shard_count)]
    for index, filename in enumerate(filenames):
      batch_list[index % shard_count].append(filename)

    # Sort from longest batch to shortest: a short shard, if any, goes last.
    batch_list.sort(reverse=True, key=len)
    return [cls(batch, 0) for batch in batch_list]

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise errors.BadReaderParamsError("Input reader class mismatch")
    params = _get_params(mapper_spec)
    if (cls.FILES_PARAM not in params and
        cls.FILE_PARAM not in params):
      raise BadReaderParamsError(
          "Must specify '%s' or '%s' parameter for mapper input" %
          (cls.FILES_PARAM, cls.FILE_PARAM))

  def __str__(self):
    position = 0
    if self._reader:
      position = self._reader.tell()
    return "%s:%s" % (self._filenames, position)
class LogInputReader(InputReader):
  """Input reader for a time range of logs via the Logs Reader API.

  The number of input shards may be specified by the SHARDS_PARAM mapper
  parameter. A starting and ending time (in seconds since the Unix epoch) are
  required to generate time ranges over which to shard the input.
  """

  # Parameters directly mapping to those available via logservice.fetch().
  START_TIME_PARAM = "start_time"
  END_TIME_PARAM = "end_time"
  MINIMUM_LOG_LEVEL_PARAM = "minimum_log_level"
  INCLUDE_INCOMPLETE_PARAM = "include_incomplete"
  INCLUDE_APP_LOGS_PARAM = "include_app_logs"
  VERSION_IDS_PARAM = "version_ids"
  MODULE_VERSIONS_PARAM = "module_versions"

  # Semi-hidden parameters used only internally.
  _OFFSET_PARAM = "offset"
  _PROTOTYPE_REQUEST_PARAM = "prototype_request"

  _PARAMS = frozenset([START_TIME_PARAM, END_TIME_PARAM, _OFFSET_PARAM,
                       MINIMUM_LOG_LEVEL_PARAM, INCLUDE_INCOMPLETE_PARAM,
                       INCLUDE_APP_LOGS_PARAM, VERSION_IDS_PARAM,
                       MODULE_VERSIONS_PARAM, _PROTOTYPE_REQUEST_PARAM])
  _KWARGS = frozenset([_OFFSET_PARAM, _PROTOTYPE_REQUEST_PARAM])

  def __init__(self,
               start_time=None,
               end_time=None,
               minimum_log_level=None,
               include_incomplete=False,
               include_app_logs=False,
               version_ids=None,
               module_versions=None,
               **kwargs):
    """Constructor.

    Args:
      start_time: The earliest request completion or last-update time of logs
        that should be mapped over, in seconds since the Unix epoch.
      end_time: The latest request completion or last-update time that logs
        should be mapped over, in seconds since the Unix epoch.
      minimum_log_level: An application log level which serves as a filter on
        the requests mapped over--requests with no application log at or above
        the specified level will be omitted, even if include_app_logs is False.
      include_incomplete: Whether or not to include requests that have started
        but not yet finished, as a boolean. Defaults to False.
      include_app_logs: Whether or not to include application level logs in the
        mapped logs, as a boolean. Defaults to False.
      version_ids: A list of version ids whose logs should be read. This can
        not be used with module_versions.
      module_versions: A list of tuples containing a module and version id
        whose logs should be read. This can not be used with version_ids.
      **kwargs: A dictionary of keywords associated with this input reader.
    """
    InputReader.__init__(self)

    # Invariant: self.__params is always suitable as keyword arguments to
    # logservice.fetch().
    self.__params = dict(kwargs)

    if start_time is not None:
      self.__params[self.START_TIME_PARAM] = start_time
    if end_time is not None:
      self.__params[self.END_TIME_PARAM] = end_time
    if minimum_log_level is not None:
      self.__params[self.MINIMUM_LOG_LEVEL_PARAM] = minimum_log_level
    if include_incomplete is not None:
      self.__params[self.INCLUDE_INCOMPLETE_PARAM] = include_incomplete
    if include_app_logs is not None:
      self.__params[self.INCLUDE_APP_LOGS_PARAM] = include_app_logs
    if version_ids:
      self.__params[self.VERSION_IDS_PARAM] = version_ids
    if module_versions:
      self.__params[self.MODULE_VERSIONS_PARAM] = module_versions

    # A prototype_request arrives in encoded (serialized protobuf) form;
    # parse it back into a LogReadRequest for logservice.fetch().
    if self._PROTOTYPE_REQUEST_PARAM in self.__params:
      prototype_request = log_service_pb.LogReadRequest(
          self.__params[self._PROTOTYPE_REQUEST_PARAM])
      self.__params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request
2322 """Iterates over logs in a given range of time.
2325 A RequestLog containing all the information for a single request.
2327 for log
in logservice
.fetch(**self
.__params
):
2328 self
.__params
[self
._OFFSET
_PARAM
] = log
.offset
@classmethod
def from_json(cls, json):
  """Creates an instance of the InputReader for the given input shard's state.

  Args:
    json: The InputReader state as a dict-like object.

  Returns:
    An instance of the InputReader configured using the given JSON parameters.
  """
  # Keep only recognized parameters; keys are coerced to str so they can be
  # passed as **kwargs to the constructor.
  params = dict((str(k), v) for k, v in json.iteritems()
                if k in cls._PARAMS)

  # The offset is base64-encoded by to_json() (it is raw bytes); decode it
  # here. The prototype request stays encoded -- the constructor parses it.
  if cls._OFFSET_PARAM in params:
    params[cls._OFFSET_PARAM] = base64.b64decode(params[cls._OFFSET_PARAM])
  return cls(**params)
2353 """Returns an input shard state for the remaining inputs.
2356 A JSON serializable version of the remaining input to read.
2359 params
= dict(self
.__params
)
2360 if self
._PROTOTYPE
_REQUEST
_PARAM
in params
:
2361 prototype_request
= params
[self
._PROTOTYPE
_REQUEST
_PARAM
]
2362 params
[self
._PROTOTYPE
_REQUEST
_PARAM
] = prototype_request
.Encode()
2363 if self
._OFFSET
_PARAM
in params
:
2364 params
[self
._OFFSET
_PARAM
] = base64
.b64encode(params
[self
._OFFSET
_PARAM
])
@classmethod
def split_input(cls, mapper_spec):
  """Returns a list of input readers for the given input specification.

  Args:
    mapper_spec: The MapperSpec for this InputReader.

  Returns:
    A list of InputReaders.
  """
  params = _get_params(mapper_spec)
  shard_count = mapper_spec.shard_count

  # Partition [start_time, end_time) into shard_count even time slices.
  start_time = params[cls.START_TIME_PARAM]
  end_time = params[cls.END_TIME_PARAM]
  seconds_per_shard = (end_time - start_time) / shard_count

  shards = []
  for _ in xrange(shard_count - 1):
    params[cls.END_TIME_PARAM] = (params[cls.START_TIME_PARAM] +
                                  seconds_per_shard)
    shards.append(LogInputReader(**params))
    params[cls.START_TIME_PARAM] = params[cls.END_TIME_PARAM]

  # The final shard absorbs any rounding remainder by ending exactly at
  # the requested end_time.
  params[cls.END_TIME_PARAM] = end_time
  return shards + [LogInputReader(**params)]
2398 def validate(cls
, mapper_spec
):
2399 """Validates the mapper's specification and all necessary parameters.
2402 mapper_spec: The MapperSpec to be used with this InputReader.
2405 BadReaderParamsError: If the user fails to specify both a starting time
2406 and an ending time, or if the starting time is later than the ending
2409 if mapper_spec
.input_reader_class() != cls
:
2410 raise errors
.BadReaderParamsError("Input reader class mismatch")
2412 params
= _get_params(mapper_spec
, allowed_keys
=cls
._PARAMS
)
2413 if (cls
.VERSION_IDS_PARAM
not in params
and
2414 cls
.MODULE_VERSIONS_PARAM
not in params
):
2415 raise errors
.BadReaderParamsError("Must specify a list of version ids or "
2416 "module/version ids for mapper input")
2417 if (cls
.VERSION_IDS_PARAM
in params
and
2418 cls
.MODULE_VERSIONS_PARAM
in params
):
2419 raise errors
.BadReaderParamsError("Can not supply both version ids or "
2420 "module/version ids. Use only one.")
2421 if (cls
.START_TIME_PARAM
not in params
or
2422 params
[cls
.START_TIME_PARAM
] is None):
2423 raise errors
.BadReaderParamsError("Must specify a starting time for "
2425 if cls
.END_TIME_PARAM
not in params
or params
[cls
.END_TIME_PARAM
] is None:
2426 params
[cls
.END_TIME_PARAM
] = time
.time()
2428 if params
[cls
.START_TIME_PARAM
] >= params
[cls
.END_TIME_PARAM
]:
2429 raise errors
.BadReaderParamsError("The starting time cannot be later "
2430 "than or the same as the ending time.")
2432 if cls
._PROTOTYPE
_REQUEST
_PARAM
in params
:
2434 params
[cls
._PROTOTYPE
_REQUEST
_PARAM
] = log_service_pb
.LogReadRequest(
2435 params
[cls
._PROTOTYPE
_REQUEST
_PARAM
])
2436 except (TypeError, ProtocolBuffer
.ProtocolBufferDecodeError
):
2437 raise errors
.BadReaderParamsError("The prototype request must be "
2438 "parseable as a LogReadRequest.")
2444 logservice
.fetch(**params
)
2445 except logservice
.InvalidArgumentError
, e
:
2446 raise errors
.BadReaderParamsError("One or more parameters are not valid "
2447 "inputs to logservice.fetch(): %s" % e
)
2450 """Returns the string representation of this LogInputReader."""
2452 for key
in sorted(self
.__params
.keys()):
2453 value
= self
.__params
[key
]
2454 if key
is self
._PROTOTYPE
_REQUEST
_PARAM
:
2455 params
.append("%s='%s'" % (key
, value
))
2456 elif key
is self
._OFFSET
_PARAM
:
2457 params
.append("%s='%s'" % (key
, value
))
2459 params
.append("%s=%s" % (key
, value
))
2461 return "LogInputReader(%s)" % ", ".join(params
)
class _GoogleCloudStorageInputReader(InputReader):
  """Input reader from Google Cloud Storage using the cloudstorage library.

  This class is expected to be subclassed with a reader that understands
  user-level records.

  Required configuration in the mapper_spec.input_reader dictionary.
    BUCKET_NAME_PARAM: name of the bucket to use (with no extra delimiters or
      suffixed such as directories.
    OBJECT_NAMES_PARAM: a list of object names or prefixes. All objects must be
      in the BUCKET_NAME_PARAM bucket. If the name ends with a * it will be
      treated as prefix and all objects with matching names will be read.
      Entries should not start with a slash unless that is part of the object's
      name. An example list could be:
      ["my-1st-input-file", "directory/my-2nd-file", "some/other/dir/input-*"]
      To retrieve all files "*" will match every object in the bucket. If a
      file is listed twice or is covered by multiple prefixes it will be read
      twice, there is no deduplication.

  Optional configuration in the mapper_sec.input_reader dictionary.
    BUFFER_SIZE_PARAM: the size of the read buffer for each file handle.
    DELIMITER_PARAM: if specified, turn on the shallow splitting mode.
      The delimiter is used as a path separator to designate directory
      hierarchy. Matching of prefixes from OBJECT_NAME_PARAM
      will stop at the first directory instead of matching
      all files under the directory. This allows MR to process bucket with
      hundreds of thousands of files.
  """

  # Supported keys in the mapper_spec.input_reader dictionary.
  BUCKET_NAME_PARAM = "bucket_name"
  OBJECT_NAMES_PARAM = "objects"
  BUFFER_SIZE_PARAM = "buffer_size"
  DELIMITER_PARAM = "delimiter"

  # Internal-use-only parameter (see cloudstorage documentation).
  _ACCOUNT_ID_PARAM = "account_id"

  # Serialization key and display limit for __str__.
  _JSON_PICKLE = "pickle"
  _STRING_MAX_FILES_LISTED = 10
def __init__(self, filenames, index=0, buffer_size=None, _account_id=None,
             delimiter=None):
  """Initialize a GoogleCloudStorageInputReader instance.

  Args:
    filenames: A list of Google Cloud Storage filenames of the form
      '/bucket/objectname'.
    index: Index of the next filename to read.
    buffer_size: The size of the read buffer, None to use default.
    _account_id: Internal use only. See cloudstorage documentation.
    delimiter: Delimiter used as path separator. See class doc for details.
  """
  self._filenames = filenames
  self._index = index
  self._buffer_size = buffer_size
  self._account_id = _account_id
  self._delimiter = delimiter
  # Lazily-populated listbucket state used by _next_file() when a filename
  # turns out to be a delimiter-terminated prefix.
  self._bucket = None
  self._bucket_iter = None
def _next_file(self):
  """Find next filename.

  self._filenames may need to be expanded via listbucket.

  Returns:
    None if no more file is left. Filename otherwise.
  """
  while True:
    # Drain any in-progress bucket listing first.
    if self._bucket_iter:
      try:
        return self._bucket_iter.next().filename
      except StopIteration:
        self._bucket_iter = None
        self._bucket = None
    if self._index >= len(self._filenames):
      return
    filename = self._filenames[self._index]
    self._index += 1
    # A plain object name is returned directly; a name ending with the
    # delimiter is a directory prefix and is expanded via listbucket.
    if self._delimiter is None or not filename.endswith(self._delimiter):
      return filename
    self._bucket = cloudstorage.listbucket(filename,
                                           delimiter=self._delimiter)
    self._bucket_iter = iter(self._bucket)
@classmethod
def get_params(cls, mapper_spec, allowed_keys=None, allow_old=True):
  """Returns reader parameters, inheriting the bucket name if needed."""
  params = _get_params(mapper_spec, allowed_keys, allow_old)
  # If the bucket name was only given at the top level of the mapper params,
  # propagate it into the reader params.
  if (mapper_spec.params.get(cls.BUCKET_NAME_PARAM) is not None and
      params.get(cls.BUCKET_NAME_PARAM) is None):
    params[cls.BUCKET_NAME_PARAM] = mapper_spec.params[cls.BUCKET_NAME_PARAM]
  return params
2568 def validate(cls
, mapper_spec
):
2569 """Validate mapper specification.
2572 mapper_spec: an instance of model.MapperSpec
2575 BadReaderParamsError: if the specification is invalid for any reason such
2576 as missing the bucket name or providing an invalid bucket name.
2578 reader_spec
= cls
.get_params(mapper_spec
, allow_old
=False)
2581 if cls
.BUCKET_NAME_PARAM
not in reader_spec
:
2582 raise errors
.BadReaderParamsError(
2583 "%s is required for Google Cloud Storage" %
2584 cls
.BUCKET_NAME_PARAM
)
2586 cloudstorage
.validate_bucket_name(
2587 reader_spec
[cls
.BUCKET_NAME_PARAM
])
2588 except ValueError, error
:
2589 raise errors
.BadReaderParamsError("Bad bucket name, %s" % (error
))
2592 if cls
.OBJECT_NAMES_PARAM
not in reader_spec
:
2593 raise errors
.BadReaderParamsError(
2594 "%s is required for Google Cloud Storage" %
2595 cls
.OBJECT_NAMES_PARAM
)
2596 filenames
= reader_spec
[cls
.OBJECT_NAMES_PARAM
]
2597 if not isinstance(filenames
, list):
2598 raise errors
.BadReaderParamsError(
2599 "Object name list is not a list but a %s" %
2600 filenames
.__class
__.__name
__)
2601 for filename
in filenames
:
2602 if not isinstance(filename
, basestring
):
2603 raise errors
.BadReaderParamsError(
2604 "Object name is not a string but a %s" %
2605 filename
.__class
__.__name
__)
2606 if cls
.DELIMITER_PARAM
in reader_spec
:
2607 delimiter
= reader_spec
[cls
.DELIMITER_PARAM
]
2608 if not isinstance(delimiter
, basestring
):
2609 raise errors
.BadReaderParamsError(
2610 "%s is not a string but a %s" %
2611 (cls
.DELIMITER_PARAM
, type(delimiter
)))
@classmethod
def split_input(cls, mapper_spec):
  """Returns a list of input readers.

  An equal number of input files are assigned to each shard (+/- 1). If there
  are fewer files than shards, fewer than the requested number of shards will
  be used. Input files are currently never split (although for some formats
  could be and may be split in a future implementation).

  Args:
    mapper_spec: an instance of model.MapperSpec.

  Returns:
    A list of InputReaders. None when no input data can be found.
  """
  reader_spec = cls.get_params(mapper_spec, allow_old=False)
  bucket = reader_spec[cls.BUCKET_NAME_PARAM]
  filenames = reader_spec[cls.OBJECT_NAMES_PARAM]
  delimiter = reader_spec.get(cls.DELIMITER_PARAM)
  account_id = reader_spec.get(cls._ACCOUNT_ID_PARAM)
  buffer_size = reader_spec.get(cls.BUFFER_SIZE_PARAM)

  # Expand any trailing-star prefixes into concrete object names.
  all_filenames = []
  for filename in filenames:
    if filename.endswith("*"):
      all_filenames.extend(
          [file_stat.filename for file_stat in cloudstorage.listbucket(
              "/" + bucket + "/" + filename[:-1], delimiter=delimiter,
              _account_id=account_id)])
    else:
      all_filenames.append("/%s/%s" % (bucket, filename))

  # Round-robin the files across shards; shards with no files are dropped.
  readers = []
  for shard in range(0, mapper_spec.shard_count):
    shard_filenames = all_filenames[shard::mapper_spec.shard_count]
    if shard_filenames:
      readers.append(cls(
          shard_filenames, buffer_size=buffer_size, _account_id=account_id,
          delimiter=delimiter))
  return readers
@classmethod
def from_json(cls, state):
  """Restores a reader from the dict produced by to_json()."""
  obj = pickle.loads(state[cls._JSON_PICKLE])
  # The live listbucket iterator is not picklable; rebuild it from the
  # pickled bucket listing when one is in progress.
  if obj._bucket:
    obj._bucket_iter = iter(obj._bucket)
  return obj
def to_json(self):
  """Serializes this reader's state into a json-compatible dict.

  The live bucket iterator is not picklable, so it is detached while the
  reader is pickled and reattached afterwards.

  Returns:
    A dict mapping _JSON_PICKLE to the pickled reader.
  """
  before_iter = self._bucket_iter
  self._bucket_iter = None
  try:
    return {self._JSON_PICKLE: pickle.dumps(self)}
  finally:
    # Bug fix: this previously assigned to self._bucket_itr (typo), which
    # dropped the saved iterator and left _bucket_iter as None after every
    # serialization.
    self._bucket_iter = before_iter
2672 """Returns the next input from this input reader, a block of bytes.
2674 Non existent files will be logged and skipped. The file might have been
2675 removed after input splitting.
2678 The next input from this input reader in the form of a cloudstorage
2679 ReadBuffer that supports a File-like interface (read, readline, seek,
2680 tell, and close). An error may be raised if the file can not be opened.
2683 StopIteration: The list of files has been exhausted.
2686 if self
._buffer
_size
:
2687 options
["read_buffer_size"] = self
._buffer
_size
2688 if self
._account
_id
:
2689 options
["_account_id"] = self
._account
_id
2691 filename
= self
._next
_file
()
2692 if filename
is None:
2693 raise StopIteration()
2695 start_time
= time
.time()
2696 handle
= cloudstorage
.open(filename
, **options
)
2700 operation
.counters
.Increment(
2701 COUNTER_IO_READ_MSEC
, int((time
.time() - start_time
) * 1000))(ctx
)
2704 except cloudstorage
.NotFoundError
:
2705 logging
.warning("File %s may have been removed. Skipping file.",
def __str__(self):
  """Returns a human-readable summary of this reader's progress."""
  # Cap the number of filenames shown so the representation stays short.
  num_files = len(self._filenames)
  if num_files > self._STRING_MAX_FILES_LISTED:
    names = "%s...%s + %d not shown" % (
        ",".join(self._filenames[0:self._STRING_MAX_FILES_LISTED-1]),
        self._filenames[-1],
        num_files - self._STRING_MAX_FILES_LISTED)
  else:
    names = ",".join(self._filenames)

  # Bug fix: the comparison was previously `>`, but _next_file() never
  # advances _index past len(self._filenames), so an exhausted reader took
  # the else branch and self._filenames[self._index] raised IndexError.
  if self._index >= num_files:
    status = "EOF"
  else:
    status = "Next %s (%d of %d)" % (
        self._filenames[self._index],
        self._index + 1,
        num_files)
  return "CloudStorage [%s, %s]" % (status, names)
class _GoogleCloudStorageRecordInputReader(_GoogleCloudStorageInputReader):
  """Read data from a Google Cloud Storage file using LevelDB format.

  See the _GoogleCloudStorageOutputWriter for additional configuration options.
  """

  def __getstate__(self):
    result = self.__dict__.copy()
    # The record reader is not picklable; drop it here and let next()
    # recreate it from the current file handle after unpickling.
    if "_record_reader" in result:
      result.pop("_record_reader")
    return result
2745 """Returns the next input from this input reader, a record.
2748 The next input from this input reader in the form of a record read from
2752 StopIteration: The ordered set records has been exhausted.
2755 if not hasattr(self
, "_cur_handle") or self
._cur
_handle
is None:
2757 self
._cur
_handle
= super(_GoogleCloudStorageRecordInputReader
,
2759 if not hasattr(self
, "_record_reader") or self
._record
_reader
is None:
2760 self
._record
_reader
= records
.RecordsReader(self
._cur
_handle
)
2763 start_time
= time
.time()
2764 content
= self
._record
_reader
.read()
2768 operation
.counters
.Increment(COUNTER_IO_READ_BYTES
, len(content
))(ctx
)
2769 operation
.counters
.Increment(
2770 COUNTER_IO_READ_MSEC
, int((time
.time() - start_time
) * 1000))(ctx
)
2774 self
._cur
_handle
= None
2775 self
._record
_reader
= None
class _ReducerReader(_GoogleCloudStorageRecordInputReader):
  """Reader to read KeyValues records from GCS."""

  expand_parameters = True

  def __init__(self, filenames, index=0, buffer_size=None, _account_id=None,
               delimiter=None):
    super(_ReducerReader, self).__init__(filenames, index, buffer_size,
                                         _account_id, delimiter)
    # Key currently being accumulated and its values collected so far.
    self.current_key = None
    self.current_values = None
def __iter__(self):
  """Iterates over (key, values) pairs assembled from KeyValues records.

  Yields:
    (key, list_of_values) tuples, interleaved with ALLOW_CHECKPOINT markers.
  """
  ctx = context.get()
  combiner = None

  if ctx:
    combiner_spec = ctx.mapreduce_spec.mapper.params.get("combiner_spec")
    if combiner_spec:
      combiner = util.handler_for_name(combiner_spec)

  try:
    while True:
      binary_record = super(_ReducerReader, self).next()
      proto = file_service_pb.KeyValues()
      proto.ParseFromString(binary_record)

      to_yield = None
      # A key change means the previous key's values are complete.
      if self.current_key is not None and self.current_key != proto.key():
        to_yield = (self.current_key, self.current_values)
        self.current_key = None
        self.current_values = None

      if self.current_key is None:
        self.current_key = proto.key()
        self.current_values = []

      if combiner:
        combiner_result = combiner(
            self.current_key, proto.value_list(), self.current_values)

        if not util.is_generator(combiner_result):
          raise errors.BadCombinerOutputError(
              "Combiner %s should yield values instead of returning them "
              "(%s)" % (combiner, combiner_result))
        else:
          self.current_values = []
          for value in combiner_result:
            if isinstance(value, operation.Operation):
              value(ctx)
            else:
              self.current_values.append(value)

          # NOTE(review): checkpoint only when nothing is pending, so a
          # completed pair is never lost across a shard retry.
          if not to_yield:
            yield ALLOW_CHECKPOINT
      else:
        self.current_values.extend(proto.value_list())

      if to_yield:
        yield to_yield
        # Checkpointing after the pair was consumed is safe.
        yield ALLOW_CHECKPOINT
  except StopIteration:
    pass

  # Flush the final accumulated key after input is exhausted.
  if self.current_key is not None:
    to_yield = (self.current_key, self.current_values)
    self.current_key = None
    self.current_values = None
    yield to_yield
@staticmethod
def encode_data(data):
  """Encodes the given data, which may have include raw bytes.

  Works around limitations in JSON encoding, which cannot handle raw bytes.

  Args:
    data: the data to encode.

  Returns:
    The pickled, base64-encoded representation of the data.
  """
  pickled = pickle.dumps(data)
  return base64.b64encode(pickled)
@staticmethod
def decode_data(data):
  """Decodes data encoded with the encode_data function."""
  raw = base64.b64decode(data)
  return pickle.loads(raw)
2876 """Returns an input shard state for the remaining inputs.
2879 A json-izable version of the remaining InputReader.
2881 result
= super(_ReducerReader
, self
).to_json()
2882 result
["current_key"] = self
.encode_data(self
.current_key
)
2883 result
["current_values"] = self
.encode_data(self
.current_values
)
@classmethod
def from_json(cls, json):
  """Creates an instance of the InputReader for the given input shard state.

  Args:
    json: The InputReader state as a dict-like object.

  Returns:
    An instance of the InputReader configured using the values of json.
  """
  result = super(_ReducerReader, cls).from_json(json)
  # Mirror of to_json(): restore the in-flight accumulation state.
  result.current_key = _ReducerReader.decode_data(json["current_key"])
  result.current_values = _ReducerReader.decode_data(json["current_values"])
  return result