# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
31 """Defines input readers for MapReduce."""
35 "AbstractDatastoreInputReader",
37 "BadReaderParamsError",
38 "BlobstoreLineInputReader",
39 "BlobstoreZipInputReader",
40 "BlobstoreZipLineInputReader",
41 "COUNTER_IO_READ_BYTES",
42 "COUNTER_IO_READ_MSEC",
43 "DatastoreEntityInputReader",
44 "DatastoreInputReader",
45 "DatastoreKeyInputReader",
47 "RandomStringInputReader",
48 "RawDatastoreInputReader",
52 "NamespaceInputReader",
import copy
import logging
import random
import string
import StringIO
import time
import zipfile

from google.net.proto import ProtocolBuffer
from google.appengine.ext import ndb

from google.appengine.api import datastore
from google.appengine.api import files
from google.appengine.api import logservice
from google.appengine.api.files import file_service_pb
from google.appengine.api.logservice import log_service_pb
from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import key_range
from google.appengine.ext.db import metadata
from google.appengine.ext.mapreduce import context
from google.appengine.ext.mapreduce import datastore_range_iterators as db_iters
from google.appengine.ext.mapreduce import errors
from google.appengine.ext.mapreduce import file_format_parser
from google.appengine.ext.mapreduce import file_format_root
from google.appengine.ext.mapreduce import json_util
from google.appengine.ext.mapreduce import key_ranges
from google.appengine.ext.mapreduce import model
from google.appengine.ext.mapreduce import namespace_range
from google.appengine.ext.mapreduce import operation
from google.appengine.ext.mapreduce import property_range
from google.appengine.ext.mapreduce import records
from google.appengine.ext.mapreduce import util

try:
  # Check if the full cloudstorage package exists. The stub part is in runtime.
  cloudstorage = None
  from google.appengine.ext import cloudstorage
  if hasattr(cloudstorage, "_STUB"):
    cloudstorage = None
except ImportError:
  pass  # CloudStorage library not available.

# Classes moved to the errors module. Copied here for compatibility.
BadReaderParamsError = errors.BadReaderParamsError


# Counter name for the number of bytes read.
COUNTER_IO_READ_BYTES = "io-read-bytes"

# Counter name for the time spent reading data, in milliseconds.
COUNTER_IO_READ_MSEC = "io-read-msec"

# Special value that an input reader can yield to give the framework an
# opportunity to save the state of the mapreduce without having to yield
# an actual value to the handler.
ALLOW_CHECKPOINT = object()


class InputReader(json_util.JsonMixin):
  """Abstract base class for input readers.

  InputReaders have the following properties:
   * They are created by using the split_input method to generate a set of
     InputReaders from a MapperSpec.
   * They generate inputs to the mapper via the iterator interface.
   * After creation, they can be serialized and resumed using the JsonMixin
     interface.
   * They are cast to string for a user-readable description; it may be
     valuable to implement __str__.
  """

  # When expand_parameters is False, the value yielded by the reader is
  # passed to the handler as is. If it's True, then *value is passed,
  # expanding arguments.
  expand_parameters = False

  # Mapreduce parameters.
  _APP_PARAM = "_app"
  NAMESPACE_PARAM = "namespace"
  NAMESPACES_PARAM = "namespaces"

  def __iter__(self):
    return self

  def next(self):
    """Returns the next input from this input reader as a key, value pair.

    Returns:
      The next input from this input reader.
    """
    raise NotImplementedError("next() not implemented in %s" % self.__class__)

  @classmethod
  def from_json(cls, input_shard_state):
    """Creates an instance of the InputReader for the given input shard state.

    Args:
      input_shard_state: The InputReader state as a dict-like object.

    Returns:
      An instance of the InputReader configured using the values of json.
    """
    raise NotImplementedError("from_json() not implemented in %s" % cls)
170 """Returns an input shard state for the remaining inputs.
173 A json-izable version of the remaining InputReader.
175 raise NotImplementedError("to_json() not implemented in %s" %

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers.

    This method creates a list of input readers, each for one shard.
    It attempts to split inputs among readers evenly.

    Args:
      mapper_spec: model.MapperSpec specifies the inputs and additional
        parameters to define the behavior of input readers.

    Returns:
      A list of InputReaders. None or [] when no input data can be found.
    """
    raise NotImplementedError("split_input() not implemented in %s" % cls)

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Input reader parameters are expected to be passed as an "input_reader"
    subdictionary in mapper_spec.params.

    The pre-1.6.4 API mixes input reader parameters with all other parameters.
    Thus, to be compatible, input readers check mapper_spec.params as well and
    issue a warning if the "input_reader" subdictionary is not present.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Input reader class mismatch")


def _get_params(mapper_spec, allowed_keys=None, allow_old=True):
  """Obtain input reader parameters.

  Utility function for input reader implementations. Fetches parameters
  from the mapreduce specification, giving appropriate usage warnings.

  Args:
    mapper_spec: The MapperSpec for the job.
    allowed_keys: set of all allowed keys in parameters as strings. If it is
      not None, then parameters are expected to be in a separate
      "input_reader" subdictionary of mapper_spec parameters.
    allow_old: Allow parameters to exist outside of the input_reader
      subdictionary for compatibility.

  Returns:
    mapper parameters as dict.

  Raises:
    BadReaderParamsError: if parameters are invalid/missing or not allowed.
  """
  if "input_reader" not in mapper_spec.params:
    message = ("Input reader's parameters should be specified in "
               "input_reader subdictionary.")
    if not allow_old or allowed_keys:
      raise errors.BadReaderParamsError(message)
    params = mapper_spec.params
    params = dict((str(n), v) for n, v in params.iteritems())
  else:
    if not isinstance(mapper_spec.params.get("input_reader"), dict):
      raise errors.BadReaderParamsError(
          "Input reader parameters should be a dictionary")
    params = mapper_spec.params.get("input_reader")
    params = dict((str(n), v) for n, v in params.iteritems())
  if allowed_keys:
    params_diff = set(params.keys()) - allowed_keys
    if params_diff:
      raise errors.BadReaderParamsError(
          "Invalid input_reader parameters: %s" % ",".join(params_diff))
  return params
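

# A minimal illustration (not part of the original module; the kind name is
# hypothetical) of the two parameter layouts _get_params accepts. The
# post-1.6.4 layout nests reader parameters in an "input_reader"
# subdictionary:
#
#   mapper_spec.params = {
#       "input_reader": {"entity_kind": "models.Guestbook"},
#       "processing_rate": 100,
#   }
#
# while the older layout mixes reader and non-reader parameters together:
#
#   mapper_spec.params = {"entity_kind": "models.Guestbook",
#                         "processing_rate": 100}
#
# The old layout is rejected when allow_old=False or allowed_keys is given.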


class FileInputReader(InputReader):
  """Reader to read Files API files of user specified format.

  This class currently only supports Google Storage files. It will be extended
  to support blobstore files in the future.

  Reader parameters:
  files: a list of filenames or filename patterns.
    filename must be of format '/gs/bucket/filename'.
    filename pattern has format '/gs/bucket/prefix*'.
    filename pattern will be expanded to filenames with the given prefix.
    Please see parseGlob in the file api.files.gs.py which is included in the
    App Engine SDK for supported patterns.

    Example:
      ["/gs/bucket1/file1", "/gs/bucket2/*", "/gs/bucket3/p*"]
      includes "file1", all files under bucket2, and files under bucket3 with
      a prefix "p" in its name.

  format: format string determines what your map function gets as its input.
    format string can be "lines", "bytes", "zip", or a cascade of them plus
    optional parameters. See file_formats.FORMATS for all supported formats.
    See file_format_parser._FileFormatParser for format string syntax.

    Example:
      "lines": your map function gets files' contents line by line.
      "bytes": your map function gets files' contents entirely.
      "zip": InputReader unzips files and feeds your map function each of
        the archive's member files as a whole.
      "zip[bytes]": same as above.
      "zip[lines]": InputReader unzips files and feeds your map function
        files' contents line by line.
      "zip[lines(encoding=utf32)]": InputReader unzips files, reads each
        file with utf32 encoding and feeds your map function line by line.
      "base64[zip[lines(encoding=utf32)]]": InputReader decodes files with
        base64 encoding, unzips each file, reads each of them with utf32
        encoding and feeds your map function line by line.

    Note that "encoding" only teaches InputReader how to interpret files.
    The input your map function gets is always a Python str.
  """

  # Reader parameters.
  FILES_PARAM = "files"
  FORMAT_PARAM = "format"

  def __init__(self, format_root):
    """Initialize input reader.

    Args:
      format_root: a FileFormatRoot instance.
    """
    self._file_format_root = format_root

  def __iter__(self):
    """Iterate over records in files."""
    ctx = context.get()

    while True:
      try:
        start_time = time.time()
        content = self._file_format_root.next().read()
      except StopIteration:
        break

      if ctx:
        operation.counters.Increment(
            COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
        operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)

      yield content

  @classmethod
  def split_input(cls, mapper_spec):
    """Inherit docs."""
    params = _get_params(mapper_spec)

    # Expand potential file patterns to a list of filenames.
    filenames = []
    for f in params[cls.FILES_PARAM]:
      parsedName = files.gs.parseGlob(f)
      if isinstance(parsedName, tuple):
        filenames.extend(files.gs.listdir(parsedName[0],
                                          {"prefix": parsedName[1]}))
      else:
        filenames.append(parsedName)

    file_format_roots = file_format_root.split(filenames,
                                               params[cls.FORMAT_PARAM],
                                               mapper_spec.shard_count)

    if file_format_roots is None:
      return []
    return [cls(root) for root in file_format_roots]

  @classmethod
  def validate(cls, mapper_spec):
    """Inherit docs."""
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")

    # Check parameters.
    params = _get_params(mapper_spec)
    if cls.FILES_PARAM not in params:
      raise BadReaderParamsError("Must specify %s" % cls.FILES_PARAM)
    if cls.FORMAT_PARAM not in params:
      raise BadReaderParamsError("Must specify %s" % cls.FORMAT_PARAM)

    format_string = params[cls.FORMAT_PARAM]
    if not isinstance(format_string, basestring):
      raise BadReaderParamsError("format should be string but is %s" %
                                 cls.FORMAT_PARAM)
    try:
      file_format_parser.parse(format_string)
    except ValueError, e:
      raise BadReaderParamsError(e)

    paths = params[cls.FILES_PARAM]
    if not (paths and isinstance(paths, list)):
      raise BadReaderParamsError("files should be a list of filenames.")

    # Check validity of filenames.
    for path in paths:
      try:
        files.gs.parseGlob(path)
      except files.InvalidFileNameError:
        raise BadReaderParamsError("Invalid filename %s." % path)

  @classmethod
  def from_json(cls, json):
    """Inherit docs."""
    return cls(
        file_format_root.FileFormatRoot.from_json(json["file_format_root"]))

  def to_json(self):
    """Inherit docs."""
    return {"file_format_root": self._file_format_root.to_json()}
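

# Illustrative "input_reader" parameters for FileInputReader (bucket and
# object names are hypothetical, not from the original source):
#
#   {
#       "files": ["/gs/my_bucket/logs-*"],
#       "format": "zip[lines(encoding=utf8)]",
#   }
#
# With this spec, every matching zip archive under the bucket is expanded
# and the map function is called once per line of each member file.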


class AbstractDatastoreInputReader(InputReader):
  """Abstract class for datastore input readers."""

  # Number of entities to fetch at once while doing scanning.
  _BATCH_SIZE = 50

  # Maximum number of shards we'll create.
  _MAX_SHARD_COUNT = 256

  # The maximum number of namespaces that will be sharded by datastore key
  # before switching to a strategy where sharding is done lexicographically
  # by namespace.
  MAX_NAMESPACES_FOR_KEY_SHARD = 10

  # Reader parameters.
  ENTITY_KIND_PARAM = "entity_kind"
  KEYS_ONLY_PARAM = "keys_only"
  BATCH_SIZE_PARAM = "batch_size"
  KEY_RANGE_PARAM = "key_range"
  FILTERS_PARAM = "filters"

  _KEY_RANGE_ITER_CLS = db_iters.AbstractKeyRangeIterator

  def __init__(self, iterator):
    """Create new DatastoreInputReader object.

    This is internal constructor. Use split_input to create readers instead.

    Args:
      iterator: an iterator that generates objects for this input reader.
    """
    self._iter = iterator

  def __iter__(self):
    """Yields whatever internal iterator yields."""
    for o in self._iter:
      yield o

  def __str__(self):
    """Returns the string representation of this InputReader."""
    return repr(self._iter)

  def to_json(self):
    """Serializes input reader to json compatible format.

    Returns:
      all the data in json-compatible map.
    """
    return self._iter.to_json()

  @classmethod
  def from_json(cls, json):
    """Create new DatastoreInputReader from json, encoded by to_json.

    Args:
      json: json representation of DatastoreInputReader.

    Returns:
      an instance of DatastoreInputReader with all data deserialized from json.
    """
    return cls(db_iters.RangeIteratorFactory.from_json(json))

  @classmethod
  def _get_query_spec(cls, mapper_spec):
    """Construct a model.QuerySpec from model.MapperSpec."""
    params = _get_params(mapper_spec)
    entity_kind = params[cls.ENTITY_KIND_PARAM]
    filters = params.get(cls.FILTERS_PARAM)
    app = params.get(cls._APP_PARAM)
    ns = params.get(cls.NAMESPACE_PARAM)

    return model.QuerySpec(
        entity_kind=cls._get_raw_entity_kind(entity_kind),
        keys_only=bool(params.get(cls.KEYS_ONLY_PARAM, False)),
        filters=filters,
        batch_size=int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE)),
        model_class_path=entity_kind,
        app=app,
        ns=ns)

  @classmethod
  def split_input(cls, mapper_spec):
    """Inherit docs."""
    shard_count = mapper_spec.shard_count
    query_spec = cls._get_query_spec(mapper_spec)

    if query_spec.ns is not None:
      k_ranges = cls._to_key_ranges_by_shard(
          query_spec.app, [query_spec.ns], shard_count, query_spec)
    else:
      ns_keys = namespace_range.get_namespace_keys(
          query_spec.app, cls.MAX_NAMESPACES_FOR_KEY_SHARD + 1)
      # No namespace means the app has no data at all.
      if not ns_keys:
        return
      # If the number of namespaces is small, we shard each namespace by key
      # and assign each shard a piece of each namespace.
      elif len(ns_keys) <= cls.MAX_NAMESPACES_FOR_KEY_SHARD:
        namespaces = [ns_key.name() or "" for ns_key in ns_keys]
        k_ranges = cls._to_key_ranges_by_shard(
            query_spec.app, namespaces, shard_count, query_spec)
      # When there are too many namespaces, split lexicographically by
      # namespace instead.
      else:
        ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
                                                         contiguous=False,
                                                         can_query=lambda: True,
                                                         _app=query_spec.app)
        k_ranges = [key_ranges.KeyRangesFactory.create_from_ns_range(ns_range)
                    for ns_range in ns_ranges]

    iters = [db_iters.RangeIteratorFactory.create_key_ranges_iterator(
        r, query_spec, cls._KEY_RANGE_ITER_CLS) for r in k_ranges]

    return [cls(i) for i in iters]

  @classmethod
  def _to_key_ranges_by_shard(cls, app, namespaces, shard_count, query_spec):
    """Get a list of key_ranges.KeyRanges objects, one for each shard.

    This method uses scatter index to split each namespace into pieces
    and assign those pieces to shards.

    Args:
      app: app_id in str.
      namespaces: a list of namespaces in str.
      shard_count: number of shards to split.
      query_spec: model.QuerySpec.

    Returns:
      a list of key_ranges.KeyRanges objects.
    """
    key_ranges_by_ns = []
    # Split each namespace into n splits. If a namespace doesn't have enough
    # scatter entities to split into n, the last few splits are None.
    for namespace in namespaces:
      ranges = cls._split_ns_by_scatter(
          shard_count,
          namespace,
          query_spec.entity_kind,
          app)
      # The nth split of each namespace will be assigned to the nth shard.
      # Shuffle so that the Nones are not all at the end and shards have an
      # even size whenever possible.
      random.shuffle(ranges)
      key_ranges_by_ns.append(ranges)

    # KeyRanges from different namespaces might be very different in size.
    # Use round robin so that each shard has at most one split from each
    # namespace.
    ranges_by_shard = [[] for _ in range(shard_count)]
    for ranges in key_ranges_by_ns:
      for i, k_range in enumerate(ranges):
        if k_range:
          ranges_by_shard[i].append(k_range)

    key_ranges_by_shard = []
    for ranges in ranges_by_shard:
      if ranges:
        key_ranges_by_shard.append(
            key_ranges.KeyRangesFactory.create_from_list(ranges))
    return key_ranges_by_shard

  @classmethod
  def _split_ns_by_scatter(cls,
                           shard_count,
                           namespace,
                           raw_entity_kind,
                           app):
    """Split a namespace by scatter index into key_range.KeyRange.

    TODO: Power this with key_range.KeyRange.compute_split_points.

    Args:
      shard_count: number of shards.
      namespace: namespace name to split. str.
      raw_entity_kind: low level datastore API entity kind.
      app: the app id in str.

    Returns:
      A list of key_range.KeyRange objects. If there are not enough entities
      to split into the requested shards, the returned list will contain
      KeyRanges ordered lexicographically with any Nones appearing at the end.
    """
    if shard_count == 1:
      # With one shard we don't need to calculate any split points at all.
      return [key_range.KeyRange(namespace=namespace, _app=app)]

    ds_query = datastore.Query(kind=raw_entity_kind,
                               namespace=namespace,
                               _app=app,
                               keys_only=True)
    ds_query.Order("__scatter__")
    oversampling_factor = 32
    random_keys = ds_query.Get(shard_count * oversampling_factor)

    if not random_keys:
      # There are no entities with scatter property. We have no idea
      # how to split.
      return ([key_range.KeyRange(namespace=namespace, _app=app)] +
              [None] * (shard_count - 1))

    random_keys.sort()

    if len(random_keys) >= shard_count:
      # We've got a lot of scatter values. Sample them down.
      random_keys = cls._choose_split_points(random_keys, shard_count)

    k_ranges = []

    k_ranges.append(key_range.KeyRange(
        key_start=None,
        key_end=random_keys[0],
        direction=key_range.KeyRange.ASC,
        include_start=False,
        include_end=False,
        namespace=namespace,
        _app=app))

    for i in range(0, len(random_keys) - 1):
      k_ranges.append(key_range.KeyRange(
          key_start=random_keys[i],
          key_end=random_keys[i + 1],
          direction=key_range.KeyRange.ASC,
          include_start=True,
          include_end=False,
          namespace=namespace,
          _app=app))

    k_ranges.append(key_range.KeyRange(
        key_start=random_keys[-1],
        key_end=None,
        direction=key_range.KeyRange.ASC,
        include_start=True,
        include_end=False,
        namespace=namespace,
        _app=app))

    if len(k_ranges) < shard_count:
      # We need to have as many shards as requested. Add some Nones.
      k_ranges += [None] * (shard_count - len(k_ranges))
    return k_ranges
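
  # Worked example of _split_ns_by_scatter (hypothetical sample): with
  # shard_count=3 and two surviving split keys [kA, kB], the ranges built
  # above are (None, kA), [kA, kB) and [kB, None) within the namespace;
  # Nones are appended only when fewer ranges than shard_count exist.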

  @classmethod
  def _choose_split_points(cls, sorted_keys, shard_count):
    """Returns the best split points given a random set of datastore.Keys."""
    assert len(sorted_keys) >= shard_count
    index_stride = len(sorted_keys) / float(shard_count)
    return [sorted_keys[int(round(index_stride * i))]
            for i in range(1, shard_count)]
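
  # Example (hypothetical keys): 8 sampled scatter keys and shard_count=4
  # give index_stride=2.0, so the keys at indexes 2, 4 and 6 become the
  # shard_count - 1 split points.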

  @classmethod
  def validate(cls, mapper_spec):
    """Inherit docs."""
    params = _get_params(mapper_spec)
    if cls.ENTITY_KIND_PARAM not in params:
      raise BadReaderParamsError("Missing input reader parameter 'entity_kind'")
    if cls.BATCH_SIZE_PARAM in params:
      try:
        batch_size = int(params[cls.BATCH_SIZE_PARAM])
        if batch_size < 1:
          raise BadReaderParamsError("Bad batch size: %s" % batch_size)
      except ValueError, e:
        raise BadReaderParamsError("Bad batch size: %s" % e)
    try:
      bool(params.get(cls.KEYS_ONLY_PARAM, False))
    except:
      raise BadReaderParamsError("keys_only expects a boolean value but got %s",
                                 params[cls.KEYS_ONLY_PARAM])
    if cls.NAMESPACE_PARAM in params:
      if not isinstance(params[cls.NAMESPACE_PARAM],
                        (str, unicode, type(None))):
        raise BadReaderParamsError(
            "Expected a single namespace string")
    if cls.NAMESPACES_PARAM in params:
      raise BadReaderParamsError("Multiple namespaces are no longer supported")
    if cls.FILTERS_PARAM in params:
      filters = params[cls.FILTERS_PARAM]
      if not isinstance(filters, list):
        raise BadReaderParamsError("Expected list for filters parameter")
      for f in filters:
        if not isinstance(f, (tuple, list)):
          raise BadReaderParamsError("Filter should be a tuple or list: %s", f)
        if len(f) != 3:
          raise BadReaderParamsError("Filter should be a 3-tuple: %s", f)
        prop, op, _ = f
        if not isinstance(prop, basestring):
          raise BadReaderParamsError("Property should be string: %s", prop)
        if not isinstance(op, basestring):
          raise BadReaderParamsError("Operator should be string: %s", op)

  @classmethod
  def _get_raw_entity_kind(cls, entity_kind_or_model_classpath):
    """Returns the entity kind to use with low level datastore calls.

    Args:
      entity_kind_or_model_classpath: user specified entity kind or model
        classpath.

    Returns:
      the entity kind in str to use with low level datastore calls.
    """
    return entity_kind_or_model_classpath


class RawDatastoreInputReader(AbstractDatastoreInputReader):
  """Iterates over an entity kind and yields datastore.Entity."""

  _KEY_RANGE_ITER_CLS = db_iters.KeyRangeEntityIterator

  @classmethod
  def validate(cls, mapper_spec):
    """Inherit docs."""
    super(RawDatastoreInputReader, cls).validate(mapper_spec)
    params = _get_params(mapper_spec)
    entity_kind = params[cls.ENTITY_KIND_PARAM]
    if "." in entity_kind:
      logging.warning(
          ". detected in entity kind %s specified for reader %s. "
          "Assuming entity kind contains the dot.",
          entity_kind, cls.__name__)
    if cls.FILTERS_PARAM in params:
      filters = params[cls.FILTERS_PARAM]
      for f in filters:
        if f[1] != "=":
          raise BadReaderParamsError(
              "Only equality filters are supported: %s", f)


class DatastoreInputReader(AbstractDatastoreInputReader):
  """Iterates over a Model and yields model instances.

  Supports both db.model and ndb.model.
  """

  _KEY_RANGE_ITER_CLS = db_iters.KeyRangeModelIterator

  @classmethod
  def _get_raw_entity_kind(cls, model_classpath):
    entity_type = util.for_name(model_classpath)
    if isinstance(entity_type, db.Model):
      return entity_type.kind()
    elif isinstance(entity_type, (ndb.Model, ndb.MetaModel)):
      # pylint: disable=protected-access
      return entity_type._get_kind()
    else:
      return util.get_short_name(model_classpath)

  @classmethod
  def validate(cls, mapper_spec):
    """Inherit docs."""
    super(DatastoreInputReader, cls).validate(mapper_spec)
    params = _get_params(mapper_spec)
    entity_kind = params[cls.ENTITY_KIND_PARAM]
    # Fail fast if the Model cannot be located.
    try:
      model_class = util.for_name(entity_kind)
    except ImportError, e:
      raise BadReaderParamsError("Bad entity kind: %s" % e)
    if cls.FILTERS_PARAM in params:
      filters = params[cls.FILTERS_PARAM]
      if issubclass(model_class, db.Model):
        cls._validate_filters(filters, model_class)
      else:
        cls._validate_filters_ndb(filters, model_class)
      property_range.PropertyRange(filters, entity_kind)

  @classmethod
  def _validate_filters(cls, filters, model_class):
    """Validate user supplied filters.

    Validate filters are on existing properties and filter values
    have valid semantics.

    Args:
      filters: user supplied filters. Each filter should be a list or tuple of
        format (<property_name_as_str>, <query_operator_as_str>,
        <value_of_certain_type>). Value type is up to the property's type.
      model_class: the db.Model class for the entity type to apply filters on.

    Raises:
      BadReaderParamsError: if any filter is invalid in any way.
    """
    if not filters:
      return

    properties = model_class.properties()

    for f in filters:
      prop, _, val = f
      if prop not in properties:
        raise errors.BadReaderParamsError(
            "Property %s is not defined for entity type %s",
            prop, model_class.kind())

      # Validate the value of each filter. We need to know filters have
      # valid values to carry out splits.
      try:
        properties[prop].validate(val)
      except db.BadValueError, e:
        raise errors.BadReaderParamsError(e)

  @classmethod
  def _validate_filters_ndb(cls, filters, model_class):
    """Validate ndb.Model filters."""
    if not filters:
      return

    properties = model_class._properties

    for f in filters:
      prop, _, val = f
      if prop not in properties:
        raise errors.BadReaderParamsError(
            "Property %s is not defined for entity type %s",
            prop, model_class._get_kind())

      # Validate the value of each filter. We need to know filters have
      # valid values to carry out splits.
      try:
        properties[prop]._do_validate(val)
      except db.BadValueError, e:
        raise errors.BadReaderParamsError(e)

  @classmethod
  def split_input(cls, mapper_spec):
    """Inherit docs."""
    shard_count = mapper_spec.shard_count
    query_spec = cls._get_query_spec(mapper_spec)

    if not property_range.should_shard_by_property_range(query_spec.filters):
      return super(DatastoreInputReader, cls).split_input(mapper_spec)

    p_range = property_range.PropertyRange(query_spec.filters,
                                           query_spec.model_class_path)
    p_ranges = p_range.split(shard_count)

    # User specified a namespace.
    if query_spec.ns:
      ns_range = namespace_range.NamespaceRange(
          namespace_start=query_spec.ns,
          namespace_end=query_spec.ns,
          _app=query_spec.app)
      ns_ranges = [copy.copy(ns_range) for _ in p_ranges]
    else:
      ns_keys = namespace_range.get_namespace_keys(
          query_spec.app, cls.MAX_NAMESPACES_FOR_KEY_SHARD + 1)
      if not ns_keys:
        return
      # Common case: the number of namespaces is small, so pair each
      # property range with a full namespace range.
      if len(ns_keys) <= cls.MAX_NAMESPACES_FOR_KEY_SHARD:
        ns_ranges = [namespace_range.NamespaceRange(_app=query_spec.app)
                     for _ in p_ranges]
      # Too many namespaces. Split by namespace and pair each namespace
      # range with the whole property range instead.
      else:
        ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
                                                         contiguous=False,
                                                         can_query=lambda: True,
                                                         _app=query_spec.app)
        p_ranges = [copy.copy(p_range) for _ in ns_ranges]

    assert len(p_ranges) == len(ns_ranges)

    iters = [
        db_iters.RangeIteratorFactory.create_property_range_iterator(
            p, ns, query_spec) for p, ns in zip(p_ranges, ns_ranges)]
    return [cls(i) for i in iters]
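
  # Illustrative configuration (model path and property name are
  # hypothetical):
  #
  #   {
  #       "entity_kind": "models.BlogPost",
  #       "filters": [("author", "=", "foo")],
  #   }
  #
  # Whether the property-range path above is taken is decided by
  # property_range.should_shard_by_property_range(); otherwise splitting
  # falls back to the key-range strategy of the parent class.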


class DatastoreKeyInputReader(RawDatastoreInputReader):
  """Iterates over an entity kind and yields datastore.Key."""

  _KEY_RANGE_ITER_CLS = db_iters.KeyRangeKeyIterator


# For backward compatibility.
DatastoreEntityInputReader = RawDatastoreInputReader


class _OldAbstractDatastoreInputReader(InputReader):
  """Abstract base class for classes that iterate over datastore entities.

  Concrete subclasses must implement _iter_key_range(self, k_range). See the
  docstring for that method for details.
  """

  # Number of entities to fetch at once while doing scanning.
  _BATCH_SIZE = 50

  # Maximum number of shards we'll create.
  _MAX_SHARD_COUNT = 256

  # __scatter__ oversampling factor.
  _OVERSAMPLING_FACTOR = 32

  # The maximum number of namespaces that will be sharded by datastore key
  # before switching to a strategy where sharding is done lexicographically
  # by namespace.
  MAX_NAMESPACES_FOR_KEY_SHARD = 10

  # Mapreduce parameters.
  ENTITY_KIND_PARAM = "entity_kind"
  KEYS_ONLY_PARAM = "keys_only"
  BATCH_SIZE_PARAM = "batch_size"
  KEY_RANGE_PARAM = "key_range"
  NAMESPACE_RANGE_PARAM = "namespace_range"
  CURRENT_KEY_RANGE_PARAM = "current_key_range"
  FILTERS_PARAM = "filters"

  def __init__(self,
               entity_kind,
               key_ranges=None,
               ns_range=None,
               batch_size=_BATCH_SIZE,
               current_key_range=None,
               filters=None):
    """Create new AbstractDatastoreInputReader object.

    This is internal constructor. Use split_query in a concrete class instead.

    Args:
      entity_kind: entity kind as string.
      key_ranges: a sequence of key_range.KeyRange instances to process. Only
          one of key_ranges or ns_range can be non-None.
      ns_range: a namespace_range.NamespaceRange to process. Only one of
          key_ranges or ns_range can be non-None.
      batch_size: size of read batch as int.
      current_key_range: the current key_range.KeyRange being processed.
      filters: optional list of filters to apply to the query. Each filter is
          a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
          User filters are applied first.
    """
    assert key_ranges is not None or ns_range is not None, (
        "must specify one of 'key_ranges' or 'ns_range'")
    assert key_ranges is None or ns_range is None, (
        "can't specify both 'key_ranges' and 'ns_range'")

    self._entity_kind = entity_kind
    # Reverse the KeyRanges so they can be processed in order as a stack of
    # work items.
    self._key_ranges = key_ranges and list(reversed(key_ranges))

    self._ns_range = ns_range
    self._batch_size = int(batch_size)
    self._current_key_range = current_key_range
    self._filters = filters
, entity_kind
):
952 if "." in entity_kind
:
954 ". detected in entity kind %s specified for reader %s."
955 "Assuming entity kind contains the dot.",
956 entity_kind
, cls
.__name
__)
960 """Iterates over the given KeyRanges or NamespaceRange.
962 This method iterates over the given KeyRanges or NamespaceRange and sets
963 the self._current_key_range to the KeyRange currently being processed. It
964 then delegates to the _iter_key_range method to yield that actual
968 Forwards the objects yielded by the subclasses concrete _iter_key_range()
969 method. The caller must consume the result yielded because self.to_json()
972 if self
._key
_ranges
is not None:
973 for o
in self
._iter
_key
_ranges
():
975 elif self
._ns
_range
is not None:
976 for o
in self
._iter
_ns
_range
():
979 assert False, "self._key_ranges and self._ns_range are both None"

  def _iter_key_ranges(self):
    """Iterates over self._key_ranges, delegating to self._iter_key_range()."""
    while True:
      if self._current_key_range is None:
        if self._key_ranges:
          self._current_key_range = self._key_ranges.pop()
          # The most recently popped key_range may be None, so continue here
          # to find the next keyrange that's valid.
          continue
        else:
          break

      for key, o in self._iter_key_range(
          copy.deepcopy(self._current_key_range)):
        # The caller must consume yielded values so advancing the KeyRange
        # before yielding is safe.
        self._current_key_range.advance(key)
        yield o
      self._current_key_range = None

  def _iter_ns_range(self):
    """Iterates over self._ns_range, delegating to self._iter_key_range()."""
    while True:
      if self._current_key_range is None:
        query = self._ns_range.make_datastore_query()
        namespace_result = query.Get(1)
        if not namespace_result:
          break

        namespace = namespace_result[0].name() or ""
        self._current_key_range = key_range.KeyRange(
            namespace=namespace, _app=self._ns_range.app)
        yield ALLOW_CHECKPOINT

      for key, o in self._iter_key_range(
          copy.deepcopy(self._current_key_range)):
        # The caller must consume yielded values so advancing the KeyRange
        # before yielding is safe.
        self._current_key_range.advance(key)
        yield o

      if (self._ns_range.is_single_namespace or
          self._current_key_range.namespace == self._ns_range.namespace_end):
        break
      self._ns_range = self._ns_range.with_start_after(
          self._current_key_range.namespace)
      self._current_key_range = None

  def _iter_key_range(self, k_range):
    """Yields a db.Key and the value that should be yielded by self.__iter__().

    Args:
      k_range: The key_range.KeyRange to iterate over.

    Yields:
      A 2-tuple containing the last db.Key processed and the value that should
      be yielded by __iter__. The returned db.Key will be used to determine the
      InputReader's current position in self._current_key_range.
    """
    raise NotImplementedError("_iter_key_range() not implemented in %s" %
                              self.__class__)
1044 """Returns the string representation of this InputReader."""
1045 if self
._ns
_range
is None:
1046 return repr(self
._key
_ranges
)
1048 return repr(self
._ns
_range
)

  @classmethod
  def _choose_split_points(cls, sorted_keys, shard_count):
    """Returns the best split points given a random set of db.Keys."""
    assert len(sorted_keys) >= shard_count
    index_stride = len(sorted_keys) / float(shard_count)
    return [sorted_keys[int(round(index_stride * i))]
            for i in range(1, shard_count)]

  @classmethod
  def _split_input_from_namespace(cls, app, namespace, entity_kind,
                                  shard_count):
    """Helper for _split_input_from_params.

    If there are not enough Entities to make all of the given shards, the
    returned list of KeyRanges will include Nones. The returned list will
    contain KeyRanges ordered lexicographically with any Nones appearing at
    the end.

    Args:
      app: the app.
      namespace: the namespace.
      entity_kind: entity kind as string.
      shard_count: the number of shards.

    Returns:
      KeyRange objects.
    """
    raw_entity_kind = cls._get_raw_entity_kind(entity_kind)
    if shard_count == 1:
      # With one shard we don't need to calculate any split points at all.
      return [key_range.KeyRange(namespace=namespace, _app=app)]

    ds_query = datastore.Query(kind=raw_entity_kind,
                               namespace=namespace,
                               _app=app,
                               keys_only=True)
    ds_query.Order("__scatter__")
    random_keys = ds_query.Get(shard_count * cls._OVERSAMPLING_FACTOR)

    if not random_keys:
      # There are no entities with scatter property. We have no idea
      # how to split.
      return ([key_range.KeyRange(namespace=namespace, _app=app)] +
              [None] * (shard_count - 1))

    random_keys.sort()

    if len(random_keys) >= shard_count:
      # We've got a lot of scatter values. Sample them down.
      random_keys = cls._choose_split_points(random_keys, shard_count)

    key_ranges = []

    key_ranges.append(key_range.KeyRange(
        key_start=None,
        key_end=random_keys[0],
        direction=key_range.KeyRange.ASC,
        include_start=False,
        include_end=False,
        namespace=namespace,
        _app=app))

    for i in range(0, len(random_keys) - 1):
      key_ranges.append(key_range.KeyRange(
          key_start=random_keys[i],
          key_end=random_keys[i + 1],
          direction=key_range.KeyRange.ASC,
          include_start=True,
          include_end=False,
          namespace=namespace,
          _app=app))

    key_ranges.append(key_range.KeyRange(
        key_start=random_keys[-1],
        key_end=None,
        direction=key_range.KeyRange.ASC,
        include_start=True,
        include_end=False,
        namespace=namespace,
        _app=app))

    if len(key_ranges) < shard_count:
      # We need to have as many shards as requested. Add some Nones.
      key_ranges += [None] * (shard_count - len(key_ranges))

    return key_ranges

  @classmethod
  def _split_input_from_params(cls, app, namespaces, entity_kind_name,
                               params, shard_count):
    """Return input reader objects. Helper for split_input."""
    key_ranges = []  # KeyRanges for all namespaces.
    for namespace in namespaces:
      key_ranges.extend(
          cls._split_input_from_namespace(app,
                                          namespace,
                                          entity_kind_name,
                                          shard_count))

    # Divide the KeyRanges into shard_count shards. The KeyRanges for
    # different namespaces might be very different in size so the assignment
    # of KeyRanges to shards is done round-robin.
    shared_ranges = [[] for _ in range(shard_count)]
    for i, k_range in enumerate(key_ranges):
      shared_ranges[i % shard_count].append(k_range)
    batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))

    return [cls(entity_kind_name,
                key_ranges=key_ranges,
                ns_range=None,
                batch_size=batch_size)
            for key_ranges in shared_ranges if key_ranges]

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.ENTITY_KIND_PARAM not in params:
      raise BadReaderParamsError("Missing mapper parameter 'entity_kind'")
    if cls.BATCH_SIZE_PARAM in params:
      try:
        batch_size = int(params[cls.BATCH_SIZE_PARAM])
        if batch_size < 1:
          raise BadReaderParamsError("Bad batch size: %s" % batch_size)
      except ValueError, e:
        raise BadReaderParamsError("Bad batch size: %s" % e)
    if cls.NAMESPACE_PARAM in params:
      if not isinstance(params[cls.NAMESPACE_PARAM],
                        (str, unicode, type(None))):
        raise BadReaderParamsError(
            "Expected a single namespace string")
    if cls.NAMESPACES_PARAM in params:
      raise BadReaderParamsError("Multiple namespaces are no longer supported")
    if cls.FILTERS_PARAM in params:
      filters = params[cls.FILTERS_PARAM]
      if not isinstance(filters, list):
        raise BadReaderParamsError("Expected list for filters parameter")
      for f in filters:
        if not isinstance(f, (tuple, list)):
          raise BadReaderParamsError("Filter should be a tuple or list: %s", f)
        if len(f) != 3:
          raise BadReaderParamsError("Filter should be a 3-tuple: %s", f)
        if not isinstance(f[0], basestring):
          raise BadReaderParamsError("First element should be string: %s", f)
        if f[1] != "=":
          raise BadReaderParamsError(
              "Only equality filters are supported: %s", f)

  @classmethod
  def split_input(cls, mapper_spec):
    """Splits query into shards without fetching query results.

    Tries as best as it can to split the whole query result set into equal
    shards. Due to the difficulty of making a perfect split, resulting
    shards' sizes might differ significantly from each other.

    Args:
      mapper_spec: MapperSpec with params containing 'entity_kind'.
        May have 'namespace' in the params as a string containing a single
        namespace. If specified then the input reader will only yield values
        in the given namespace. If 'namespace' is not given then values from
        all namespaces will be yielded. May also have 'batch_size' in the
        params to specify the number of entities to process in each batch.

    Returns:
      A list of InputReader objects. If the query results are empty then the
      empty list will be returned. Otherwise, the list will always have a
      length equal to number_of_shards but may be padded with Nones if there
      are too few results for effective sharding.
    """
    params = _get_params(mapper_spec)
    entity_kind_name = params[cls.ENTITY_KIND_PARAM]
    batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
    shard_count = mapper_spec.shard_count
    namespace = params.get(cls.NAMESPACE_PARAM)
    app = params.get(cls._APP_PARAM)
    filters = params.get(cls.FILTERS_PARAM)

    if namespace is None:
      # It is difficult to efficiently shard large numbers of namespaces
      # because there can be an arbitrary number of them. So the strategy is:
      # 1. if there are a small number of namespaces in the datastore then
      #    query all the namespaces and shard each namespace by key.
      # 2. if there are a large number of namespaces then split the namespace
      #    space lexicographically into shard_count NamespaceRanges.
      namespace_query = datastore.Query("__namespace__",
                                        keys_only=True,
                                        _app=app)
      namespace_keys = namespace_query.Get(
          limit=cls.MAX_NAMESPACES_FOR_KEY_SHARD + 1)

      if len(namespace_keys) > cls.MAX_NAMESPACES_FOR_KEY_SHARD:
        ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
                                                         contiguous=True,
                                                         _app=app)
        return [cls(entity_kind_name,
                    key_ranges=None,
                    ns_range=ns_range,
                    batch_size=batch_size,
                    filters=filters)
                for ns_range in ns_ranges]
      elif not namespace_keys:
        return [cls(entity_kind_name,
                    key_ranges=None,
                    ns_range=namespace_range.NamespaceRange(_app=app),
                    batch_size=shard_count,
                    filters=filters)]
      else:
        namespaces = [namespace_key.name() or ""
                      for namespace_key in namespace_keys]
    else:
      namespaces = [namespace]

    readers = cls._split_input_from_params(
        app, namespaces, entity_kind_name, params, shard_count)
    if filters:
      for reader in readers:
        reader._filters = filters
    return readers
1290 """Serializes all the data in this query range into json form.
1293 all the data in json-compatible map.
1295 if self
._key
_ranges
is None:
1296 key_ranges_json
= None
1298 key_ranges_json
= []
1299 for k
in self
._key
_ranges
:
1301 key_ranges_json
.append(k
.to_json())
1303 key_ranges_json
.append(None)
1305 if self
._ns
_range
is None:
1306 namespace_range_json
= None
1308 namespace_range_json
= self
._ns
_range
.to_json_object()
1310 if self
._current
_key
_range
is None:
1311 current_key_range_json
= None
1313 current_key_range_json
= self
._current
_key
_range
.to_json()
1315 json_dict
= {self
.KEY_RANGE_PARAM
: key_ranges_json
,
1316 self
.NAMESPACE_RANGE_PARAM
: namespace_range_json
,
1317 self
.CURRENT_KEY_RANGE_PARAM
: current_key_range_json
,
1318 self
.ENTITY_KIND_PARAM
: self
._entity
_kind
,
1319 self
.BATCH_SIZE_PARAM
: self
._batch
_size
,
1320 self
.FILTERS_PARAM
: self
._filters
}

  @classmethod
  def from_json(cls, json):
    """Create new DatastoreInputReader from the json, encoded by to_json.

    Args:
      json: json map representation of DatastoreInputReader.

    Returns:
      an instance of DatastoreInputReader with all data deserialized from json.
    """
    if json[cls.KEY_RANGE_PARAM] is None:
      key_ranges = None
    else:
      key_ranges = []
      for k in json[cls.KEY_RANGE_PARAM]:
        if k:
          key_ranges.append(key_range.KeyRange.from_json(k))
        else:
          key_ranges.append(None)

    if json[cls.NAMESPACE_RANGE_PARAM] is None:
      ns_range = None
    else:
      ns_range = namespace_range.NamespaceRange.from_json_object(
          json[cls.NAMESPACE_RANGE_PARAM])

    if json[cls.CURRENT_KEY_RANGE_PARAM] is None:
      current_key_range = None
    else:
      current_key_range = key_range.KeyRange.from_json(
          json[cls.CURRENT_KEY_RANGE_PARAM])

    return cls(
        json[cls.ENTITY_KIND_PARAM],
        key_ranges,
        ns_range,
        json[cls.BATCH_SIZE_PARAM],
        current_key_range,
        filters=json.get(cls.FILTERS_PARAM))


class BlobstoreLineInputReader(InputReader):
  """Input reader for a newline delimited blob in Blobstore."""

  # Maximum size, in bytes, of the blob read buffer.
  _BLOB_BUFFER_SIZE = 64000

  # Maximum number of shards to allow.
  _MAX_SHARD_COUNT = 256

  # Maximum number of blobs to allow.
  _MAX_BLOB_KEYS_COUNT = 246

  # Mapreduce parameters.
  BLOB_KEYS_PARAM = "blob_keys"

  # Serialization parameters.
  INITIAL_POSITION_PARAM = "initial_position"
  END_POSITION_PARAM = "end_position"
  BLOB_KEY_PARAM = "blob_key"

  def __init__(self, blob_key, start_position, end_position):
    """Initializes this instance with the given blob key and character range.

    This BlobstoreInputReader will read from the first record starting
    strictly after start_position until the first record ending at or after
    end_position (exclusive). As an exception, if start_position is 0, then
    this InputReader starts reading at the first record.

    Args:
      blob_key: the BlobKey that this input reader is processing.
      start_position: the position to start reading at.
      end_position: a position in the last record to read.
    """
    self._blob_key = blob_key
    self._blob_reader = blobstore.BlobReader(blob_key,
                                             self._BLOB_BUFFER_SIZE,
                                             start_position)
    self._end_position = end_position
    self._has_iterated = False
    self._read_before_start = bool(start_position)
1407 """Returns the next input from as an (offset, line) tuple."""
1408 self
._has
_iterated
= True
1410 if self
._read
_before
_start
:
1411 self
._blob
_reader
.readline()
1412 self
._read
_before
_start
= False
1413 start_position
= self
._blob
_reader
.tell()
1415 if start_position
> self
._end
_position
:
1416 raise StopIteration()
1418 line
= self
._blob
_reader
.readline()
1421 raise StopIteration()
1423 return start_position
, line
.rstrip("\n")
1426 """Returns an json-compatible input shard spec for remaining inputs."""
1427 new_pos
= self
._blob
_reader
.tell()
1428 if self
._has
_iterated
:
1430 return {self
.BLOB_KEY_PARAM
: self
._blob
_key
,
1431 self
.INITIAL_POSITION_PARAM
: new_pos
,
1432 self
.END_POSITION_PARAM
: self
._end
_position
}
1435 """Returns the string representation of this BlobstoreLineInputReader."""
1436 return "blobstore.BlobKey(%r):[%d, %d]" % (
1437 self
._blob
_key
, self
._blob
_reader
.tell(), self
._end
_position
)

  @classmethod
  def from_json(cls, json):
    """Instantiates an instance of this InputReader for the given shard spec."""
    return cls(json[cls.BLOB_KEY_PARAM],
               json[cls.INITIAL_POSITION_PARAM],
               json[cls.END_POSITION_PARAM])

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.BLOB_KEYS_PARAM not in params:
      raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
    blob_keys = params[cls.BLOB_KEYS_PARAM]
    if isinstance(blob_keys, basestring):
      # This is a mechanism to allow multiple blob keys (which do not contain
      # commas) in a single string. It may go away.
      blob_keys = blob_keys.split(",")
    if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
      raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
    if not blob_keys:
      raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
    for blob_key in blob_keys:
      blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
      if not blob_info:
        raise BadReaderParamsError("Could not find blobinfo for key %s" %
                                   blob_key)

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of shard_count input_spec_shards for input_spec.

    Args:
      mapper_spec: The mapper specification to split from. Must contain
          'blob_keys' parameter with one or more blob keys.

    Returns:
      A list of BlobstoreInputReaders corresponding to the specified shards.
    """
    params = _get_params(mapper_spec)
    blob_keys = params[cls.BLOB_KEYS_PARAM]
    if isinstance(blob_keys, basestring):
      # This is a mechanism to allow multiple blob keys (which do not contain
      # commas) in a single string. It may go away.
      blob_keys = blob_keys.split(",")

    blob_sizes = {}
    for blob_key in blob_keys:
      blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
      blob_sizes[blob_key] = blob_info.size

    shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
    shards_per_blob = shard_count // len(blob_keys)
    if shards_per_blob == 0:
      shards_per_blob = 1

    chunks = []
    for blob_key, blob_size in blob_sizes.items():
      blob_chunk_size = blob_size // shards_per_blob
      for i in xrange(shards_per_blob - 1):
        chunks.append(BlobstoreLineInputReader.from_json(
            {cls.BLOB_KEY_PARAM: blob_key,
             cls.INITIAL_POSITION_PARAM: blob_chunk_size * i,
             cls.END_POSITION_PARAM: blob_chunk_size * (i + 1)}))
      chunks.append(BlobstoreLineInputReader.from_json(
          {cls.BLOB_KEY_PARAM: blob_key,
           cls.INITIAL_POSITION_PARAM: blob_chunk_size * (shards_per_blob - 1),
           cls.END_POSITION_PARAM: blob_size}))
    return chunks
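
  # Worked example of the split arithmetic (hypothetical sizes): one blob of
  # 1000 bytes and shard_count=4 gives shards_per_blob=4 and
  # blob_chunk_size=250, so readers cover [0, 250), [250, 500), [500, 750)
  # and [750, 1000]. Lines straddling a boundary are handled by the readers
  # themselves: a reader with a nonzero start skips the partial first line
  # and reads past its end position to the end of the current line.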


class BlobstoreZipInputReader(InputReader):
  """Input reader for files from a zip archive stored in the Blobstore.

  Each instance of the reader will read the TOC, from the end of the zip file,
  and then only the contained files which it is responsible for.
  """

  # Maximum number of shards to allow.
  _MAX_SHARD_COUNT = 256

  # Mapreduce parameters.
  BLOB_KEY_PARAM = "blob_key"
  START_INDEX_PARAM = "start_index"
  END_INDEX_PARAM = "end_index"

  def __init__(self, blob_key, start_index, end_index,
               _reader=blobstore.BlobReader):
    """Initializes this instance with the given blob key and file range.

    This BlobstoreZipInputReader will read from the file with index start_index
    up to but not including the file with index end_index.

    Args:
      blob_key: the BlobKey that this input reader is processing.
      start_index: the index of the first file to read.
      end_index: the index of the first file that will not be read.
      _reader: a callable that returns a file-like object for reading blobs.
          Used for dependency injection.
    """
    self._blob_key = blob_key
    self._start_index = start_index
    self._end_index = end_index
    self._reader = _reader
    self._zip = None
    self._entries = None
1556 """Returns the next input from this input reader as (ZipInfo, opener) tuple.
1559 The next input from this input reader, in the form of a 2-tuple.
1560 The first element of the tuple is a zipfile.ZipInfo object.
1561 The second element of the tuple is a zero-argument function that, when
1562 called, returns the complete body of the file.
1565 self
._zip
= zipfile
.ZipFile(self
._reader
(self
._blob
_key
))
1567 self
._entries
= self
._zip
.infolist()[self
._start
_index
:self
._end
_index
]
1568 self
._entries
.reverse()
1569 if not self
._entries
:
1570 raise StopIteration()
1571 entry
= self
._entries
.pop()
1572 self
._start
_index
+= 1
1573 return (entry
, lambda: self
._read
(entry
))

  def _read(self, entry):
    """Read entry content.

    Args:
      entry: zip file entry as zipfile.ZipInfo.

    Returns:
      Entry content as string.
    """
    start_time = time.time()
    content = self._zip.read(entry.filename)

    ctx = context.get()
    if ctx:
      operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
      operation.counters.Increment(
          COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)

    return content

  @classmethod
  def from_json(cls, json):
    """Creates an instance of the InputReader for the given input shard state.

    Args:
      json: The InputReader state as a dict-like object.

    Returns:
      An instance of the InputReader configured using the values of json.
    """
    return cls(json[cls.BLOB_KEY_PARAM],
               json[cls.START_INDEX_PARAM],
               json[cls.END_INDEX_PARAM])
1609 """Returns an input shard state for the remaining inputs.
1612 A json-izable version of the remaining InputReader.
1614 return {self
.BLOB_KEY_PARAM
: self
._blob
_key
,
1615 self
.START_INDEX_PARAM
: self
._start
_index
,
1616 self
.END_INDEX_PARAM
: self
._end
_index
}
1619 """Returns the string representation of this BlobstoreZipInputReader."""
1620 return "blobstore.BlobKey(%r):[%d, %d]" % (
1621 self
._blob
_key
, self
._start
_index
, self
._end
_index
)

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.BLOB_KEY_PARAM not in params:
      raise BadReaderParamsError("Must specify 'blob_key' for mapper input")
    blob_key = params[cls.BLOB_KEY_PARAM]
    blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
    if not blob_info:
      raise BadReaderParamsError("Could not find blobinfo for key %s" %
                                 blob_key)

  @classmethod
  def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
    """Returns a list of input shard states for the input spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader. Must contain
          'blob_key' parameter with one blob key.
      _reader: a callable that returns a file-like object for reading blobs.
          Used for dependency injection.

    Returns:
      A list of InputReaders spanning files within the zip.
    """
    params = _get_params(mapper_spec)
    blob_key = params[cls.BLOB_KEY_PARAM]
    zip_input = zipfile.ZipFile(_reader(blob_key))
    zfiles = zip_input.infolist()
    total_size = sum(x.file_size for x in zfiles)
    num_shards = min(mapper_spec.shard_count, cls._MAX_SHARD_COUNT)
    size_per_shard = total_size // num_shards

    # Break the list of files into sublists, each of approximately
    # size_per_shard bytes.
    shard_start_indexes = [0]
    current_shard_size = 0
    for i, fileinfo in enumerate(zfiles):
      current_shard_size += fileinfo.file_size
      if current_shard_size >= size_per_shard:
        shard_start_indexes.append(i + 1)
        current_shard_size = 0

    if shard_start_indexes[-1] != len(zfiles):
      shard_start_indexes.append(len(zfiles))

    return [cls(blob_key, start_index, end_index, _reader)
            for start_index, end_index
            in zip(shard_start_indexes, shard_start_indexes[1:])]
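
  # Worked example (hypothetical archive): members of 100, 200, 700 and 50
  # bytes with shard_count=2 give size_per_shard=525; the scan closes a
  # shard after the 700-byte member, producing start indexes [0, 3, 4] and
  # readers over members [0, 3) and [3, 4).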


class BlobstoreZipLineInputReader(InputReader):
  """Input reader for newline delimited files in zip archives from Blobstore.

  This has the same external interface as the BlobstoreLineInputReader, in
  that it takes a list of blobs as its input and yields lines to the reader.
  However the blobs themselves are expected to be zip archives of line
  delimited files instead of the files themselves.

  This is useful as many line delimited files gain greatly from compression.
  """

  # Maximum number of shards to allow.
  _MAX_SHARD_COUNT = 256

  # Maximum number of blobs to allow.
  _MAX_BLOB_KEYS_COUNT = 246

  # Mapreduce parameters.
  BLOB_KEYS_PARAM = "blob_keys"

  # Serialization parameters.
  BLOB_KEY_PARAM = "blob_key"
  START_FILE_INDEX_PARAM = "start_file_index"
  END_FILE_INDEX_PARAM = "end_file_index"
  OFFSET_PARAM = "offset"

  def __init__(self, blob_key, start_file_index, end_file_index, offset,
               _reader=blobstore.BlobReader):
    """Initializes this instance with the given blob key and file range.

    This BlobstoreZipLineInputReader will read from the file with index
    start_file_index up to but not including the file with index
    end_file_index. It will return lines starting at offset within
    file[start_file_index].

    Args:
      blob_key: the BlobKey that this input reader is processing.
      start_file_index: the index of the first file to read within the zip.
      end_file_index: the index of the first file that will not be read.
      offset: the byte offset within blob_key.zip[start_file_index] to start
          reading. The reader will continue to the end of the file.
      _reader: a callable that returns a file-like object for reading blobs.
          Used for dependency injection.
    """
    self._blob_key = blob_key
    self._start_file_index = start_file_index
    self._end_file_index = end_file_index
    self._initial_offset = offset
    self._reader = _reader
    self._zip = None
    self._entries = None
    self._filestream = None

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.BLOB_KEYS_PARAM not in params:
      raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")

    blob_keys = params[cls.BLOB_KEYS_PARAM]
    if isinstance(blob_keys, basestring):
      # This is a mechanism to allow multiple blob keys (which do not contain
      # commas) in a single string. It may go away.
      blob_keys = blob_keys.split(",")
    if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
      raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
    if not blob_keys:
      raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
    for blob_key in blob_keys:
      blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
      if not blob_info:
        raise BadReaderParamsError("Could not find blobinfo for key %s" %
                                   blob_key)

  @classmethod
  def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
    """Returns a list of input readers for the input spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader. Must contain
          'blob_keys' parameter with one or more blob keys.
      _reader: a callable that returns a file-like object for reading blobs.
          Used for dependency injection.

    Returns:
      A list of InputReaders spanning the subfiles within the blobs.
      There will be at least one reader per blob, but it will otherwise
      attempt to keep the expanded size even.
    """
    params = _get_params(mapper_spec)
    blob_keys = params[cls.BLOB_KEYS_PARAM]
    if isinstance(blob_keys, basestring):
      # This is a mechanism to allow multiple blob keys (which do not contain
      # commas) in a single string. It may go away.
      blob_keys = blob_keys.split(",")

    blob_files = {}
    total_size = 0
    for blob_key in blob_keys:
      zip_input = zipfile.ZipFile(_reader(blob_key))
      blob_files[blob_key] = zip_input.infolist()
      total_size += sum(x.file_size for x in blob_files[blob_key])

    shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)

    # We can break on both blob key and file-within-zip boundaries, so a
    # shard will span at minimum a single file within a zip.
    size_per_shard = total_size // shard_count

    readers = []
    for blob_key in blob_keys:
      bfiles = blob_files[blob_key]
      current_shard_size = 0
      start_file_index = 0
      next_file_index = 0
      for fileinfo in bfiles:
        next_file_index += 1
        current_shard_size += fileinfo.file_size
        if current_shard_size >= size_per_shard:
          readers.append(cls(blob_key, start_file_index, next_file_index, 0,
                             _reader))
          current_shard_size = 0
          start_file_index = next_file_index
      if current_shard_size != 0:
        readers.append(cls(blob_key, start_file_index, next_file_index, 0,
                           _reader))

    return readers
1824 """Returns the next line from this input reader as (lineinfo, line) tuple.
1827 The next input from this input reader, in the form of a 2-tuple.
1828 The first element of the tuple describes the source, it is itself
1829 a tuple (blobkey, filenumber, byteoffset).
1830 The second element of the tuple is the line found at that offset.
1832 if not self
._filestream
:
1834 self
._zip
= zipfile
.ZipFile(self
._reader
(self
._blob
_key
))
1836 self
._entries
= self
._zip
.infolist()[self
._start
_file
_index
:
1837 self
._end
_file
_index
]
1838 self
._entries
.reverse()
1839 if not self
._entries
:
1840 raise StopIteration()
1841 entry
= self
._entries
.pop()
1842 value
= self
._zip
.read(entry
.filename
)
1843 self
._filestream
= StringIO
.StringIO(value
)
1844 if self
._initial
_offset
:
1845 self
._filestream
.seek(self
._initial
_offset
)
1846 self
._filestream
.readline()
1848 start_position
= self
._filestream
.tell()
1849 line
= self
._filestream
.readline()
1853 self
._filestream
.close()
1854 self
._filestream
= None
1855 self
._start
_file
_index
+= 1
1856 self
._initial
_offset
= 0
1859 return ((self
._blob
_key
, self
._start
_file
_index
, start_position
),
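
  # Example of a yielded value (hypothetical blob key and offsets):
  #   (("<blob-key>", 2, 1044), "a log line")
  # i.e. the line starting at byte 1044 of member file 2 of the archive.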

  def _next_offset(self):
    """Return the offset of the next line to read."""
    if self._filestream:
      offset = self._filestream.tell()
      if offset:
        offset -= 1
    else:
      offset = self._initial_offset
    return offset
1874 """Returns an input shard state for the remaining inputs.
1877 A json-izable version of the remaining InputReader.
1880 return {self
.BLOB_KEY_PARAM
: self
._blob
_key
,
1881 self
.START_FILE_INDEX_PARAM
: self
._start
_file
_index
,
1882 self
.END_FILE_INDEX_PARAM
: self
._end
_file
_index
,
1883 self
.OFFSET_PARAM
: self
._next
_offset
()}

  @classmethod
  def from_json(cls, json, _reader=blobstore.BlobReader):
    """Creates an instance of the InputReader for the given input shard state.

    Args:
      json: The InputReader state as a dict-like object.
      _reader: For dependency injection.

    Returns:
      An instance of the InputReader configured using the values of json.
    """
    return cls(json[cls.BLOB_KEY_PARAM],
               json[cls.START_FILE_INDEX_PARAM],
               json[cls.END_FILE_INDEX_PARAM],
               json[cls.OFFSET_PARAM],
               _reader)
1903 """Returns the string representation of this reader.
1906 string blobkey:[start file num, end file num]:current offset.
1908 return "blobstore.BlobKey(%r):[%d, %d]:%d" % (
1909 self
._blob
_key
, self
._start
_file
_index
, self
._end
_file
_index
,
1910 self
._next
_offset
())


class RandomStringInputReader(InputReader):
  """RandomStringInputReader generates random strings as output.

  Primary usage is to populate output with testing entries.
  """

  # Total number of entries this reader should generate.
  COUNT = "count"
  # Length of the generated strings.
  STRING_LENGTH = "string_length"

  DEFAULT_STRING_LENGTH = 10

  def __init__(self, count, string_length):
    """Initialize input reader.

    Args:
      count: number of entries this shard should generate.
      string_length: the length of generated random strings.
    """
    self._count = count
    self._string_length = string_length

  def __iter__(self):
    ctx = context.get()

    for _ in range(self._count):
      start_time = time.time()
      content = "".join(random.choice(string.ascii_lowercase)
                        for _ in range(self._string_length))

      if ctx:
        operation.counters.Increment(
            COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
        operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)

      yield content

  @classmethod
  def split_input(cls, mapper_spec):
    params = _get_params(mapper_spec)
    count = params[cls.COUNT]
    string_length = cls.DEFAULT_STRING_LENGTH
    if cls.STRING_LENGTH in params:
      string_length = params[cls.STRING_LENGTH]

    shard_count = mapper_spec.shard_count
    count_per_shard = count // shard_count

    mr_input_readers = [
        cls(count_per_shard, string_length) for _ in range(shard_count)]

    left = count - count_per_shard * shard_count
    if left > 0:
      mr_input_readers.append(cls(left, string_length))

    return mr_input_readers

  @classmethod
  def validate(cls, mapper_spec):
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Mapper input reader class mismatch")

    params = _get_params(mapper_spec)
    if cls.COUNT not in params:
      raise BadReaderParamsError("Must specify %s" % cls.COUNT)
    if not isinstance(params[cls.COUNT], int):
      raise BadReaderParamsError("%s should be an int but is %s" %
                                 (cls.COUNT, type(params[cls.COUNT])))
    if params[cls.COUNT] <= 0:
      raise BadReaderParamsError("%s should be a positive int" % cls.COUNT)
    if cls.STRING_LENGTH in params and not (
        isinstance(params[cls.STRING_LENGTH], int) and
        params[cls.STRING_LENGTH] > 0):
      raise BadReaderParamsError("%s should be a positive int but is %s" %
                                 (cls.STRING_LENGTH, params[cls.STRING_LENGTH]))
    if (not isinstance(mapper_spec.shard_count, int) or
        mapper_spec.shard_count <= 0):
      raise BadReaderParamsError(
          "shard_count should be a positive int but is %s" %
          mapper_spec.shard_count)

  @classmethod
  def from_json(cls, json):
    return cls(json[cls.COUNT], json[cls.STRING_LENGTH])

  def to_json(self):
    return {self.COUNT: self._count, self.STRING_LENGTH: self._string_length}
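
  # Illustrative split: {"count": 1000, "string_length": 16} with
  # shard_count=8 yields eight readers of 125 strings each; with count=1002
  # a ninth reader would cover the remaining 2 strings.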


class NamespaceInputReader(InputReader):
  """An input reader to iterate over namespaces.

  This reader yields namespace names as string.
  It will always produce only one shard.
  """

  NAMESPACE_RANGE_PARAM = "namespace_range"
  BATCH_SIZE_PARAM = "batch_size"
  _BATCH_SIZE = 10

  def __init__(self, ns_range, batch_size=_BATCH_SIZE):
    self.ns_range = ns_range
    self._batch_size = batch_size

  def to_json(self):
    """Serializes all the data in this query range into json form.

    Returns:
      all the data in json-compatible map.
    """
    return {self.NAMESPACE_RANGE_PARAM: self.ns_range.to_json_object(),
            self.BATCH_SIZE_PARAM: self._batch_size}

  @classmethod
  def from_json(cls, json):
    """Create new NamespaceInputReader from the json, encoded by to_json.

    Args:
      json: json map representation of NamespaceInputReader.

    Returns:
      an instance of NamespaceInputReader with all data deserialized from
      json.
    """
    return cls(
        namespace_range.NamespaceRange.from_json_object(
            json[cls.NAMESPACE_RANGE_PARAM]),
        json[cls.BATCH_SIZE_PARAM])

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise BadReaderParamsError("Input reader class mismatch")
    params = _get_params(mapper_spec)
    if cls.BATCH_SIZE_PARAM in params:
      try:
        batch_size = int(params[cls.BATCH_SIZE_PARAM])
        if batch_size < 1:
          raise BadReaderParamsError("Bad batch size: %s" % batch_size)
      except ValueError, e:
        raise BadReaderParamsError("Bad batch size: %s" % e)

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers for the input spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Returns:
      A list of InputReaders.
    """
    batch_size = int(_get_params(mapper_spec).get(
        cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
    shard_count = mapper_spec.shard_count
    namespace_ranges = namespace_range.NamespaceRange.split(shard_count,
                                                            contiguous=True)
    return [NamespaceInputReader(ns_range, batch_size)
            for ns_range in namespace_ranges]

  def __iter__(self):
    while True:
      keys = self.ns_range.make_datastore_query().Get(limit=self._batch_size)
      if not keys:
        break

      for key in keys:
        namespace = metadata.Namespace.key_to_namespace(key)
        self.ns_range = self.ns_range.with_start_after(namespace)
        yield namespace

  def __str__(self):
    return repr(self.ns_range)
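
# Illustrative round-trip sketch (assumption, not in the original module):
# reader state survives slice boundaries via to_json()/from_json(), and
# ns_range advances with with_start_after() as namespaces are consumed.
#
#   reader = NamespaceInputReader(namespace_range.NamespaceRange(), 10)
#   state = reader.to_json()
#   # {"namespace_range": {...}, "batch_size": 10}
#   resumed = NamespaceInputReader.from_json(state)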


class RecordsReader(InputReader):
  """Reader to read a list of Files API files in records format.

  The number of input shards can be specified by the SHARDS_PARAM
  mapper parameter. Input files cannot be split, so there will be at most
  one shard per file. Also, the number of shards will not be reduced based on
  the number of input files, so shards in always equals shards out.
  """

  FILE_PARAM = "file"
  FILES_PARAM = "files"

  def __init__(self, filenames, position):
    """Constructor.

    Args:
      filenames: list of filenames.
      position: file position to start reading from as int.
    """
    self._filenames = filenames
    if self._filenames:
      self._reader = records.RecordsReader(
          files.BufferedFile(self._filenames[0]))
      self._reader.seek(position)
    else:
      self._reader = None
2129 """Iterate over records in file.
2138 start_time
= time
.time()
2139 record
= self
._reader
.read()
2141 operation
.counters
.Increment(
2142 COUNTER_IO_READ_MSEC
, int((time
.time() - start_time
) * 1000))(ctx
)
2143 operation
.counters
.Increment(COUNTER_IO_READ_BYTES
, len(record
))(ctx
)
2145 except (files
.ExistenceError
), e
:
2146 raise errors
.FailJobError("ExistenceError: %s" % e
)
2147 except (files
.UnknownError
), e
:
2148 raise errors
.RetrySliceError("UnknownError: %s" % e
)
2150 self
._filenames
.pop(0)
2151 if not self
._filenames
:
2154 self
._reader
= records
.RecordsReader(
2155 files
.BufferedFile(self
._filenames
[0]))

  @classmethod
  def from_json(cls, json):
    """Creates an instance of the InputReader for the given input shard state.

    Args:
      json: The InputReader state as a dict-like object.

    Returns:
      An instance of the InputReader configured using the values of json.
    """
    return cls(json["filenames"], json["position"])

  def to_json(self):
    """Returns an input shard state for the remaining inputs.

    Returns:
      A json-izable version of the remaining InputReader.
    """
    result = {
        "filenames": self._filenames,
        "position": 0,
        }
    if self._reader:
      result["position"] = self._reader.tell()
    return result

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers for the input spec.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Returns:
      A list of InputReaders.
    """
    params = _get_params(mapper_spec)
    shard_count = mapper_spec.shard_count

    if cls.FILES_PARAM in params:
      filenames = params[cls.FILES_PARAM]
      if isinstance(filenames, basestring):
        filenames = filenames.split(",")
    else:
      filenames = [params[cls.FILE_PARAM]]

    batch_list = [[] for _ in xrange(shard_count)]
    for index, _ in enumerate(filenames):
      # Simple round-robin assignment so no shard ends up unduly short.
      batch_list[index % shard_count].append(filenames[index])

    # Sort from longest batch to shortest so any short batch comes last.
    batch_list.sort(reverse=True, key=len)
    return [cls(batch, 0) for batch in batch_list]
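
  # Worked example (illustrative): five files round-robined over two shards
  # give batch_list = [[f0, f2, f4], [f1, f3]]. After the descending sort by
  # length, longer batches come first, so only the last shard can run short.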

  @classmethod
  def validate(cls, mapper_spec):
    """Validates mapper spec and all mapper parameters.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Raises:
      BadReaderParamsError: required parameters are missing or invalid.
    """
    if mapper_spec.input_reader_class() != cls:
      raise errors.BadReaderParamsError("Input reader class mismatch")
    params = _get_params(mapper_spec)
    if (cls.FILES_PARAM not in params and
        cls.FILE_PARAM not in params):
      raise BadReaderParamsError(
          "Must specify '%s' or '%s' parameter for mapper input" %
          (cls.FILES_PARAM, cls.FILE_PARAM))

  def __str__(self):
    position = 0
    if self._reader:
      position = self._reader.tell()
    return "%s:%s" % (self._filenames, position)


class LogInputReader(InputReader):
  """Input reader for a time range of logs via the Logs Reader API.

  The number of input shards may be specified by the SHARDS_PARAM mapper
  parameter. A starting and ending time (in seconds since the Unix epoch) are
  required to generate time ranges over which to shard the input.
  """

  # Parameters directly mapping to those available via logservice.fetch().
  START_TIME_PARAM = "start_time"
  END_TIME_PARAM = "end_time"
  MINIMUM_LOG_LEVEL_PARAM = "minimum_log_level"
  INCLUDE_INCOMPLETE_PARAM = "include_incomplete"
  INCLUDE_APP_LOGS_PARAM = "include_app_logs"
  VERSION_IDS_PARAM = "version_ids"
  MODULE_VERSIONS_PARAM = "module_versions"

  # Semi-hidden parameters used only internally.
  _OFFSET_PARAM = "offset"
  _PROTOTYPE_REQUEST_PARAM = "prototype_request"

  _PARAMS = frozenset([START_TIME_PARAM, END_TIME_PARAM, _OFFSET_PARAM,
                       MINIMUM_LOG_LEVEL_PARAM, INCLUDE_INCOMPLETE_PARAM,
                       INCLUDE_APP_LOGS_PARAM, VERSION_IDS_PARAM,
                       MODULE_VERSIONS_PARAM, _PROTOTYPE_REQUEST_PARAM])
  _KWARGS = frozenset([_OFFSET_PARAM, _PROTOTYPE_REQUEST_PARAM])

  def __init__(self,
               start_time=None,
               end_time=None,
               minimum_log_level=None,
               include_incomplete=False,
               include_app_logs=False,
               version_ids=None,
               module_versions=None,
               **kwargs):
    """Constructor.

    Args:
      start_time: The earliest request completion or last-update time of logs
        that should be mapped over, in seconds since the Unix epoch.
      end_time: The latest request completion or last-update time that logs
        should be mapped over, in seconds since the Unix epoch.
      minimum_log_level: An application log level which serves as a filter on
        the requests mapped over--requests with no application log at or above
        the specified level will be omitted, even if include_app_logs is False.
      include_incomplete: Whether or not to include requests that have started
        but not yet finished, as a boolean. Defaults to False.
      include_app_logs: Whether or not to include application level logs in the
        mapped logs, as a boolean. Defaults to False.
      version_ids: A list of version ids whose logs should be read. This can
        not be used with module_versions.
      module_versions: A list of tuples containing a module and version id
        whose logs should be read. This can not be used with version_ids.
      **kwargs: A dictionary of keywords associated with this input reader.
    """
    InputReader.__init__(self)

    # The contents of __params must always be suitable as keyword arguments
    # to logservice.fetch().
    self.__params = dict(kwargs)

    if start_time is not None:
      self.__params[self.START_TIME_PARAM] = start_time
    if end_time is not None:
      self.__params[self.END_TIME_PARAM] = end_time
    if minimum_log_level is not None:
      self.__params[self.MINIMUM_LOG_LEVEL_PARAM] = minimum_log_level
    if include_incomplete is not None:
      self.__params[self.INCLUDE_INCOMPLETE_PARAM] = include_incomplete
    if include_app_logs is not None:
      self.__params[self.INCLUDE_APP_LOGS_PARAM] = include_app_logs
    if version_ids:
      self.__params[self.VERSION_IDS_PARAM] = version_ids
    if module_versions:
      self.__params[self.MODULE_VERSIONS_PARAM] = module_versions

    # Any submitted prototype_request will be in encoded form; parse it.
    if self._PROTOTYPE_REQUEST_PARAM in self.__params:
      prototype_request = log_service_pb.LogReadRequest(
          self.__params[self._PROTOTYPE_REQUEST_PARAM])
      self.__params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request
2321 """Iterates over logs in a given range of time.
2324 A RequestLog containing all the information for a single request.
2326 for log
in logservice
.fetch(**self
.__params
):
2327 self
.__params
[self
._OFFSET
_PARAM
] = log
.offset

  @classmethod
  def from_json(cls, json):
    """Creates an instance of the InputReader for the given input shard's state.

    Args:
      json: The InputReader state as a dict-like object.

    Returns:
      An instance of the InputReader configured using the given JSON parameters.
    """
    # Strip out any unrecognized parameters.
    params = dict((str(k), v) for k, v in json.iteritems()
                  if k in cls._PARAMS)

    # The offset is stored base64-encoded by to_json(); decode it before
    # handing it back to the constructor.
    if cls._OFFSET_PARAM in params:
      params[cls._OFFSET_PARAM] = base64.b64decode(params[cls._OFFSET_PARAM])
    return cls(**params)
2352 """Returns an input shard state for the remaining inputs.
2355 A JSON serializable version of the remaining input to read.
2358 params
= dict(self
.__params
)
2359 if self
._PROTOTYPE
_REQUEST
_PARAM
in params
:
2360 prototype_request
= params
[self
._PROTOTYPE
_REQUEST
_PARAM
]
2361 params
[self
._PROTOTYPE
_REQUEST
_PARAM
] = prototype_request
.Encode()
2362 if self
._OFFSET
_PARAM
in params
:
2363 params
[self
._OFFSET
_PARAM
] = base64
.b64encode(params
[self
._OFFSET
_PARAM
])

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers for the given input specification.

    Args:
      mapper_spec: The MapperSpec for this InputReader.

    Returns:
      A list of InputReaders.
    """
    params = _get_params(mapper_spec)
    shard_count = mapper_spec.shard_count

    # Pick out the overall start and end times and the time step per shard.
    start_time = params[cls.START_TIME_PARAM]
    end_time = params[cls.END_TIME_PARAM]
    seconds_per_shard = (end_time - start_time) / shard_count

    # Create a LogInputReader for each shard, modulating the params as we go.
    shards = []
    for _ in xrange(shard_count - 1):
      params[cls.END_TIME_PARAM] = (params[cls.START_TIME_PARAM] +
                                    seconds_per_shard)
      shards.append(LogInputReader(**params))
      params[cls.START_TIME_PARAM] = params[cls.END_TIME_PARAM]

    # Create a final shard to complete the time range.
    params[cls.END_TIME_PARAM] = end_time
    return shards + [LogInputReader(**params)]
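
  # Worked example (illustrative): start_time=0, end_time=100, shard_count=4
  # produce readers covering [0, 25), [25, 50), [50, 75) and [75, 100]; the
  # final reader is pinned to the original end_time so rounding in
  # seconds_per_shard never drops the tail of the range.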

  @classmethod
  def validate(cls, mapper_spec):
    """Validates the mapper's specification and all necessary parameters.

    Args:
      mapper_spec: The MapperSpec to be used with this InputReader.

    Raises:
      BadReaderParamsError: If the user fails to specify both a starting time
        and an ending time, or if the starting time is later than the ending
        time.
    """
    if mapper_spec.input_reader_class() != cls:
      raise errors.BadReaderParamsError("Input reader class mismatch")

    params = _get_params(mapper_spec, allowed_keys=cls._PARAMS)
    if (cls.VERSION_IDS_PARAM not in params and
        cls.MODULE_VERSIONS_PARAM not in params):
      raise errors.BadReaderParamsError("Must specify a list of version ids or "
                                        "module/version ids for mapper input")
    if (cls.VERSION_IDS_PARAM in params and
        cls.MODULE_VERSIONS_PARAM in params):
      raise errors.BadReaderParamsError("Can not supply both version ids and "
                                        "module/version ids. Use only one.")
    if (cls.START_TIME_PARAM not in params or
        params[cls.START_TIME_PARAM] is None):
      raise errors.BadReaderParamsError("Must specify a starting time for "
                                        "mapper input")
    if cls.END_TIME_PARAM not in params or params[cls.END_TIME_PARAM] is None:
      params[cls.END_TIME_PARAM] = time.time()

    if params[cls.START_TIME_PARAM] >= params[cls.END_TIME_PARAM]:
      raise errors.BadReaderParamsError("The starting time cannot be later "
                                        "than or the same as the ending time.")

    if cls._PROTOTYPE_REQUEST_PARAM in params:
      try:
        params[cls._PROTOTYPE_REQUEST_PARAM] = log_service_pb.LogReadRequest(
            params[cls._PROTOTYPE_REQUEST_PARAM])
      except (TypeError, ProtocolBuffer.ProtocolBufferDecodeError):
        raise errors.BadReaderParamsError("The prototype request must be "
                                          "parseable as a LogReadRequest.")

    # Pass the parameters to logservice.fetch() to verify any underlying
    # constraints on types or values. This only constructs an iterator; it
    # does not actually fetch log records.
    try:
      logservice.fetch(**params)
    except logservice.InvalidArgumentError, e:
      raise errors.BadReaderParamsError("One or more parameters are not valid "
                                        "inputs to logservice.fetch(): %s" % e)
2449 """Returns the string representation of this LogInputReader."""
2451 for key
in sorted(self
.__params
.keys()):
2452 value
= self
.__params
[key
]
2453 if key
is self
._PROTOTYPE
_REQUEST
_PARAM
:
2454 params
.append("%s='%s'" % (key
, value
))
2455 elif key
is self
._OFFSET
_PARAM
:
2456 params
.append("%s='%s'" % (key
, value
))
2458 params
.append("%s=%s" % (key
, value
))
2460 return "LogInputReader(%s)" % ", ".join(params
)
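
# Illustrative sketch (assumption, not part of the original module): mapper
# parameters selecting the last hour of logs for a single version.
#
#   mapper_params = {
#       "input_reader": {
#           "start_time": time.time() - 3600,
#           "end_time": time.time(),
#           "version_ids": ["1"],
#           "include_app_logs": True,
#       },
#   }
#
# validate() fills in a missing end_time with time.time() and rejects any
# range where start_time >= end_time.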


class _GoogleCloudStorageInputReader(InputReader):
  """Input reader from Google Cloud Storage using the cloudstorage library.

  This class is expected to be subclassed with a reader that understands
  user-level records.

  Required configuration in the mapper_spec.input_reader dictionary.
    BUCKET_NAME_PARAM: name of the bucket to use (with no extra delimiters or
      suffixes such as directories).
    OBJECT_NAMES_PARAM: a list of object names or prefixes. All objects must
      be in the BUCKET_NAME_PARAM bucket. If the name ends with a * it will
      be treated as prefix and all objects with matching names will be read.
      Entries should not start with a slash unless that is part of the
      object's name. An example list could be:
      ["my-1st-input-file", "directory/my-2nd-file", "some/other/dir/input-*"]
      To retrieve all files "*" will match every object in the bucket. If a
      file is listed twice or is covered by multiple prefixes it will be read
      twice; there is no deduplication.

  Optional configuration in the mapper_spec.input_reader dictionary.
    BUFFER_SIZE_PARAM: the size of the read buffer for each file handle.
    DELIMITER_PARAM: if specified, turn on the shallow splitting mode.
      The delimiter is used as a path separator to designate directory
      hierarchy. Matching of prefixes from OBJECT_NAMES_PARAM will stop at
      the first directory instead of matching all files under the directory.
      This allows MR to process buckets with hundreds of thousands of files.
  """

  # Supported parameters.
  BUCKET_NAME_PARAM = "bucket_name"
  OBJECT_NAMES_PARAM = "objects"
  BUFFER_SIZE_PARAM = "buffer_size"
  DELIMITER_PARAM = "delimiter"

  # Internal parameters.
  _ACCOUNT_ID_PARAM = "account_id"

  # Other internal configuration constants.
  _JSON_PICKLE = "pickle"
  _STRING_MAX_FILES_LISTED = 10  # Max files shown in the str representation.
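
  # Illustrative sketch (assumption, not part of the original module): a
  # minimal input_reader configuration using the parameter keys above;
  # "data-bucket" and the object names are placeholders.
  #
  #   input_reader_spec = {
  #       "bucket_name": "data-bucket",
  #       "objects": ["processed/2013/*", "one-off-object"],
  #       "buffer_size": 1024 * 1024,
  #   }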

  def __init__(self, filenames, index=0, buffer_size=None, _account_id=None,
               delimiter=None):
    """Initialize a GoogleCloudStorageInputReader instance.

    Args:
      filenames: A list of Google Cloud Storage filenames of the form
        '/bucket/objectname'.
      index: Index of the next filename to read.
      buffer_size: The size of the read buffer, None to use default.
      _account_id: Internal use only. See cloudstorage documentation.
      delimiter: Delimiter used as path separator. See class doc for details.
    """
    self._filenames = filenames
    self._index = index
    self._buffer_size = buffer_size
    self._account_id = _account_id
    self._delimiter = delimiter
    self._bucket = None
    self._bucket_iter = None

  def _next_file(self):
    """Find next filename.

    self._filenames may need to be expanded via listbucket.

    Returns:
      None if no more file is left. Filename otherwise.
    """
    while True:
      if self._bucket_iter:
        try:
          return self._bucket_iter.next().filename
        except StopIteration:
          self._bucket_iter = None
          self._bucket = None
      if self._index >= len(self._filenames):
        return
      filename = self._filenames[self._index]
      self._index += 1
      if self._delimiter is None or not filename.endswith(self._delimiter):
        return filename
      self._bucket = cloudstorage.listbucket(filename,
                                             delimiter=self._delimiter)
      self._bucket_iter = iter(self._bucket)

  @classmethod
  def validate(cls, mapper_spec):
    """Validate mapper specification.

    Args:
      mapper_spec: an instance of model.MapperSpec

    Raises:
      BadReaderParamsError: if the specification is invalid for any reason such
        as missing the bucket name or providing an invalid bucket name.
    """
    reader_spec = _get_params(mapper_spec, allow_old=False)

    # A bucket name is required.
    if cls.BUCKET_NAME_PARAM not in reader_spec:
      raise errors.BadReaderParamsError(
          "%s is required for Google Cloud Storage" %
          cls.BUCKET_NAME_PARAM)
    try:
      cloudstorage.validate_bucket_name(
          reader_spec[cls.BUCKET_NAME_PARAM])
    except ValueError, error:
      raise errors.BadReaderParamsError("Bad bucket name, %s" % (error))

    # Object names are required.
    if cls.OBJECT_NAMES_PARAM not in reader_spec:
      raise errors.BadReaderParamsError(
          "%s is required for Google Cloud Storage" %
          cls.OBJECT_NAMES_PARAM)
    filenames = reader_spec[cls.OBJECT_NAMES_PARAM]
    if not isinstance(filenames, list):
      raise errors.BadReaderParamsError(
          "Object name list is not a list but a %s" %
          filenames.__class__.__name__)
    for filename in filenames:
      if not isinstance(filename, basestring):
        raise errors.BadReaderParamsError(
            "Object name is not a string but a %s" %
            filename.__class__.__name__)
    if cls.DELIMITER_PARAM in reader_spec:
      delimiter = reader_spec[cls.DELIMITER_PARAM]
      if not isinstance(delimiter, basestring):
        raise errors.BadReaderParamsError(
            "%s is not a string but a %s" %
            (cls.DELIMITER_PARAM, type(delimiter)))

  @classmethod
  def split_input(cls, mapper_spec):
    """Returns a list of input readers.

    An equal number of input files are assigned to each shard (+/- 1). If there
    are fewer files than shards, fewer than the requested number of shards will
    be used. Input files are currently never split (although for some formats
    they could be, and may be split in a future implementation).

    Args:
      mapper_spec: an instance of model.MapperSpec.

    Returns:
      A list of InputReaders. None when no input data can be found.
    """
    reader_spec = _get_params(mapper_spec, allow_old=False)
    bucket = reader_spec[cls.BUCKET_NAME_PARAM]
    filenames = reader_spec[cls.OBJECT_NAMES_PARAM]
    delimiter = reader_spec.get(cls.DELIMITER_PARAM)
    account_id = reader_spec.get(cls._ACCOUNT_ID_PARAM)
    buffer_size = reader_spec.get(cls.BUFFER_SIZE_PARAM)

    # Expand any prefixes (names ending in *) into the full list of files.
    all_filenames = []
    for filename in filenames:
      if filename.endswith("*"):
        all_filenames.extend(
            [file_stat.filename for file_stat in cloudstorage.listbucket(
                "/" + bucket + "/" + filename[:-1], delimiter=delimiter,
                _account_id=account_id)])
      else:
        all_filenames.append("/%s/%s" % (bucket, filename))

    # Split into shards.
    readers = []
    for shard in range(0, mapper_spec.shard_count):
      shard_filenames = all_filenames[shard::mapper_spec.shard_count]
      if shard_filenames:
        readers.append(cls(
            shard_filenames, buffer_size=buffer_size, _account_id=account_id,
            delimiter=delimiter))
    return readers

  @classmethod
  def from_json(cls, state):
    obj = pickle.loads(state[cls._JSON_PICKLE])
    # Restore the bucket iterator; generators cannot be pickled directly.
    if obj._bucket:
      obj._bucket_iter = iter(obj._bucket)
    return obj

  def to_json(self):
    before_iter = self._bucket_iter
    self._bucket_iter = None
    try:
      return {self._JSON_PICKLE: pickle.dumps(self)}
    finally:
      self._bucket_iter = before_iter
2661 """Returns the next input from this input reader, a block of bytes.
2663 Non existent files will be logged and skipped. The file might have been
2664 removed after input splitting.
2667 The next input from this input reader in the form of a cloudstorage
2668 ReadBuffer that supports a File-like interface (read, readline, seek,
2669 tell, and close). An error may be raised if the file can not be opened.
2672 StopIteration: The list of files has been exhausted.
2675 if self
._buffer
_size
:
2676 options
["read_buffer_size"] = self
._buffer
_size
2677 if self
._account
_id
:
2678 options
["_account_id"] = self
._account
_id
2680 filename
= self
._next
_file
()
2681 if filename
is None:
2682 raise StopIteration()
2684 start_time
= time
.time()
2685 handle
= cloudstorage
.open(filename
, **options
)
2689 operation
.counters
.Increment(
2690 COUNTER_IO_READ_MSEC
, int((time
.time() - start_time
) * 1000))(ctx
)
2693 except cloudstorage
.NotFoundError
:
2694 logging
.warning("File %s may have been removed. Skipping file.",

  def __str__(self):
    # Only show a limited number of filenames, for readability.
    num_files = len(self._filenames)
    if num_files > self._STRING_MAX_FILES_LISTED:
      names = "%s...%s + %d not shown" % (
          ",".join(self._filenames[0:self._STRING_MAX_FILES_LISTED-1]),
          self._filenames[-1],
          num_files - self._STRING_MAX_FILES_LISTED)
    else:
      names = ",".join(self._filenames)

    if self._index >= num_files:
      status = "EOF"
    else:
      status = "Next %s (%d of %d)" % (
          self._filenames[self._index],
          self._index + 1,  # +1 for human-friendly 1-indexing.
          num_files)
    return "CloudStorage [%s, %s]" % (status, names)
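
# Illustrative usage sketch (assumption, not in the original module): next()
# returns a cloudstorage ReadBuffer, so a map function typically receives an
# open, file-like handle rather than raw bytes.
#
#   reader = _GoogleCloudStorageInputReader(["/data-bucket/file-1"])
#   handle = reader.next()       # file-like: read, readline, seek, tell
#   first_line = handle.readline()
#   handle.close()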


class _GoogleCloudStorageRecordInputReader(_GoogleCloudStorageInputReader):
  """Read data from a Google Cloud Storage file using LevelDB format.

  See the _GoogleCloudStorageOutputWriter for additional configuration options.
  """

  def __getstate__(self):
    result = self.__dict__.copy()
    # The record reader is not picklable; drop it and recreate it lazily in
    # next() after deserialization.
    if "_record_reader" in result:
      result.pop("_record_reader")
    return result
2734 """Returns the next input from this input reader, a record.
2737 The next input from this input reader in the form of a record read from
2741 StopIteration: The ordered set records has been exhausted.
2744 if not hasattr(self
, "_cur_handle") or self
._cur
_handle
is None:
2746 self
._cur
_handle
= super(_GoogleCloudStorageRecordInputReader
,
2748 if not hasattr(self
, "_record_reader") or self
._record
_reader
is None:
2749 self
._record
_reader
= records
.RecordsReader(self
._cur
_handle
)
2752 start_time
= time
.time()
2753 content
= self
._record
_reader
.read()
2757 operation
.counters
.Increment(COUNTER_IO_READ_BYTES
, len(content
))(ctx
)
2758 operation
.counters
.Increment(
2759 COUNTER_IO_READ_MSEC
, int((time
.time() - start_time
) * 1000))(ctx
)
2763 self
._cur
_handle
= None
2764 self
._record
_reader
= None


class _ReducerReader(RecordsReader):
  """Reader to read KeyValues records files from Files API."""

  expand_parameters = True

  def __init__(self, filenames, position):
    super(_ReducerReader, self).__init__(filenames, position)
    self.current_key = None
    self.current_values = None

  def __iter__(self):
    ctx = context.get()
    combiner = None

    if ctx:
      combiner_spec = ctx.mapreduce_spec.mapper.params.get("combiner_spec")
      if combiner_spec:
        combiner = util.handler_for_name(combiner_spec)

    for binary_record in super(_ReducerReader, self).__iter__():
      proto = file_service_pb.KeyValues()
      proto.ParseFromString(binary_record)

      to_yield = None
      if self.current_key is not None and self.current_key != proto.key():
        to_yield = (self.current_key, self.current_values)
        self.current_key = None
        self.current_values = None

      if self.current_key is None:
        self.current_key = proto.key()
        self.current_values = []

      if combiner:
        combiner_result = combiner(
            self.current_key, proto.value_list(), self.current_values)

        if not util.is_generator(combiner_result):
          raise errors.BadCombinerOutputError(
              "Combiner %s should yield values instead of returning them (%s)" %
              (combiner, combiner_result))

        self.current_values = []
        for value in combiner_result:
          if isinstance(value, operation.Operation):
            value(ctx)
          else:
            # With a combiner, the current values come from the combiner.
            self.current_values.append(value)

        # Check-point after each combiner call, but only if there is nothing
        # to yield below; checkpointing here would otherwise lose to_yield.
        if not to_yield:
          yield ALLOW_CHECKPOINT
      else:
        # Without a combiner, simply accumulate the values.
        self.current_values.extend(proto.value_list())

      if to_yield:
        yield to_yield
        # Check-point after each key is yielded.
        yield ALLOW_CHECKPOINT

    # There may be some accumulated values left at the end of an input file,
    # so be sure to yield those too.
    if self.current_key is not None:
      to_yield = (self.current_key, self.current_values)
      self.current_key = None
      self.current_values = None
      yield to_yield
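
  # Illustrative sketch (assumption, not in the original module) of the
  # combiner contract enforced above: a combiner is called as
  # combiner(key, new_values, previously_combined_values) and must be a
  # generator; returning a plain value raises BadCombinerOutputError.
  #
  #   def sum_combiner(key, values, previously_combined):
  #     yield sum(int(v) for v in values) + sum(previously_combined)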

  @staticmethod
  def encode_data(data):
    """Encodes the given data, which may include raw bytes.

    Works around limitations in JSON encoding, which cannot handle raw bytes.

    Args:
      data: the data to encode.

    Returns:
      The data encoded as a base64 string.
    """
    return base64.b64encode(pickle.dumps(data))

  @staticmethod
  def decode_data(data):
    """Decodes data encoded with the encode_data function."""
    return pickle.loads(base64.b64decode(data))
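
  # Round-trip sketch (illustrative): encode_data()/decode_data() make
  # arbitrary Python values JSON-safe by pickling and base64-encoding them.
  #
  #   blob = _ReducerReader.encode_data(("key", ["\x00binary"]))
  #   assert _ReducerReader.decode_data(blob) == ("key", ["\x00binary"])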
2860 """Returns an input shard state for the remaining inputs.
2863 A json-izable version of the remaining InputReader.
2865 result
= super(_ReducerReader
, self
).to_json()
2866 result
["current_key"] = self
.encode_data(self
.current_key
)
2867 result
["current_values"] = self
.encode_data(self
.current_values
)

  @classmethod
  def from_json(cls, json):
    """Creates an instance of the InputReader for the given input shard state.

    Args:
      json: The InputReader state as a dict-like object.

    Returns:
      An instance of the InputReader configured using the values of json.
    """
    result = super(_ReducerReader, cls).from_json(json)
    result.current_key = _ReducerReader.decode_data(json["current_key"])
    result.current_values = _ReducerReader.decode_data(json["current_values"])
    return result