App Engine Python SDK version 1.9.3
[gae.git] / python / google / appengine / ext / mapreduce / input_readers.py
blob ba3c4e427ce1ec3686a59cbd1ae3a9d5683bcbe6
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
31 """Defines input readers for MapReduce."""
34 __all__ = [
35 "AbstractDatastoreInputReader",
36 "ALLOW_CHECKPOINT",
37 "BadReaderParamsError",
38 "BlobstoreLineInputReader",
39 "BlobstoreZipInputReader",
40 "BlobstoreZipLineInputReader",
41 "COUNTER_IO_READ_BYTES",
42 "COUNTER_IO_READ_MSEC",
43 "DatastoreEntityInputReader",
44 "DatastoreInputReader",
45 "DatastoreKeyInputReader",
46 "FileInputReader",
47 "RandomStringInputReader",
48 "RawDatastoreInputReader",
49 "Error",
50 "InputReader",
51 "LogInputReader",
52 "NamespaceInputReader",
53 "RecordsReader",
59 import base64
60 import copy
61 import logging
62 import pickle
63 import random
64 import string
65 import StringIO
66 import time
67 import zipfile
69 from google.net.proto import ProtocolBuffer
70 from google.appengine.ext import ndb
72 from google.appengine.api import datastore
73 from google.appengine.api import files
74 from google.appengine.api import logservice
75 from google.appengine.api.files import file_service_pb
76 from google.appengine.api.logservice import log_service_pb
77 from google.appengine.ext import blobstore
78 from google.appengine.ext import db
79 from google.appengine.ext import key_range
80 from google.appengine.ext.db import metadata
81 from google.appengine.ext.mapreduce import context
82 from google.appengine.ext.mapreduce import datastore_range_iterators as db_iters
83 from google.appengine.ext.mapreduce import errors
84 from google.appengine.ext.mapreduce import file_format_parser
85 from google.appengine.ext.mapreduce import file_format_root
86 from google.appengine.ext.mapreduce import json_util
87 from google.appengine.ext.mapreduce import key_ranges
88 from google.appengine.ext.mapreduce import model
89 from google.appengine.ext.mapreduce import namespace_range
90 from google.appengine.ext.mapreduce import operation
91 from google.appengine.ext.mapreduce import property_range
92 from google.appengine.ext.mapreduce import records
93 from google.appengine.ext.mapreduce import util
97 try:
99 from google.appengine.ext import cloudstorage
100 if hasattr(cloudstorage, "_STUB"):
101 cloudstorage = None
102 except ImportError:
103 pass
107 Error = errors.Error
108 BadReaderParamsError = errors.BadReaderParamsError
112 COUNTER_IO_READ_BYTES = "io-read-bytes"
115 COUNTER_IO_READ_MSEC = "io-read-msec"
120 ALLOW_CHECKPOINT = object()
123 class InputReader(json_util.JsonMixin):
124 """Abstract base class for input readers.
126 InputReaders have the following properties:
127 * They are created by using the split_input method to generate a set of
128 InputReaders from a MapperSpec.
129 * They generate inputs to the mapper via the iterator interface.
130 * After creation, they can be serialized and resumed using the JsonMixin
131 interface.
132 * They are cast to string for a user-readable description; it may be
133 valuable to implement __str__.
139 expand_parameters = False
142 _APP_PARAM = "_app"
143 NAMESPACE_PARAM = "namespace"
144 NAMESPACES_PARAM = "namespaces"
146 def __iter__(self):
147 return self
149 def next(self):
150 """Returns the next input from this input reader as a key, value pair.
152 Returns:
153 The next input from this input reader.
155 raise NotImplementedError("next() not implemented in %s" % self.__class__)
157 @classmethod
158 def from_json(cls, input_shard_state):
159 """Creates an instance of the InputReader for the given input shard state.
161 Args:
162 input_shard_state: The InputReader state as a dict-like object.
164 Returns:
165 An instance of the InputReader configured using the values of json.
167 raise NotImplementedError("from_json() not implemented in %s" % cls)
169 def to_json(self):
170 """Returns an input shard state for the remaining inputs.
172 Returns:
173 A json-izable version of the remaining InputReader.
175 raise NotImplementedError("to_json() not implemented in %s" %
176 self.__class__)
178 @classmethod
179 def split_input(cls, mapper_spec):
180 """Returns a list of input readers.
182 This method creates a list of input readers, each for one shard.
183 It attempts to split inputs among readers evenly.
185 Args:
186 mapper_spec: model.MapperSpec specifies the inputs and additional
187 parameters to define the behavior of input readers.
189 Returns:
190 A list of InputReaders. None or [] when no input data can be found.
192 raise NotImplementedError("split_input() not implemented in %s" % cls)
194 @classmethod
195 def validate(cls, mapper_spec):
196 """Validates mapper spec and all mapper parameters.
198 Input reader parameters are expected to be passed as an "input_reader"
199 subdictionary in mapper_spec.params.
201 The pre-1.6.4 API mixes input reader parameters with all other parameters.
202 Thus, to be compatible, input readers check mapper_spec.params as well and
203 issue a warning if the "input_reader" subdictionary is not present.
205 Args:
206 mapper_spec: The MapperSpec for this InputReader.
208 Raises:
209 BadReaderParamsError: required parameters are missing or invalid.
211 if mapper_spec.input_reader_class() != cls:
212 raise BadReaderParamsError("Input reader class mismatch")
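# Illustrative sketch (not part of the original module): a minimal custom
# reader would subclass InputReader and fill in the four hooks above; the
# names MyLinesReader and "my_lines" below are hypothetical.
#
#   class MyLinesReader(InputReader):
#     def __init__(self, lines):
#       self._lines = list(lines)
#
#     def next(self):
#       if not self._lines:
#         raise StopIteration()
#       return self._lines.pop(0)
#
#     def to_json(self):
#       return {"lines": self._lines}
#
#     @classmethod
#     def from_json(cls, json):
#       return cls(json["lines"])
#
#     @classmethod
#     def split_input(cls, mapper_spec):
#       data = _get_params(mapper_spec).get("my_lines", [])
#       n = max(mapper_spec.shard_count, 1)
#       return [cls(data[i::n]) for i in range(n)]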
215 def _get_params(mapper_spec, allowed_keys=None, allow_old=True):
216 """Obtain input reader parameters.
218 Utility function for input reader implementations. Fetches parameters
219 from mapreduce specification giving appropriate usage warnings.
221 Args:
222 mapper_spec: The MapperSpec for the job
223 allowed_keys: set of all allowed keys in parameters as strings. If it is not
224 None, then parameters are expected to be in a separate "input_reader"
225 subdictionary of mapper_spec parameters.
226 allow_old: Allow parameters to exist outside of the input_reader
227 subdictionary for compatibility.
229 Returns:
230 mapper parameters as dict
232 Raises:
233 BadReaderParamsError: if parameters are invalid/missing or not allowed.
235 if "input_reader" not in mapper_spec.params:
236 message = ("Input reader's parameters should be specified in "
237 "input_reader subdictionary.")
238 if not allow_old or allowed_keys:
239 raise errors.BadReaderParamsError(message)
240 params = mapper_spec.params
241 params = dict((str(n), v) for n, v in params.iteritems())
242 else:
243 if not isinstance(mapper_spec.params.get("input_reader"), dict):
244 raise errors.BadReaderParamsError(
245 "Input reader parameters should be a dictionary")
246 params = mapper_spec.params.get("input_reader")
247 params = dict((str(n), v) for n, v in params.iteritems())
248 if allowed_keys:
249 params_diff = set(params.keys()) - allowed_keys
250 if params_diff:
251 raise errors.BadReaderParamsError(
252 "Invalid input_reader parameters: %s" % ",".join(params_diff))
253 return params
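# Illustrative sketch (assumed values, not from the original file): with the
# post-1.6.4 layout that _get_params() prefers, reader parameters live in an
# "input_reader" subdictionary of the mapper parameters, e.g.
#
#   mapper_spec.params = {
#       "input_reader": {"entity_kind": "myapp.models.Guestbook",  # hypothetical
#                        "batch_size": 50},
#   }
#
# The old flat layout (reader keys mixed directly into mapper_spec.params) is
# accepted only when allow_old=True and no allowed_keys are given.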
256 class FileInputReader(InputReader):
257 """Reader to read Files API files of user specified format.
259 This class currently only supports Google Storage files. It will be extended
260 to support blobstore files in the future.
262 Reader Parameters:
263 files: a list of filenames or filename patterns.
264 filename must be of format '/gs/bucket/filename'.
265 filename pattern has format '/gs/bucket/prefix*'.
266 filename pattern will be expanded to filenames with the given prefix.
267 Please see parseGlob in the file api.files.gs.py which is included in the
268 App Engine SDK for supported patterns.
270 Example:
271 ["/gs/bucket1/file1", "/gs/bucket2/*", "/gs/bucket3/p*"]
272 includes "file1", all files under bucket2, and files under bucket3 with
273 a prefix "p" in their names.
275 format: format string determines what your map function gets as its input.
276 format string can be "lines", "bytes", "zip", or a cascade of them plus
277 optional parameters. See file_formats.FORMATS for all supported formats.
278 See file_format_parser._FileFormatParser for format string syntax.
280 Example:
281 "lines": your map function gets files' contents line by line.
282 "bytes": your map function gets files' contents entirely.
283 "zip": InputReader unzips files and feeds your map function each of
284 the archive's member files as a whole.
285 "zip[bytes]: same as above.
286 "zip[lines]": InputReader unzips files and feeds your map function
287 files' contents line by line.
288 "zip[lines(encoding=utf32)]": InputReader unzips files, reads each
289 file with utf32 encoding and feeds your map function line by line.
290 "base64[zip[lines(encoding=utf32)]]: InputReader decodes files with
291 base64 encoding, unzips each file, reads each of them with utf32
292 encoding and feeds your map function line by line.
294 Note that "encoding" only teaches InputReader how to interpret files.
295 The input your map function gets is always a Python str.
299 FILES_PARAM = "files"
300 FORMAT_PARAM = "format"
302 def __init__(self, format_root):
303 """Initialize input reader.
305 Args:
306 format_root: a FileFormatRoot instance.
308 self._file_format_root = format_root
310 def __iter__(self):
311 """Inherit docs."""
312 return self
314 def next(self):
315 """Inherit docs."""
316 ctx = context.get()
317 start_time = time.time()
319 content = self._file_format_root.next().read()
321 if ctx:
322 operation.counters.Increment(
323 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
324 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
326 return content
328 @classmethod
329 def split_input(cls, mapper_spec):
330 """Inherit docs."""
331 params = _get_params(mapper_spec)
334 filenames = []
335 for f in params[cls.FILES_PARAM]:
336 parsedName = files.gs.parseGlob(f)
337 if isinstance(parsedName, tuple):
338 filenames.extend(files.gs.listdir(parsedName[0],
339 {"prefix": parsedName[1]}))
340 else:
341 filenames.append(parsedName)
343 file_format_roots = file_format_root.split(filenames,
344 params[cls.FORMAT_PARAM],
345 mapper_spec.shard_count)
347 if file_format_roots is None:
348 return []
349 return [cls(root) for root in file_format_roots]
351 @classmethod
352 def validate(cls, mapper_spec):
353 """Inherit docs."""
354 if mapper_spec.input_reader_class() != cls:
355 raise BadReaderParamsError("Mapper input reader class mismatch")
358 params = _get_params(mapper_spec)
359 if cls.FILES_PARAM not in params:
360 raise BadReaderParamsError("Must specify %s" % cls.FILES_PARAM)
361 if cls.FORMAT_PARAM not in params:
362 raise BadReaderParamsError("Must specify %s" % cls.FORMAT_PARAM)
364 format_string = params[cls.FORMAT_PARAM]
365 if not isinstance(format_string, basestring):
366 raise BadReaderParamsError("format should be string but is %s" %
367 cls.FORMAT_PARAM)
368 try:
369 file_format_parser.parse(format_string)
370 except ValueError, e:
371 raise BadReaderParamsError(e)
373 paths = params[cls.FILES_PARAM]
374 if not (paths and isinstance(paths, list)):
375 raise BadReaderParamsError("files should be a list of filenames.")
378 try:
379 for path in paths:
380 files.gs.parseGlob(path)
381 except files.InvalidFileNameError:
382 raise BadReaderParamsError("Invalid filename %s." % path)
384 @classmethod
385 def from_json(cls, json):
386 """Inherit docs."""
387 return cls(
388 file_format_root.FileFormatRoot.from_json(json["file_format_root"]))
390 def to_json(self):
391 """Inherit docs."""
392 return {"file_format_root": self._file_format_root.to_json()}
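# Illustrative sketch (hypothetical bucket and format string): FileInputReader
# expects Files API parameters like the ones below. split_input() expands the
# glob into concrete filenames, and with "zip[lines]" each call to next()
# would return one line of a zipped member file.
#
#   input_reader_params = {
#       "files": ["/gs/my_bucket/logs-*"],
#       "format": "zip[lines]",
#   }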
395 class AbstractDatastoreInputReader(InputReader):
396 """Abstract class for datastore input readers."""
399 _BATCH_SIZE = 50
402 _MAX_SHARD_COUNT = 256
407 MAX_NAMESPACES_FOR_KEY_SHARD = 10
410 ENTITY_KIND_PARAM = "entity_kind"
411 KEYS_ONLY_PARAM = "keys_only"
412 BATCH_SIZE_PARAM = "batch_size"
413 KEY_RANGE_PARAM = "key_range"
414 FILTERS_PARAM = "filters"
416 _KEY_RANGE_ITER_CLS = db_iters.AbstractKeyRangeIterator
418 def __init__(self, iterator):
419 """Create new DatastoreInputReader object.
421 This is an internal constructor. Use split_input to create readers instead.
423 Args:
424 iterator: an iterator that generates objects for this input reader.
426 self._iter = iterator
428 def __iter__(self):
429 """Yields whatever internal iterator yields."""
430 for o in self._iter:
431 yield o
433 def __str__(self):
434 """Returns the string representation of this InputReader."""
435 return repr(self._iter)
437 def to_json(self):
438 """Serializes input reader to json compatible format.
440 Returns:
441 all the data in json-compatible map.
443 return self._iter.to_json()
445 @classmethod
446 def from_json(cls, json):
447 """Create new DatastoreInputReader from json, encoded by to_json.
449 Args:
450 json: json representation of DatastoreInputReader.
452 Returns:
453 an instance of DatastoreInputReader with all data deserialized from json.
455 return cls(db_iters.RangeIteratorFactory.from_json(json))
457 @classmethod
458 def _get_query_spec(cls, mapper_spec):
459 """Construct a model.QuerySpec from model.MapperSpec."""
460 params = _get_params(mapper_spec)
461 entity_kind = params[cls.ENTITY_KIND_PARAM]
462 filters = params.get(cls.FILTERS_PARAM)
463 app = params.get(cls._APP_PARAM)
464 ns = params.get(cls.NAMESPACE_PARAM)
466 return model.QuerySpec(
467 entity_kind=cls._get_raw_entity_kind(entity_kind),
468 keys_only=bool(params.get(cls.KEYS_ONLY_PARAM, False)),
469 filters=filters,
470 batch_size=int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE)),
471 model_class_path=entity_kind,
472 app=app,
473 ns=ns)
475 @classmethod
476 def split_input(cls, mapper_spec):
477 """Inherit doc."""
478 shard_count = mapper_spec.shard_count
479 query_spec = cls._get_query_spec(mapper_spec)
481 namespaces = None
482 if query_spec.ns is not None:
483 k_ranges = cls._to_key_ranges_by_shard(
484 query_spec.app, [query_spec.ns], shard_count, query_spec)
485 else:
486 ns_keys = namespace_range.get_namespace_keys(
487 query_spec.app, cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
490 if not ns_keys:
491 return
494 elif len(ns_keys) <= cls.MAX_NAMESPACES_FOR_KEY_SHARD:
495 namespaces = [ns_key.name() or "" for ns_key in ns_keys]
496 k_ranges = cls._to_key_ranges_by_shard(
497 query_spec.app, namespaces, shard_count, query_spec)
499 else:
500 ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
501 contiguous=False,
502 can_query=lambda: True,
503 _app=query_spec.app)
504 k_ranges = [key_ranges.KeyRangesFactory.create_from_ns_range(ns_range)
505 for ns_range in ns_ranges]
507 iters = [db_iters.RangeIteratorFactory.create_key_ranges_iterator(
508 r, query_spec, cls._KEY_RANGE_ITER_CLS) for r in k_ranges]
510 return [cls(i) for i in iters]
512 @classmethod
513 def _to_key_ranges_by_shard(cls, app, namespaces, shard_count, query_spec):
514 """Get a list of key_ranges.KeyRanges objects, one for each shard.
516 This method uses the scatter index to split each namespace into pieces
517 and assigns those pieces to shards.
519 Args:
520 app: app_id in str.
521 namespaces: a list of namespaces in str.
522 shard_count: number of shards to split.
523 query_spec: model.QuerySpec.
525 Returns:
526 a list of key_ranges.KeyRanges objects.
528 key_ranges_by_ns = []
531 for namespace in namespaces:
532 ranges = cls._split_ns_by_scatter(
533 shard_count,
534 namespace,
535 query_spec.entity_kind,
536 app)
539 random.shuffle(ranges)
540 key_ranges_by_ns.append(ranges)
545 ranges_by_shard = [[] for _ in range(shard_count)]
546 for ranges in key_ranges_by_ns:
547 for i, k_range in enumerate(ranges):
548 if k_range:
549 ranges_by_shard[i].append(k_range)
551 key_ranges_by_shard = []
552 for ranges in ranges_by_shard:
553 if ranges:
554 key_ranges_by_shard.append(key_ranges.KeyRangesFactory.create_from_list(
555 ranges))
556 return key_ranges_by_shard
558 @classmethod
559 def _split_ns_by_scatter(cls,
560 shard_count,
561 namespace,
562 raw_entity_kind,
563 app):
564 """Split a namespace by scatter index into key_range.KeyRange.
566 TODO: Power this with key_range.KeyRange.compute_split_points.
568 Args:
569 shard_count: number of shards.
570 namespace: namespace name to split. str.
571 raw_entity_kind: low level datastore API entity kind.
572 app: app id in str.
574 Returns:
575 A list of key_range.KeyRange objects. If there are not enough entities to
576 split into the requested shards, the returned list will contain KeyRanges
577 ordered lexicographically with any Nones appearing at the end.
579 if shard_count == 1:
581 return [key_range.KeyRange(namespace=namespace, _app=app)]
583 ds_query = datastore.Query(kind=raw_entity_kind,
584 namespace=namespace,
585 _app=app,
586 keys_only=True)
587 ds_query.Order("__scatter__")
588 oversampling_factor = 32
589 random_keys = ds_query.Get(shard_count * oversampling_factor)
591 if not random_keys:
594 return ([key_range.KeyRange(namespace=namespace, _app=app)] +
595 [None] * (shard_count - 1))
597 random_keys.sort()
599 if len(random_keys) >= shard_count:
601 random_keys = cls._choose_split_points(random_keys, shard_count)
603 k_ranges = []
605 k_ranges.append(key_range.KeyRange(
606 key_start=None,
607 key_end=random_keys[0],
608 direction=key_range.KeyRange.ASC,
609 include_start=False,
610 include_end=False,
611 namespace=namespace,
612 _app=app))
614 for i in range(0, len(random_keys) - 1):
615 k_ranges.append(key_range.KeyRange(
616 key_start=random_keys[i],
617 key_end=random_keys[i+1],
618 direction=key_range.KeyRange.ASC,
619 include_start=True,
620 include_end=False,
621 namespace=namespace,
622 _app=app))
624 k_ranges.append(key_range.KeyRange(
625 key_start=random_keys[-1],
626 key_end=None,
627 direction=key_range.KeyRange.ASC,
628 include_start=True,
629 include_end=False,
630 namespace=namespace,
631 _app=app))
633 if len(k_ranges) < shard_count:
635 k_ranges += [None] * (shard_count - len(k_ranges))
636 return k_ranges
638 @classmethod
639 def _choose_split_points(cls, sorted_keys, shard_count):
640 """Returns the best split points given a random set of datastore.Keys."""
641 assert len(sorted_keys) >= shard_count
642 index_stride = len(sorted_keys) / float(shard_count)
643 return [sorted_keys[int(round(index_stride * i))]
644 for i in range(1, shard_count)]
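# Worked example (illustration only, values assumed): with 8 sampled scatter
# keys and shard_count=4, index_stride is 8 / 4.0 = 2.0, so the split points
# are sorted_keys[2], sorted_keys[4] and sorted_keys[6], i.e. shard_count - 1
# boundaries that carve the namespace into 4 key ranges.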
646 @classmethod
647 def validate(cls, mapper_spec):
648 """Inherit docs."""
649 params = _get_params(mapper_spec)
650 if cls.ENTITY_KIND_PARAM not in params:
651 raise BadReaderParamsError("Missing input reader parameter 'entity_kind'")
652 if cls.BATCH_SIZE_PARAM in params:
653 try:
654 batch_size = int(params[cls.BATCH_SIZE_PARAM])
655 if batch_size < 1:
656 raise BadReaderParamsError("Bad batch size: %s" % batch_size)
657 except ValueError, e:
658 raise BadReaderParamsError("Bad batch size: %s" % e)
659 try:
660 bool(params.get(cls.KEYS_ONLY_PARAM, False))
661 except:
662 raise BadReaderParamsError("keys_only expects a boolean value but got %s",
663 params[cls.KEYS_ONLY_PARAM])
664 if cls.NAMESPACE_PARAM in params:
665 if not isinstance(params[cls.NAMESPACE_PARAM],
666 (str, unicode, type(None))):
667 raise BadReaderParamsError(
668 "Expected a single namespace string")
669 if cls.NAMESPACES_PARAM in params:
670 raise BadReaderParamsError("Multiple namespaces are no longer supported")
671 if cls.FILTERS_PARAM in params:
672 filters = params[cls.FILTERS_PARAM]
673 if not isinstance(filters, list):
674 raise BadReaderParamsError("Expected list for filters parameter")
675 for f in filters:
676 if not isinstance(f, (tuple, list)):
677 raise BadReaderParamsError("Filter should be a tuple or list: %s", f)
678 if len(f) != 3:
679 raise BadReaderParamsError("Filter should be a 3-tuple: %s", f)
680 prop, op, _ = f
681 if not isinstance(prop, basestring):
682 raise BadReaderParamsError("Property should be string: %s", prop)
683 if not isinstance(op, basestring):
684 raise BadReaderParamsError("Operator should be string: %s", op)
686 @classmethod
687 def _get_raw_entity_kind(cls, entity_kind_or_model_classpath):
688 """Returns the entity kind to use with low level datastore calls.
690 Args:
691 entity_kind_or_model_classpath: user specified entity kind or model
692 classpath.
694 Returns:
695 the entity kind in str to use with low level datastore calls.
697 return entity_kind_or_model_classpath
700 class RawDatastoreInputReader(AbstractDatastoreInputReader):
701 """Iterates over an entity kind and yields datastore.Entity."""
703 _KEY_RANGE_ITER_CLS = db_iters.KeyRangeEntityIterator
705 @classmethod
706 def validate(cls, mapper_spec):
707 """Inherit docs."""
708 super(RawDatastoreInputReader, cls).validate(mapper_spec)
709 params = _get_params(mapper_spec)
710 entity_kind = params[cls.ENTITY_KIND_PARAM]
711 if "." in entity_kind:
712 logging.warning(
713 ". detected in entity kind %s specified for reader %s."
714 "Assuming entity kind contains the dot.",
715 entity_kind, cls.__name__)
716 if cls.FILTERS_PARAM in params:
717 filters = params[cls.FILTERS_PARAM]
718 for f in filters:
719 if f[1] != "=":
720 raise BadReaderParamsError(
721 "Only equality filters are supported: %s", f)
724 class DatastoreInputReader(AbstractDatastoreInputReader):
725 """Iterates over a Model and yields model instances.
727 Supports both db.Model and ndb.Model.
730 _KEY_RANGE_ITER_CLS = db_iters.KeyRangeModelIterator
732 @classmethod
733 def _get_raw_entity_kind(cls, model_classpath):
734 entity_type = util.for_name(model_classpath)
735 if isinstance(entity_type, db.Model):
736 return entity_type.kind()
737 elif isinstance(entity_type, (ndb.Model, ndb.MetaModel)):
739 return entity_type._get_kind()
740 else:
741 return util.get_short_name(model_classpath)
743 @classmethod
744 def validate(cls, mapper_spec):
745 """Inherit docs."""
746 super(DatastoreInputReader, cls).validate(mapper_spec)
747 params = _get_params(mapper_spec)
748 entity_kind = params[cls.ENTITY_KIND_PARAM]
750 try:
751 model_class = util.for_name(entity_kind)
752 except ImportError, e:
753 raise BadReaderParamsError("Bad entity kind: %s" % e)
754 if cls.FILTERS_PARAM in params:
755 filters = params[cls.FILTERS_PARAM]
756 if issubclass(model_class, db.Model):
757 cls._validate_filters(filters, model_class)
758 else:
759 cls._validate_filters_ndb(filters, model_class)
760 property_range.PropertyRange(filters, entity_kind)
762 @classmethod
763 def _validate_filters(cls, filters, model_class):
764 """Validate user supplied filters.
766 Validate filters are on existing properties and filter values
767 have valid semantics.
769 Args:
770 filters: user supplied filters. Each filter should be a list or tuple of
771 format (<property_name_as_str>, <query_operator_as_str>,
772 <value_of_certain_type>). Value type is up to the property's type.
773 model_class: the db.Model class for the entity type to apply filters on.
775 Raises:
776 BadReaderParamsError: if any filter is invalid in any way.
778 if not filters:
779 return
781 properties = model_class.properties()
783 for f in filters:
784 prop, _, val = f
785 if prop not in properties:
786 raise errors.BadReaderParamsError(
787 "Property %s is not defined for entity type %s",
788 prop, model_class.kind())
792 try:
793 properties[prop].validate(val)
794 except db.BadValueError, e:
795 raise errors.BadReaderParamsError(e)
797 @classmethod
799 def _validate_filters_ndb(cls, filters, model_class):
800 """Validate ndb.Model filters."""
801 if not filters:
802 return
804 properties = model_class._properties
806 for f in filters:
807 prop, _, val = f
808 if prop not in properties:
809 raise errors.BadReaderParamsError(
810 "Property %s is not defined for entity type %s",
811 prop, model_class._get_kind())
815 try:
816 properties[prop]._do_validate(val)
817 except db.BadValueError, e:
818 raise errors.BadReaderParamsError(e)
820 @classmethod
821 def split_input(cls, mapper_spec):
822 """Inherit docs."""
823 shard_count = mapper_spec.shard_count
824 query_spec = cls._get_query_spec(mapper_spec)
826 if not property_range.should_shard_by_property_range(query_spec.filters):
827 return super(DatastoreInputReader, cls).split_input(mapper_spec)
829 p_range = property_range.PropertyRange(query_spec.filters,
830 query_spec.model_class_path)
831 p_ranges = p_range.split(shard_count)
834 if query_spec.ns:
835 ns_range = namespace_range.NamespaceRange(
836 namespace_start=query_spec.ns,
837 namespace_end=query_spec.ns,
838 _app=query_spec.app)
839 ns_ranges = [copy.copy(ns_range) for _ in p_ranges]
840 else:
841 ns_keys = namespace_range.get_namespace_keys(
842 query_spec.app, cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
843 if not ns_keys:
844 return
847 if len(ns_keys) <= cls.MAX_NAMESPACES_FOR_KEY_SHARD:
848 ns_ranges = [namespace_range.NamespaceRange(_app=query_spec.app)
849 for _ in p_ranges]
851 else:
852 ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
853 contiguous=False,
854 can_query=lambda: True,
855 _app=query_spec.app)
856 p_ranges = [copy.copy(p_range) for _ in ns_ranges]
858 assert len(p_ranges) == len(ns_ranges)
860 iters = [
861 db_iters.RangeIteratorFactory.create_property_range_iterator(
862 p, ns, query_spec) for p, ns in zip(p_ranges, ns_ranges)]
863 return [cls(i) for i in iters]
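# Illustrative sketch (hypothetical model path and filter): DatastoreInputReader
# takes a db.Model/ndb.Model classpath rather than a raw kind, validates the
# filters against the model's properties, and yields model instances. An
# inequality filter such as the one below triggers property-range sharding.
#
#   input_reader_params = {
#       "entity_kind": "myapp.models.Guestbook",
#       "filters": [("created", ">", some_datetime)],  # some_datetime is assumed
#   }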
866 class DatastoreKeyInputReader(RawDatastoreInputReader):
867 """Iterate over an entity kind and yields datastore.Key."""
869 _KEY_RANGE_ITER_CLS = db_iters.KeyRangeKeyIterator
873 DatastoreEntityInputReader = RawDatastoreInputReader
878 class _OldAbstractDatastoreInputReader(InputReader):
879 """Abstract base class for classes that iterate over datastore entities.
881 Concrete subclasses must implement _iter_key_range(self, k_range). See the
882 docstring for that method for details.
886 _BATCH_SIZE = 50
889 _MAX_SHARD_COUNT = 256
892 _OVERSAMPLING_FACTOR = 32
897 MAX_NAMESPACES_FOR_KEY_SHARD = 10
900 ENTITY_KIND_PARAM = "entity_kind"
901 KEYS_ONLY_PARAM = "keys_only"
902 BATCH_SIZE_PARAM = "batch_size"
903 KEY_RANGE_PARAM = "key_range"
904 NAMESPACE_RANGE_PARAM = "namespace_range"
905 CURRENT_KEY_RANGE_PARAM = "current_key_range"
906 FILTERS_PARAM = "filters"
912 def __init__(self,
913 entity_kind,
914 key_ranges=None,
915 ns_range=None,
916 batch_size=_BATCH_SIZE,
917 current_key_range=None,
918 filters=None):
919 """Create new AbstractDatastoreInputReader object.
921 This is an internal constructor. Use split_query in a concrete class instead.
923 Args:
924 entity_kind: entity kind as string.
925 key_ranges: a sequence of key_range.KeyRange instances to process. Only
926 one of key_ranges or ns_range can be non-None.
927 ns_range: a namespace_range.NamespaceRange to process. Only one of
928 key_ranges or ns_range can be non-None.
929 batch_size: size of read batch as int.
930 current_key_range: the current key_range.KeyRange being processed.
931 filters: optional list of filters to apply to the query. Each filter is
932 a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
933 User filters are applied first.
935 assert key_ranges is not None or ns_range is not None, (
936 "must specify one of 'key_ranges' or 'ns_range'")
937 assert key_ranges is None or ns_range is None, (
938 "can't specify both 'key_ranges ' and 'ns_range'")
940 self._entity_kind = entity_kind
943 self._key_ranges = key_ranges and list(reversed(key_ranges))
945 self._ns_range = ns_range
946 self._batch_size = int(batch_size)
947 self._current_key_range = current_key_range
948 self._filters = filters
950 @classmethod
951 def _get_raw_entity_kind(cls, entity_kind):
952 if "." in entity_kind:
953 logging.warning(
954 ". detected in entity kind %s specified for reader %s."
955 "Assuming entity kind contains the dot.",
956 entity_kind, cls.__name__)
957 return entity_kind
959 def __iter__(self):
960 """Iterates over the given KeyRanges or NamespaceRange.
962 This method iterates over the given KeyRanges or NamespaceRange and sets
963 the self._current_key_range to the KeyRange currently being processed. It
964 then delegates to the _iter_key_range method to yield the actual
965 results.
967 Yields:
968 Forwards the objects yielded by the subclass's concrete _iter_key_range()
969 method. The caller must consume the result yielded because self.to_json()
970 will not include it.
972 if self._key_ranges is not None:
973 for o in self._iter_key_ranges():
974 yield o
975 elif self._ns_range is not None:
976 for o in self._iter_ns_range():
977 yield o
978 else:
979 assert False, "self._key_ranges and self._ns_range are both None"
981 def _iter_key_ranges(self):
982 """Iterates over self._key_ranges, delegating to self._iter_key_range()."""
983 while True:
984 if self._current_key_range is None:
985 if self._key_ranges:
986 self._current_key_range = self._key_ranges.pop()
989 continue
990 else:
991 break
993 for key, o in self._iter_key_range(
994 copy.deepcopy(self._current_key_range)):
997 self._current_key_range.advance(key)
998 yield o
999 self._current_key_range = None
1001 def _iter_ns_range(self):
1002 """Iterates over self._ns_range, delegating to self._iter_key_range()."""
1003 while True:
1004 if self._current_key_range is None:
1005 query = self._ns_range.make_datastore_query()
1006 namespace_result = query.Get(1)
1007 if not namespace_result:
1008 break
1010 namespace = namespace_result[0].name() or ""
1011 self._current_key_range = key_range.KeyRange(
1012 namespace=namespace, _app=self._ns_range.app)
1013 yield ALLOW_CHECKPOINT
1015 for key, o in self._iter_key_range(
1016 copy.deepcopy(self._current_key_range)):
1019 self._current_key_range.advance(key)
1020 yield o
1022 if (self._ns_range.is_single_namespace or
1023 self._current_key_range.namespace == self._ns_range.namespace_end):
1024 break
1025 self._ns_range = self._ns_range.with_start_after(
1026 self._current_key_range.namespace)
1027 self._current_key_range = None
1029 def _iter_key_range(self, k_range):
1030 """Yields a db.Key and the value that should be yielded by self.__iter__().
1032 Args:
1033 k_range: The key_range.KeyRange to iterate over.
1035 Yields:
1036 A 2-tuple containing the last db.Key processed and the value that should
1037 be yielded by __iter__. The returned db.Key will be used to determine the
1038 InputReader's current position in self._current_key_range.
1040 raise NotImplementedError("_iter_key_range() not implemented in %s" %
1041 self.__class__)
1043 def __str__(self):
1044 """Returns the string representation of this InputReader."""
1045 if self._ns_range is None:
1046 return repr(self._key_ranges)
1047 else:
1048 return repr(self._ns_range)
1050 @classmethod
1051 def _choose_split_points(cls, sorted_keys, shard_count):
1052 """Returns the best split points given a random set of db.Keys."""
1053 assert len(sorted_keys) >= shard_count
1054 index_stride = len(sorted_keys) / float(shard_count)
1055 return [sorted_keys[int(round(index_stride * i))]
1056 for i in range(1, shard_count)]
1060 @classmethod
1061 def _split_input_from_namespace(cls, app, namespace, entity_kind,
1062 shard_count):
1063 """Helper for _split_input_from_params.
1065 If there are not enough Entities to make all of the given shards, the
1066 returned list of KeyRanges will include Nones. The returned list will
1067 contain KeyRanges ordered lexicographically with any Nones appearing at the
1068 end.
1070 Args:
1071 app: the app.
1072 namespace: the namespace.
1073 entity_kind: entity kind as string.
1074 shard_count: the number of shards.
1076 Returns:
1077 KeyRange objects.
1080 raw_entity_kind = cls._get_raw_entity_kind(entity_kind)
1081 if shard_count == 1:
1083 return [key_range.KeyRange(namespace=namespace, _app=app)]
1085 ds_query = datastore.Query(kind=raw_entity_kind,
1086 namespace=namespace,
1087 _app=app,
1088 keys_only=True)
1089 ds_query.Order("__scatter__")
1090 random_keys = ds_query.Get(shard_count * cls._OVERSAMPLING_FACTOR)
1092 if not random_keys:
1095 return ([key_range.KeyRange(namespace=namespace, _app=app)] +
1096 [None] * (shard_count - 1))
1098 random_keys.sort()
1100 if len(random_keys) >= shard_count:
1102 random_keys = cls._choose_split_points(random_keys, shard_count)
1105 key_ranges = []
1107 key_ranges.append(key_range.KeyRange(
1108 key_start=None,
1109 key_end=random_keys[0],
1110 direction=key_range.KeyRange.ASC,
1111 include_start=False,
1112 include_end=False,
1113 namespace=namespace,
1114 _app=app))
1116 for i in range(0, len(random_keys) - 1):
1117 key_ranges.append(key_range.KeyRange(
1118 key_start=random_keys[i],
1119 key_end=random_keys[i+1],
1120 direction=key_range.KeyRange.ASC,
1121 include_start=True,
1122 include_end=False,
1123 namespace=namespace,
1124 _app=app))
1126 key_ranges.append(key_range.KeyRange(
1127 key_start=random_keys[-1],
1128 key_end=None,
1129 direction=key_range.KeyRange.ASC,
1130 include_start=True,
1131 include_end=False,
1132 namespace=namespace,
1133 _app=app))
1135 if len(key_ranges) < shard_count:
1137 key_ranges += [None] * (shard_count - len(key_ranges))
1139 return key_ranges
1141 @classmethod
1142 def _split_input_from_params(cls, app, namespaces, entity_kind_name,
1143 params, shard_count):
1144 """Return input reader objects. Helper for split_input."""
1146 key_ranges = []
1147 for namespace in namespaces:
1148 key_ranges.extend(
1149 cls._split_input_from_namespace(app,
1150 namespace,
1151 entity_kind_name,
1152 shard_count))
1157 shared_ranges = [[] for _ in range(shard_count)]
1158 for i, k_range in enumerate(key_ranges):
1159 shared_ranges[i % shard_count].append(k_range)
1160 batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
1162 return [cls(entity_kind_name,
1163 key_ranges=key_ranges,
1164 ns_range=None,
1165 batch_size=batch_size)
1166 for key_ranges in shared_ranges if key_ranges]
1168 @classmethod
1169 def validate(cls, mapper_spec):
1170 """Validates mapper spec and all mapper parameters.
1172 Args:
1173 mapper_spec: The MapperSpec for this InputReader.
1175 Raises:
1176 BadReaderParamsError: required parameters are missing or invalid.
1178 if mapper_spec.input_reader_class() != cls:
1179 raise BadReaderParamsError("Input reader class mismatch")
1180 params = _get_params(mapper_spec)
1181 if cls.ENTITY_KIND_PARAM not in params:
1182 raise BadReaderParamsError("Missing mapper parameter 'entity_kind'")
1183 if cls.BATCH_SIZE_PARAM in params:
1184 try:
1185 batch_size = int(params[cls.BATCH_SIZE_PARAM])
1186 if batch_size < 1:
1187 raise BadReaderParamsError("Bad batch size: %s" % batch_size)
1188 except ValueError, e:
1189 raise BadReaderParamsError("Bad batch size: %s" % e)
1190 if cls.NAMESPACE_PARAM in params:
1191 if not isinstance(params[cls.NAMESPACE_PARAM],
1192 (str, unicode, type(None))):
1193 raise BadReaderParamsError(
1194 "Expected a single namespace string")
1195 if cls.NAMESPACES_PARAM in params:
1196 raise BadReaderParamsError("Multiple namespaces are no longer supported")
1197 if cls.FILTERS_PARAM in params:
1198 filters = params[cls.FILTERS_PARAM]
1199 if not isinstance(filters, list):
1200 raise BadReaderParamsError("Expected list for filters parameter")
1201 for f in filters:
1202 if not isinstance(f, (tuple, list)):
1203 raise BadReaderParamsError("Filter should be a tuple or list: %s", f)
1204 if len(f) != 3:
1205 raise BadReaderParamsError("Filter should be a 3-tuple: %s", f)
1206 if not isinstance(f[0], basestring):
1207 raise BadReaderParamsError("First element should be string: %s", f)
1208 if f[1] != "=":
1209 raise BadReaderParamsError(
1210 "Only equality filters are supported: %s", f)
1212 @classmethod
1213 def split_input(cls, mapper_spec):
1214 """Splits query into shards without fetching query results.
1216 Tries as best as it can to split the whole query result set into equal
1217 shards. Due to the difficulty of making a perfect split, the resulting
1218 shards' sizes might differ significantly from each other.
1220 Args:
1221 mapper_spec: MapperSpec with params containing 'entity_kind'.
1222 May have 'namespace' in the params as a string containing a single
1223 namespace. If specified then the input reader will only yield values
1224 in the given namespace. If 'namespace' is not given then values from
1225 all namespaces will be yielded. May also have 'batch_size' in the params
1226 to specify the number of entities to process in each batch.
1228 Returns:
1229 A list of InputReader objects. If the query results are empty then the
1230 empty list will be returned. Otherwise, the list will always have a length
1231 equal to number_of_shards but may be padded with Nones if there are too
1232 few results for effective sharding.
1234 params = _get_params(mapper_spec)
1235 entity_kind_name = params[cls.ENTITY_KIND_PARAM]
1236 batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
1237 shard_count = mapper_spec.shard_count
1238 namespace = params.get(cls.NAMESPACE_PARAM)
1239 app = params.get(cls._APP_PARAM)
1240 filters = params.get(cls.FILTERS_PARAM)
1242 if namespace is None:
1254 namespace_query = datastore.Query("__namespace__",
1255 keys_only=True,
1256 _app=app)
1257 namespace_keys = namespace_query.Get(
1258 limit=cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
1260 if len(namespace_keys) > cls.MAX_NAMESPACES_FOR_KEY_SHARD:
1261 ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
1262 contiguous=True,
1263 _app=app)
1264 return [cls(entity_kind_name,
1265 key_ranges=None,
1266 ns_range=ns_range,
1267 batch_size=batch_size,
1268 filters=filters)
1269 for ns_range in ns_ranges]
1270 elif not namespace_keys:
1271 return [cls(entity_kind_name,
1272 key_ranges=None,
1273 ns_range=namespace_range.NamespaceRange(_app=app),
1274 batch_size=shard_count,
1275 filters=filters)]
1276 else:
1277 namespaces = [namespace_key.name() or ""
1278 for namespace_key in namespace_keys]
1279 else:
1280 namespaces = [namespace]
1282 readers = cls._split_input_from_params(
1283 app, namespaces, entity_kind_name, params, shard_count)
1284 if filters:
1285 for reader in readers:
1286 reader._filters = filters
1287 return readers
1289 def to_json(self):
1290 """Serializes all the data in this query range into json form.
1292 Returns:
1293 all the data in json-compatible map.
1295 if self._key_ranges is None:
1296 key_ranges_json = None
1297 else:
1298 key_ranges_json = []
1299 for k in self._key_ranges:
1300 if k:
1301 key_ranges_json.append(k.to_json())
1302 else:
1303 key_ranges_json.append(None)
1305 if self._ns_range is None:
1306 namespace_range_json = None
1307 else:
1308 namespace_range_json = self._ns_range.to_json_object()
1310 if self._current_key_range is None:
1311 current_key_range_json = None
1312 else:
1313 current_key_range_json = self._current_key_range.to_json()
1315 json_dict = {self.KEY_RANGE_PARAM: key_ranges_json,
1316 self.NAMESPACE_RANGE_PARAM: namespace_range_json,
1317 self.CURRENT_KEY_RANGE_PARAM: current_key_range_json,
1318 self.ENTITY_KIND_PARAM: self._entity_kind,
1319 self.BATCH_SIZE_PARAM: self._batch_size,
1320 self.FILTERS_PARAM: self._filters}
1321 return json_dict
1323 @classmethod
1324 def from_json(cls, json):
1325 """Create new DatastoreInputReader from the json, encoded by to_json.
1327 Args:
1328 json: json map representation of DatastoreInputReader.
1330 Returns:
1331 an instance of DatastoreInputReader with all data deserialized from json.
1333 if json[cls.KEY_RANGE_PARAM] is None:
1335 key_ranges = None
1336 else:
1337 key_ranges = []
1338 for k in json[cls.KEY_RANGE_PARAM]:
1339 if k:
1340 key_ranges.append(key_range.KeyRange.from_json(k))
1341 else:
1342 key_ranges.append(None)
1344 if json[cls.NAMESPACE_RANGE_PARAM] is None:
1345 ns_range = None
1346 else:
1347 ns_range = namespace_range.NamespaceRange.from_json_object(
1348 json[cls.NAMESPACE_RANGE_PARAM])
1350 if json[cls.CURRENT_KEY_RANGE_PARAM] is None:
1351 current_key_range = None
1352 else:
1353 current_key_range = key_range.KeyRange.from_json(
1354 json[cls.CURRENT_KEY_RANGE_PARAM])
1356 return cls(
1357 json[cls.ENTITY_KIND_PARAM],
1358 key_ranges,
1359 ns_range,
1360 json[cls.BATCH_SIZE_PARAM],
1361 current_key_range,
1362 filters=json.get(cls.FILTERS_PARAM))
1365 class BlobstoreLineInputReader(InputReader):
1366 """Input reader for a newline delimited blob in Blobstore."""
1369 _BLOB_BUFFER_SIZE = 64000
1372 _MAX_SHARD_COUNT = 256
1375 _MAX_BLOB_KEYS_COUNT = 246
1378 BLOB_KEYS_PARAM = "blob_keys"
1381 INITIAL_POSITION_PARAM = "initial_position"
1382 END_POSITION_PARAM = "end_position"
1383 BLOB_KEY_PARAM = "blob_key"
1385 def __init__(self, blob_key, start_position, end_position):
1386 """Initializes this instance with the given blob key and character range.
1388 This BlobstoreLineInputReader will read from the first record starting
1389 strictly after start_position until the first record ending at or after
1390 end_position (exclusive). As an exception, if start_position is 0, then
1391 this InputReader starts reading at the first record.
1393 Args:
1394 blob_key: the BlobKey that this input reader is processing.
1395 start_position: the position to start reading at.
1396 end_position: a position in the last record to read.
1398 self._blob_key = blob_key
1399 self._blob_reader = blobstore.BlobReader(blob_key,
1400 self._BLOB_BUFFER_SIZE,
1401 start_position)
1402 self._end_position = end_position
1403 self._has_iterated = False
1404 self._read_before_start = bool(start_position)
1406 def next(self):
1407 """Returns the next input from as an (offset, line) tuple."""
1408 self._has_iterated = True
1410 if self._read_before_start:
1411 self._blob_reader.readline()
1412 self._read_before_start = False
1413 start_position = self._blob_reader.tell()
1415 if start_position > self._end_position:
1416 raise StopIteration()
1418 line = self._blob_reader.readline()
1420 if not line:
1421 raise StopIteration()
1423 return start_position, line.rstrip("\n")
1425 def to_json(self):
1426 """Returns an json-compatible input shard spec for remaining inputs."""
1427 new_pos = self._blob_reader.tell()
1428 if self._has_iterated:
1429 new_pos -= 1
1430 return {self.BLOB_KEY_PARAM: self._blob_key,
1431 self.INITIAL_POSITION_PARAM: new_pos,
1432 self.END_POSITION_PARAM: self._end_position}
1434 def __str__(self):
1435 """Returns the string representation of this BlobstoreLineInputReader."""
1436 return "blobstore.BlobKey(%r):[%d, %d]" % (
1437 self._blob_key, self._blob_reader.tell(), self._end_position)
1439 @classmethod
1440 def from_json(cls, json):
1441 """Instantiates an instance of this InputReader for the given shard spec."""
1442 return cls(json[cls.BLOB_KEY_PARAM],
1443 json[cls.INITIAL_POSITION_PARAM],
1444 json[cls.END_POSITION_PARAM])
1446 @classmethod
1447 def validate(cls, mapper_spec):
1448 """Validates mapper spec and all mapper parameters.
1450 Args:
1451 mapper_spec: The MapperSpec for this InputReader.
1453 Raises:
1454 BadReaderParamsError: required parameters are missing or invalid.
1456 if mapper_spec.input_reader_class() != cls:
1457 raise BadReaderParamsError("Mapper input reader class mismatch")
1458 params = _get_params(mapper_spec)
1459 if cls.BLOB_KEYS_PARAM not in params:
1460 raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
1461 blob_keys = params[cls.BLOB_KEYS_PARAM]
1462 if isinstance(blob_keys, basestring):
1465 blob_keys = blob_keys.split(",")
1466 if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
1467 raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
1468 if not blob_keys:
1469 raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
1470 for blob_key in blob_keys:
1471 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1472 if not blob_info:
1473 raise BadReaderParamsError("Could not find blobinfo for key %s" %
1474 blob_key)
1476 @classmethod
1477 def split_input(cls, mapper_spec):
1478 """Returns a list of shard_count input_spec_shards for input_spec.
1480 Args:
1481 mapper_spec: The mapper specification to split from. Must contain
1482 'blob_keys' parameter with one or more blob keys.
1484 Returns:
1485 A list of BlobstoreInputReaders corresponding to the specified shards.
1487 params = _get_params(mapper_spec)
1488 blob_keys = params[cls.BLOB_KEYS_PARAM]
1489 if isinstance(blob_keys, basestring):
1492 blob_keys = blob_keys.split(",")
1494 blob_sizes = {}
1495 for blob_key in blob_keys:
1496 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1497 blob_sizes[blob_key] = blob_info.size
1499 shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
1500 shards_per_blob = shard_count // len(blob_keys)
1501 if shards_per_blob == 0:
1502 shards_per_blob = 1
1504 chunks = []
1505 for blob_key, blob_size in blob_sizes.items():
1506 blob_chunk_size = blob_size // shards_per_blob
1507 for i in xrange(shards_per_blob - 1):
1508 chunks.append(BlobstoreLineInputReader.from_json(
1509 {cls.BLOB_KEY_PARAM: blob_key,
1510 cls.INITIAL_POSITION_PARAM: blob_chunk_size * i,
1511 cls.END_POSITION_PARAM: blob_chunk_size * (i + 1)}))
1512 chunks.append(BlobstoreLineInputReader.from_json(
1513 {cls.BLOB_KEY_PARAM: blob_key,
1514 cls.INITIAL_POSITION_PARAM: blob_chunk_size * (shards_per_blob - 1),
1515 cls.END_POSITION_PARAM: blob_size}))
1516 return chunks
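# Worked example (illustration only, sizes assumed): a single 1000-byte blob
# split 4 ways produces the byte ranges [0, 250], [250, 500], [500, 750] and
# [750, 1000]. Every reader except the first skips the partial line it starts
# inside (_read_before_start), and a reader keeps any line whose start offset
# is <= its end position even if the line runs past it, so each line is read
# exactly once.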
1519 class BlobstoreZipInputReader(InputReader):
1520 """Input reader for files from a zip archive stored in the Blobstore.
1522 Each instance of the reader will read the TOC, from the end of the zip file,
1523 and then read only the contained files for which it is responsible.
1527 _MAX_SHARD_COUNT = 256
1530 BLOB_KEY_PARAM = "blob_key"
1531 START_INDEX_PARAM = "start_index"
1532 END_INDEX_PARAM = "end_index"
1534 def __init__(self, blob_key, start_index, end_index,
1535 _reader=blobstore.BlobReader):
1536 """Initializes this instance with the given blob key and file range.
1538 This BlobstoreZipInputReader will read from the file with index start_index
1539 up to but not including the file with index end_index.
1541 Args:
1542 blob_key: the BlobKey that this input reader is processing.
1543 start_index: the index of the first file to read.
1544 end_index: the index of the first file that will not be read.
1545 _reader: a callable that returns a file-like object for reading blobs.
1546 Used for dependency injection.
1548 self._blob_key = blob_key
1549 self._start_index = start_index
1550 self._end_index = end_index
1551 self._reader = _reader
1552 self._zip = None
1553 self._entries = None
1555 def next(self):
1556 """Returns the next input from this input reader as (ZipInfo, opener) tuple.
1558 Returns:
1559 The next input from this input reader, in the form of a 2-tuple.
1560 The first element of the tuple is a zipfile.ZipInfo object.
1561 The second element of the tuple is a zero-argument function that, when
1562 called, returns the complete body of the file.
1564 if not self._zip:
1565 self._zip = zipfile.ZipFile(self._reader(self._blob_key))
1567 self._entries = self._zip.infolist()[self._start_index:self._end_index]
1568 self._entries.reverse()
1569 if not self._entries:
1570 raise StopIteration()
1571 entry = self._entries.pop()
1572 self._start_index += 1
1573 return (entry, lambda: self._read(entry))
1575 def _read(self, entry):
1576 """Read entry content.
1578 Args:
1579 entry: zip file entry as zipfile.ZipInfo.
1580 Returns:
1581 Entry content as string.
1583 start_time = time.time()
1584 content = self._zip.read(entry.filename)
1586 ctx = context.get()
1587 if ctx:
1588 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
1589 operation.counters.Increment(
1590 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
1592 return content
1594 @classmethod
1595 def from_json(cls, json):
1596 """Creates an instance of the InputReader for the given input shard state.
1598 Args:
1599 json: The InputReader state as a dict-like object.
1601 Returns:
1602 An instance of the InputReader configured using the values of json.
1604 return cls(json[cls.BLOB_KEY_PARAM],
1605 json[cls.START_INDEX_PARAM],
1606 json[cls.END_INDEX_PARAM])
1608 def to_json(self):
1609 """Returns an input shard state for the remaining inputs.
1611 Returns:
1612 A json-izable version of the remaining InputReader.
1614 return {self.BLOB_KEY_PARAM: self._blob_key,
1615 self.START_INDEX_PARAM: self._start_index,
1616 self.END_INDEX_PARAM: self._end_index}
1618 def __str__(self):
1619 """Returns the string representation of this BlobstoreZipInputReader."""
1620 return "blobstore.BlobKey(%r):[%d, %d]" % (
1621 self._blob_key, self._start_index, self._end_index)
1623 @classmethod
1624 def validate(cls, mapper_spec):
1625 """Validates mapper spec and all mapper parameters.
1627 Args:
1628 mapper_spec: The MapperSpec for this InputReader.
1630 Raises:
1631 BadReaderParamsError: required parameters are missing or invalid.
1633 if mapper_spec.input_reader_class() != cls:
1634 raise BadReaderParamsError("Mapper input reader class mismatch")
1635 params = _get_params(mapper_spec)
1636 if cls.BLOB_KEY_PARAM not in params:
1637 raise BadReaderParamsError("Must specify 'blob_key' for mapper input")
1638 blob_key = params[cls.BLOB_KEY_PARAM]
1639 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1640 if not blob_info:
1641 raise BadReaderParamsError("Could not find blobinfo for key %s" %
1642 blob_key)
1644 @classmethod
1645 def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
1646 """Returns a list of input shard states for the input spec.
1648 Args:
1649 mapper_spec: The MapperSpec for this InputReader. Must contain
1650 'blob_key' parameter with one blob key.
1651 _reader: a callable that returns a file-like object for reading blobs.
1652 Used for dependency injection.
1654 Returns:
1655 A list of InputReaders spanning files within the zip.
1657 params = _get_params(mapper_spec)
1658 blob_key = params[cls.BLOB_KEY_PARAM]
1659 zip_input = zipfile.ZipFile(_reader(blob_key))
1660 zfiles = zip_input.infolist()
1661 total_size = sum(x.file_size for x in zfiles)
1662 num_shards = min(mapper_spec.shard_count, cls._MAX_SHARD_COUNT)
1663 size_per_shard = total_size // num_shards
1667 shard_start_indexes = [0]
1668 current_shard_size = 0
1669 for i, fileinfo in enumerate(zfiles):
1670 current_shard_size += fileinfo.file_size
1671 if current_shard_size >= size_per_shard:
1672 shard_start_indexes.append(i + 1)
1673 current_shard_size = 0
1675 if shard_start_indexes[-1] != len(zfiles):
1676 shard_start_indexes.append(len(zfiles))
1678 return [cls(blob_key, start_index, end_index, _reader)
1679 for start_index, end_index
1680 in zip(shard_start_indexes, shard_start_indexes[1:])]
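# Illustrative sketch (hypothetical map function): each input produced by this
# reader is a (zipfile.ZipInfo, opener) pair, so a mapper would typically do
# something like:
#
#   def my_map(input_tuple):
#     zip_info, opener = input_tuple
#     body = opener()                    # reads the whole member file
#     yield "%s,%d\n" % (zip_info.filename, len(body))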
1683 class BlobstoreZipLineInputReader(InputReader):
1684 """Input reader for newline delimited files in zip archives from Blobstore.
1686 This has the same external interface as the BlobstoreLineInputReader, in that
1687 it takes a list of blobs as its input and yields lines to the reader.
1688 However, the blobs themselves are expected to be zip archives of line-delimited
1689 files instead of the files themselves.
1691 This is useful as many line-delimited files benefit greatly from compression.
1695 _MAX_SHARD_COUNT = 256
1698 _MAX_BLOB_KEYS_COUNT = 246
1701 BLOB_KEYS_PARAM = "blob_keys"
1704 BLOB_KEY_PARAM = "blob_key"
1705 START_FILE_INDEX_PARAM = "start_file_index"
1706 END_FILE_INDEX_PARAM = "end_file_index"
1707 OFFSET_PARAM = "offset"
1709 def __init__(self, blob_key, start_file_index, end_file_index, offset,
1710 _reader=blobstore.BlobReader):
1711 """Initializes this instance with the given blob key and file range.
1713 This BlobstoreZipLineInputReader will read from the file with index
1714 start_file_index up to but not including the file with index end_file_index.
1715 It will return lines starting at offset within file[start_file_index].
1717 Args:
1718 blob_key: the BlobKey that this input reader is processing.
1719 start_file_index: the index of the first file to read within the zip.
1720 end_file_index: the index of the first file that will not be read.
1721 offset: the byte offset within blob_key.zip[start_file_index] to start
1722 reading. The reader will continue to the end of the file.
1723 _reader: a callable that returns a file-like object for reading blobs.
1724 Used for dependency injection.
1726 self._blob_key = blob_key
1727 self._start_file_index = start_file_index
1728 self._end_file_index = end_file_index
1729 self._initial_offset = offset
1730 self._reader = _reader
1731 self._zip = None
1732 self._entries = None
1733 self._filestream = None
1735 @classmethod
1736 def validate(cls, mapper_spec):
1737 """Validates mapper spec and all mapper parameters.
1739 Args:
1740 mapper_spec: The MapperSpec for this InputReader.
1742 Raises:
1743 BadReaderParamsError: required parameters are missing or invalid.
1745 if mapper_spec.input_reader_class() != cls:
1746 raise BadReaderParamsError("Mapper input reader class mismatch")
1747 params = _get_params(mapper_spec)
1748 if cls.BLOB_KEYS_PARAM not in params:
1749 raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
1751 blob_keys = params[cls.BLOB_KEYS_PARAM]
1752 if isinstance(blob_keys, basestring):
1755 blob_keys = blob_keys.split(",")
1756 if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
1757 raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
1758 if not blob_keys:
1759 raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
1760 for blob_key in blob_keys:
1761 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1762 if not blob_info:
1763 raise BadReaderParamsError("Could not find blobinfo for key %s" %
1764 blob_key)
1766 @classmethod
1767 def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
1768 """Returns a list of input readers for the input spec.
1770 Args:
1771 mapper_spec: The MapperSpec for this InputReader. Must contain
1772 'blob_keys' parameter with one or more blob keys.
1773 _reader: a callable that returns a file-like object for reading blobs.
1774 Used for dependency injection.
1776 Returns:
1777 A list of InputReaders spanning the subfiles within the blobs.
1778 There will be at least one reader per blob, but it will otherwise
1779 attempt to keep the expanded size even.
1781 params = _get_params(mapper_spec)
1782 blob_keys = params[cls.BLOB_KEYS_PARAM]
1783 if isinstance(blob_keys, basestring):
1786 blob_keys = blob_keys.split(",")
1788 blob_files = {}
1789 total_size = 0
1790 for blob_key in blob_keys:
1791 zip_input = zipfile.ZipFile(_reader(blob_key))
1792 blob_files[blob_key] = zip_input.infolist()
1793 total_size += sum(x.file_size for x in blob_files[blob_key])
1795 shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
1801 size_per_shard = total_size // shard_count
1803 readers = []
1804 for blob_key in blob_keys:
1805 bfiles = blob_files[blob_key]
1806 current_shard_size = 0
1807 start_file_index = 0
1808 next_file_index = 0
1809 for fileinfo in bfiles:
1810 next_file_index += 1
1811 current_shard_size += fileinfo.file_size
1812 if current_shard_size >= size_per_shard:
1813 readers.append(cls(blob_key, start_file_index, next_file_index, 0,
1814 _reader))
1815 current_shard_size = 0
1816 start_file_index = next_file_index
1817 if current_shard_size != 0:
1818 readers.append(cls(blob_key, start_file_index, next_file_index, 0,
1819 _reader))
1821 return readers
1823 def next(self):
1824 """Returns the next line from this input reader as (lineinfo, line) tuple.
1826 Returns:
1827 The next input from this input reader, in the form of a 2-tuple.
1828 The first element of the tuple describes the source, it is itself
1829 a tuple (blobkey, filenumber, byteoffset).
1830 The second element of the tuple is the line found at that offset.
1832 if not self._filestream:
1833 if not self._zip:
1834 self._zip = zipfile.ZipFile(self._reader(self._blob_key))
1836 self._entries = self._zip.infolist()[self._start_file_index:
1837 self._end_file_index]
1838 self._entries.reverse()
1839 if not self._entries:
1840 raise StopIteration()
1841 entry = self._entries.pop()
1842 value = self._zip.read(entry.filename)
1843 self._filestream = StringIO.StringIO(value)
1844 if self._initial_offset:
1845 self._filestream.seek(self._initial_offset)
1846 self._filestream.readline()
1848 start_position = self._filestream.tell()
1849 line = self._filestream.readline()
1851 if not line:
1853 self._filestream.close()
1854 self._filestream = None
1855 self._start_file_index += 1
1856 self._initial_offset = 0
1857 return self.next()
1859 return ((self._blob_key, self._start_file_index, start_position),
1860 line.rstrip("\n"))
1862 def _next_offset(self):
1863 """Return the offset of the next line to read."""
1864 if self._filestream:
1865 offset = self._filestream.tell()
1866 if offset:
1867 offset -= 1
1868 else:
1869 offset = self._initial_offset
1871 return offset
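# Illustrative sketch (not part of this module): _next_offset() deliberately
# saves tell() - 1, an offset inside the line that was just consumed. On
# resume, next() seeks there and calls readline() once to discard the tail
# of that consumed line, so iteration restarts exactly at the next unread
# line. A minimal demonstration of the seek-and-discard pattern, using the
# StringIO module imported above:
#
#   stream = StringIO.StringIO("alpha\nbeta\ngamma\n")
#   stream.readline()                  # "alpha\n" has been processed
#   saved = stream.tell() - 1          # checkpoint inside "alpha\n"
#
#   resumed = StringIO.StringIO("alpha\nbeta\ngamma\n")
#   resumed.seek(saved)
#   resumed.readline()                 # discard the rest of "alpha\n"
#   assert resumed.readline() == "beta\n"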
1873 def to_json(self):
1874 """Returns an input shard state for the remaining inputs.
1876 Returns:
1877 A json-izable version of the remaining InputReader.
1880 return {self.BLOB_KEY_PARAM: self._blob_key,
1881 self.START_FILE_INDEX_PARAM: self._start_file_index,
1882 self.END_FILE_INDEX_PARAM: self._end_file_index,
1883 self.OFFSET_PARAM: self._next_offset()}
1885 @classmethod
1886 def from_json(cls, json, _reader=blobstore.BlobReader):
1887 """Creates an instance of the InputReader for the given input shard state.
1889 Args:
1890 json: The InputReader state as a dict-like object.
1891 _reader: For dependency injection.
1893 Returns:
1894 An instance of the InputReader configured using the values of json.
1896 return cls(json[cls.BLOB_KEY_PARAM],
1897 json[cls.START_FILE_INDEX_PARAM],
1898 json[cls.END_FILE_INDEX_PARAM],
1899 json[cls.OFFSET_PARAM],
1900 _reader)
1902 def __str__(self):
1903 """Returns the string representation of this reader.
1905 Returns:
1906 string blobkey:[start file num, end file num]:current offset.
1908 return "blobstore.BlobKey(%r):[%d, %d]:%d" % (
1909 self._blob_key, self._start_file_index, self._end_file_index,
1910 self._next_offset())
1913 class RandomStringInputReader(InputReader):
1914 """RandomStringInputReader generates random strings as output.
1916 Primary usage is to populate output with testing entries.
1920 COUNT = "count"
1922 STRING_LENGTH = "string_length"
1924 DEFAULT_STRING_LENGTH = 10
1926 def __init__(self, count, string_length):
1927 """Initialize input reader.
1929 Args:
1930 count: number of entries this shard should generate.
1931 string_length: the length of generated random strings.
1933 self._count = count
1934 self._string_length = string_length
1936 def __iter__(self):
1937 ctx = context.get()
1939 while self._count:
1940 self._count -= 1
1941 start_time = time.time()
1942 content = "".join(random.choice(string.ascii_lowercase)
1943 for _ in range(self._string_length))
1944 if ctx:
1945 operation.counters.Increment(
1946 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
1947 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
1948 yield content
1950 @classmethod
1951 def split_input(cls, mapper_spec):
1952 params = _get_params(mapper_spec)
1953 count = params[cls.COUNT]
1954 string_length = cls.DEFAULT_STRING_LENGTH
1955 if cls.STRING_LENGTH in params:
1956 string_length = params[cls.STRING_LENGTH]
1958 shard_count = mapper_spec.shard_count
1959 count_per_shard = count // shard_count
1961 mr_input_readers = [
1962 cls(count_per_shard, string_length) for _ in range(shard_count)]
1964 left = count - count_per_shard*shard_count
1965 if left > 0:
1966 mr_input_readers.append(cls(left, string_length))
1968 return mr_input_readers
1970 @classmethod
1971 def validate(cls, mapper_spec):
1972 if mapper_spec.input_reader_class() != cls:
1973 raise BadReaderParamsError("Mapper input reader class mismatch")
1975 params = _get_params(mapper_spec)
1976 if cls.COUNT not in params:
1977 raise BadReaderParamsError("Must specify %s" % cls.COUNT)
1978 if not isinstance(params[cls.COUNT], int):
1979 raise BadReaderParamsError("%s should be an int but is %s" %
1980 (cls.COUNT, type(params[cls.COUNT])))
1981 if params[cls.COUNT] <= 0:
1982 raise BadReaderParamsError("%s should be a positive int" % cls.COUNT)
1983 if cls.STRING_LENGTH in params and not (
1984 isinstance(params[cls.STRING_LENGTH], int) and
1985 params[cls.STRING_LENGTH] > 0):
1986 raise BadReaderParamsError("%s should be a positive int but is %s" %
1987 (cls.STRING_LENGTH, params[cls.STRING_LENGTH]))
1988 if (not isinstance(mapper_spec.shard_count, int) or
1989 mapper_spec.shard_count <= 0):
1990 raise BadReaderParamsError(
1991 "shard_count should be a positive int but is %s" %
1992 mapper_spec.shard_count)
1994 @classmethod
1995 def from_json(cls, json):
1996 return cls(json[cls.COUNT], json[cls.STRING_LENGTH])
1998 def to_json(self):
1999 return {self.COUNT: self._count, self.STRING_LENGTH: self._string_length}
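# Illustrative sketch (not part of this module): RandomStringInputReader is
# driven entirely by the mapper parameters validated above. A hypothetical
# parameter block for a job emitting 1000 ten-character strings might look
# like the dict below; with 4 shards, split_input() would build four readers
# of 250 strings each (plus one extra reader if count did not divide evenly).
#
#   example_params = {
#       "count": 1000,          # required, positive int
#       "string_length": 10,    # optional, defaults to DEFAULT_STRING_LENGTH
#   }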
2008 class NamespaceInputReader(InputReader):
2009 """An input reader to iterate over namespaces.
2011 This reader yields namespace names as strings.
2012 It will always produce only one shard.
2015 NAMESPACE_RANGE_PARAM = "namespace_range"
2016 BATCH_SIZE_PARAM = "batch_size"
2017 _BATCH_SIZE = 10
2019 def __init__(self, ns_range, batch_size=_BATCH_SIZE):
2020 self.ns_range = ns_range
2021 self._batch_size = batch_size
2023 def to_json(self):
2024 """Serializes all the data in this query range into json form.
2026 Returns:
2027 all the data in json-compatible map.
2029 return {self.NAMESPACE_RANGE_PARAM: self.ns_range.to_json_object(),
2030 self.BATCH_SIZE_PARAM: self._batch_size}
2032 @classmethod
2033 def from_json(cls, json):
2034 """Create new DatastoreInputReader from the json, encoded by to_json.
2036 Args:
2037 json: json map representation of DatastoreInputReader.
2039 Returns:
2040 an instance of DatastoreInputReader with all data deserialized from json.
2042 return cls(
2043 namespace_range.NamespaceRange.from_json_object(
2044 json[cls.NAMESPACE_RANGE_PARAM]),
2045 json[cls.BATCH_SIZE_PARAM])
2047 @classmethod
2048 def validate(cls, mapper_spec):
2049 """Validates mapper spec.
2051 Args:
2052 mapper_spec: The MapperSpec for this InputReader.
2054 Raises:
2055 BadReaderParamsError: required parameters are missing or invalid.
2057 if mapper_spec.input_reader_class() != cls:
2058 raise BadReaderParamsError("Input reader class mismatch")
2059 params = _get_params(mapper_spec)
2060 if cls.BATCH_SIZE_PARAM in params:
2061 try:
2062 batch_size = int(params[cls.BATCH_SIZE_PARAM])
2063 if batch_size < 1:
2064 raise BadReaderParamsError("Bad batch size: %s" % batch_size)
2065 except ValueError, e:
2066 raise BadReaderParamsError("Bad batch size: %s" % e)
2068 @classmethod
2069 def split_input(cls, mapper_spec):
2070 """Returns a list of input readers for the input spec.
2072 Args:
2073 mapper_spec: The MapperSpec for this InputReader.
2075 Returns:
2076 A list of InputReaders.
2078 batch_size = int(_get_params(mapper_spec).get(
2079 cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
2080 shard_count = mapper_spec.shard_count
2081 namespace_ranges = namespace_range.NamespaceRange.split(shard_count,
2082 contiguous=True)
2083 return [NamespaceInputReader(ns_range, batch_size)
2084 for ns_range in namespace_ranges]
2086 def __iter__(self):
2087 while True:
2088 keys = self.ns_range.make_datastore_query().Get(limit=self._batch_size)
2089 if not keys:
2090 break
2092 for key in keys:
2093 namespace = metadata.Namespace.key_to_namespace(key)
2094 self.ns_range = self.ns_range.with_start_after(namespace)
2095 yield namespace
2097 def __str__(self):
2098 return repr(self.ns_range)
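# Illustrative sketch (not part of this module): __iter__ above fetches a
# small batch of namespace keys, and after yielding each name it narrows
# ns_range to start strictly after that name, so a shard serialized
# mid-iteration resumes where it left off. The same fetch-batch /
# advance-cursor pattern, with a hypothetical fetch_batch() helper standing
# in for the datastore query:
#
#   def iter_names(fetch_batch, batch_size=10):
#     cursor = None
#     while True:
#       names = fetch_batch(cursor, batch_size)   # hypothetical helper
#       if not names:
#         break
#       for name in names:
#         cursor = name          # resume point: strictly after this name
#         yield name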
2101 class RecordsReader(InputReader):
2102 """Reader to read a list of Files API file in records format.
2104 The number of input shards can be specified by the SHARDS_PARAM
2105 mapper parameter. Input files cannot be split, so there will be at most
2106 one shard per file. Also the number of shards will not be reduced based on
2107 the number of input files, so shards in always equals shards out.
2110 FILE_PARAM = "file"
2111 FILES_PARAM = "files"
2113 def __init__(self, filenames, position):
2114 """Constructor.
2116 Args:
2117 filenames: list of filenames.
2118 position: file position to start reading from as int.
2120 self._filenames = filenames
2121 if self._filenames:
2122 self._reader = records.RecordsReader(
2123 files.BufferedFile(self._filenames[0]))
2124 self._reader.seek(position)
2125 else:
2126 self._reader = None
2128 def __iter__(self):
2129 """Iterate over records in file.
2131 Yields:
2132 Records as strings.
2134 ctx = context.get()
2136 while self._reader:
2137 try:
2138 start_time = time.time()
2139 record = self._reader.read()
2140 if ctx:
2141 operation.counters.Increment(
2142 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
2143 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(record))(ctx)
2144 yield record
2145 except (files.ExistenceError), e:
2146 raise errors.FailJobError("ExistenceError: %s" % e)
2147 except (files.UnknownError), e:
2148 raise errors.RetrySliceError("UnknownError: %s" % e)
2149 except EOFError:
2150 self._filenames.pop(0)
2151 if not self._filenames:
2152 self._reader = None
2153 else:
2154 self._reader = records.RecordsReader(
2155 files.BufferedFile(self._filenames[0]))
2157 @classmethod
2158 def from_json(cls, json):
2159 """Creates an instance of the InputReader for the given input shard state.
2161 Args:
2162 json: The InputReader state as a dict-like object.
2164 Returns:
2165 An instance of the InputReader configured using the values of json.
2167 return cls(json["filenames"], json["position"])
2169 def to_json(self):
2170 """Returns an input shard state for the remaining inputs.
2172 Returns:
2173 A json-izable version of the remaining InputReader.
2175 result = {
2176 "filenames": self._filenames,
2177 "position": 0,
2179 if self._reader:
2180 result["position"] = self._reader.tell()
2181 return result
2183 @classmethod
2184 def split_input(cls, mapper_spec):
2185 """Returns a list of input readers for the input spec.
2187 Args:
2188 mapper_spec: The MapperSpec for this InputReader.
2190 Returns:
2191 A list of InputReaders.
2193 params = _get_params(mapper_spec)
2194 shard_count = mapper_spec.shard_count
2196 if cls.FILES_PARAM in params:
2197 filenames = params[cls.FILES_PARAM]
2198 if isinstance(filenames, basestring):
2199 filenames = filenames.split(",")
2200 else:
2201 filenames = [params[cls.FILE_PARAM]]
2203 batch_list = [[] for _ in xrange(shard_count)]
2204 for index, _ in enumerate(filenames):
2206 batch_list[index % shard_count].append(filenames[index])
2209 batch_list.sort(reverse=True, key=len)
2210 return [cls(batch, 0) for batch in batch_list]
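# Illustrative sketch (not part of this module): split_input() above deals
# the filenames out round-robin, one per shard, then sorts the batches so
# the largest ones come first. With made-up names and 3 shards:
#
#   filenames = ["f1", "f2", "f3", "f4", "f5"]
#   shard_count = 3
#   batches = [[] for _ in xrange(shard_count)]
#   for index, name in enumerate(filenames):
#     batches[index % shard_count].append(name)
#   batches.sort(reverse=True, key=len)
#   # batches == [["f1", "f4"], ["f2", "f5"], ["f3"]]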
2212 @classmethod
2213 def validate(cls, mapper_spec):
2214 """Validates mapper spec and all mapper parameters.
2216 Args:
2217 mapper_spec: The MapperSpec for this InputReader.
2219 Raises:
2220 BadReaderParamsError: required parameters are missing or invalid.
2222 if mapper_spec.input_reader_class() != cls:
2223 raise errors.BadReaderParamsError("Input reader class mismatch")
2224 params = _get_params(mapper_spec)
2225 if (cls.FILES_PARAM not in params and
2226 cls.FILE_PARAM not in params):
2227 raise BadReaderParamsError(
2228 "Must specify '%s' or '%s' parameter for mapper input" %
2229 (cls.FILES_PARAM, cls.FILE_PARAM))
2231 def __str__(self):
2232 position = 0
2233 if self._reader:
2234 position = self._reader.tell()
2235 return "%s:%s" % (self._filenames, position)
2238 class LogInputReader(InputReader):
2239 """Input reader for a time range of logs via the Logs Reader API.
2241 The number of input shards may be specified by the SHARDS_PARAM mapper
2242 parameter. A starting and ending time (in seconds since the Unix epoch) are
2243 required to generate time ranges over which to shard the input.
2246 START_TIME_PARAM = "start_time"
2247 END_TIME_PARAM = "end_time"
2248 MINIMUM_LOG_LEVEL_PARAM = "minimum_log_level"
2249 INCLUDE_INCOMPLETE_PARAM = "include_incomplete"
2250 INCLUDE_APP_LOGS_PARAM = "include_app_logs"
2251 VERSION_IDS_PARAM = "version_ids"
2252 MODULE_VERSIONS_PARAM = "module_versions"
2255 _OFFSET_PARAM = "offset"
2256 _PROTOTYPE_REQUEST_PARAM = "prototype_request"
2258 _PARAMS = frozenset([START_TIME_PARAM, END_TIME_PARAM, _OFFSET_PARAM,
2259 MINIMUM_LOG_LEVEL_PARAM, INCLUDE_INCOMPLETE_PARAM,
2260 INCLUDE_APP_LOGS_PARAM, VERSION_IDS_PARAM,
2261 MODULE_VERSIONS_PARAM, _PROTOTYPE_REQUEST_PARAM])
2262 _KWARGS = frozenset([_OFFSET_PARAM, _PROTOTYPE_REQUEST_PARAM])
2264 def __init__(self,
2265 start_time=None,
2266 end_time=None,
2267 minimum_log_level=None,
2268 include_incomplete=False,
2269 include_app_logs=False,
2270 version_ids=None,
2271 module_versions=None,
2272 **kwargs):
2273 """Constructor.
2275 Args:
2276 start_time: The earliest request completion or last-update time of logs
2277 that should be mapped over, in seconds since the Unix epoch.
2278 end_time: The latest request completion or last-update time that logs
2279 should be mapped over, in seconds since the Unix epoch.
2280 minimum_log_level: An application log level which serves as a filter on
2281 the requests mapped over--requests with no application log at or above
2282 the specified level will be omitted, even if include_app_logs is False.
2283 include_incomplete: Whether or not to include requests that have started
2284 but not yet finished, as a boolean. Defaults to False.
2285 include_app_logs: Whether or not to include application level logs in the
2286 mapped logs, as a boolean. Defaults to False.
2287 version_ids: A list of version ids whose logs should be read. This cannot
2288 be used together with module_versions.
2289 module_versions: A list of tuples containing a module and version id
2290 whose logs should be read. This cannot be used together with version_ids.
2291 **kwargs: A dictionary of keywords associated with this input reader.
2293 InputReader.__init__(self)
2297 self.__params = dict(kwargs)
2299 if start_time is not None:
2300 self.__params[self.START_TIME_PARAM] = start_time
2301 if end_time is not None:
2302 self.__params[self.END_TIME_PARAM] = end_time
2303 if minimum_log_level is not None:
2304 self.__params[self.MINIMUM_LOG_LEVEL_PARAM] = minimum_log_level
2305 if include_incomplete is not None:
2306 self.__params[self.INCLUDE_INCOMPLETE_PARAM] = include_incomplete
2307 if include_app_logs is not None:
2308 self.__params[self.INCLUDE_APP_LOGS_PARAM] = include_app_logs
2309 if version_ids:
2310 self.__params[self.VERSION_IDS_PARAM] = version_ids
2311 if module_versions:
2312 self.__params[self.MODULE_VERSIONS_PARAM] = module_versions
2315 if self._PROTOTYPE_REQUEST_PARAM in self.__params:
2316 prototype_request = log_service_pb.LogReadRequest(
2317 self.__params[self._PROTOTYPE_REQUEST_PARAM])
2318 self.__params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request
2320 def __iter__(self):
2321 """Iterates over logs in a given range of time.
2323 Yields:
2324 A RequestLog containing all the information for a single request.
2326 for log in logservice.fetch(**self.__params):
2327 self.__params[self._OFFSET_PARAM] = log.offset
2328 yield log
2330 @classmethod
2331 def from_json(cls, json):
2332 """Creates an instance of the InputReader for the given input shard's state.
2334 Args:
2335 json: The InputReader state as a dict-like object.
2337 Returns:
2338 An instance of the InputReader configured using the given JSON parameters.
2341 params = dict((str(k), v) for k, v in json.iteritems()
2342 if k in cls._PARAMS)
2347 if cls._OFFSET_PARAM in params:
2348 params[cls._OFFSET_PARAM] = base64.b64decode(params[cls._OFFSET_PARAM])
2349 return cls(**params)
2351 def to_json(self):
2352 """Returns an input shard state for the remaining inputs.
2354 Returns:
2355 A JSON serializable version of the remaining input to read.
2358 params = dict(self.__params)
2359 if self._PROTOTYPE_REQUEST_PARAM in params:
2360 prototype_request = params[self._PROTOTYPE_REQUEST_PARAM]
2361 params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request.Encode()
2362 if self._OFFSET_PARAM in params:
2363 params[self._OFFSET_PARAM] = base64.b64encode(params[self._OFFSET_PARAM])
2364 return params
2366 @classmethod
2367 def split_input(cls, mapper_spec):
2368 """Returns a list of input readers for the given input specification.
2370 Args:
2371 mapper_spec: The MapperSpec for this InputReader.
2373 Returns:
2374 A list of InputReaders.
2376 params = _get_params(mapper_spec)
2377 shard_count = mapper_spec.shard_count
2380 start_time = params[cls.START_TIME_PARAM]
2381 end_time = params[cls.END_TIME_PARAM]
2382 seconds_per_shard = (end_time - start_time) / shard_count
2385 shards = []
2386 for _ in xrange(shard_count - 1):
2387 params[cls.END_TIME_PARAM] = (params[cls.START_TIME_PARAM] +
2388 seconds_per_shard)
2389 shards.append(LogInputReader(**params))
2390 params[cls.START_TIME_PARAM] = params[cls.END_TIME_PARAM]
2393 params[cls.END_TIME_PARAM] = end_time
2394 return shards + [LogInputReader(**params)]
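# Illustrative sketch (not part of this module): split_input() above slices
# the [start_time, end_time) window into shard_count consecutive sub-windows
# of roughly equal duration, with the final shard ending exactly at the
# original end_time so no remainder is lost. The same idea in isolation:
#
#   def split_time_range(start, end, shards):
#     width = (end - start) / float(shards)
#     ranges = [(start + i * width, start + (i + 1) * width)
#               for i in xrange(shards - 1)]
#     ranges.append((start + (shards - 1) * width, end))
#     return ranges
#
#   # split_time_range(0, 100, 4) -> four 25-second windows ending at 100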
2396 @classmethod
2397 def validate(cls, mapper_spec):
2398 """Validates the mapper's specification and all necessary parameters.
2400 Args:
2401 mapper_spec: The MapperSpec to be used with this InputReader.
2403 Raises:
2404 BadReaderParamsError: If the user fails to specify both a starting time
2405 and an ending time, or if the starting time is later than the ending
2406 time.
2408 if mapper_spec.input_reader_class() != cls:
2409 raise errors.BadReaderParamsError("Input reader class mismatch")
2411 params = _get_params(mapper_spec, allowed_keys=cls._PARAMS)
2412 if (cls.VERSION_IDS_PARAM not in params and
2413 cls.MODULE_VERSIONS_PARAM not in params):
2414 raise errors.BadReaderParamsError("Must specify a list of version ids or "
2415 "module/version ids for mapper input")
2416 if (cls.VERSION_IDS_PARAM in params and
2417 cls.MODULE_VERSIONS_PARAM in params):
2418 raise errors.BadReaderParamsError("Cannot supply both version ids and "
2419 "module/version ids. Use only one.")
2420 if (cls.START_TIME_PARAM not in params or
2421 params[cls.START_TIME_PARAM] is None):
2422 raise errors.BadReaderParamsError("Must specify a starting time for "
2423 "mapper input")
2424 if cls.END_TIME_PARAM not in params or params[cls.END_TIME_PARAM] is None:
2425 params[cls.END_TIME_PARAM] = time.time()
2427 if params[cls.START_TIME_PARAM] >= params[cls.END_TIME_PARAM]:
2428 raise errors.BadReaderParamsError("The starting time cannot be later "
2429 "than or the same as the ending time.")
2431 if cls._PROTOTYPE_REQUEST_PARAM in params:
2432 try:
2433 params[cls._PROTOTYPE_REQUEST_PARAM] = log_service_pb.LogReadRequest(
2434 params[cls._PROTOTYPE_REQUEST_PARAM])
2435 except (TypeError, ProtocolBuffer.ProtocolBufferDecodeError):
2436 raise errors.BadReaderParamsError("The prototype request must be "
2437 "parseable as a LogReadRequest.")
2442 try:
2443 logservice.fetch(**params)
2444 except logservice.InvalidArgumentError, e:
2445 raise errors.BadReaderParamsError("One or more parameters are not valid "
2446 "inputs to logservice.fetch(): %s" % e)
2448 def __str__(self):
2449 """Returns the string representation of this LogInputReader."""
2450 params = []
2451 for key in sorted(self.__params.keys()):
2452 value = self.__params[key]
2453 if key is self._PROTOTYPE_REQUEST_PARAM:
2454 params.append("%s='%s'" % (key, value))
2455 elif key is self._OFFSET_PARAM:
2456 params.append("%s='%s'" % (key, value))
2457 else:
2458 params.append("%s=%s" % (key, value))
2460 return "LogInputReader(%s)" % ", ".join(params)
2463 class _GoogleCloudStorageInputReader(InputReader):
2464 """Input reader from Google Cloud Storage using the cloudstorage library.
2466 This class is expected to be subclassed with a reader that understands
2467 user-level records.
2469 Required configuration in the mapper_spec.input_reader dictionary.
2470 BUCKET_NAME_PARAM: name of the bucket to use (with no extra delimiters or
2471 suffixes such as directories).
2472 OBJECT_NAMES_PARAM: a list of object names or prefixes. All objects must be
2473 in the BUCKET_NAME_PARAM bucket. If the name ends with a * it will be
2474 treated as prefix and all objects with matching names will be read.
2475 Entries should not start with a slash unless that is part of the object's
2476 name. An example list could be:
2477 ["my-1st-input-file", "directory/my-2nd-file", "some/other/dir/input-*"]
2478 To retrieve all files, "*" will match every object in the bucket. If a file
2479 is listed twice or is covered by multiple prefixes, it will be read twice;
2480 there is no deduplication.
2482 Optional configuration in the mapper_spec.input_reader dictionary.
2483 BUFFER_SIZE_PARAM: the size of the read buffer for each file handle.
2484 DELIMITER_PARAM: if specified, turns on the shallow splitting mode.
2485 The delimiter is used as a path separator to designate directory
2486 hierarchy. Matching of prefixes from OBJECT_NAMES_PARAM
2487 will stop at the first directory instead of matching
2488 all files under the directory. This allows MR to process buckets with
2489 hundreds of thousands of files.
2493 BUCKET_NAME_PARAM = "bucket_name"
2494 OBJECT_NAMES_PARAM = "objects"
2495 BUFFER_SIZE_PARAM = "buffer_size"
2496 DELIMITER_PARAM = "delimiter"
2499 _ACCOUNT_ID_PARAM = "account_id"
2502 _JSON_PICKLE = "pickle"
2503 _STRING_MAX_FILES_LISTED = 10
2511 def __init__(self, filenames, index=0, buffer_size=None, _account_id=None,
2512 delimiter=None):
2513 """Initialize a GoogleCloudStorageInputReader instance.
2515 Args:
2516 filenames: A list of Google Cloud Storage filenames of the form
2517 '/bucket/objectname'.
2518 index: Index of the next filename to read.
2519 buffer_size: The size of the read buffer, None to use default.
2520 _account_id: Internal use only. See cloudstorage documentation.
2521 delimiter: Delimiter used as path separator. See class doc for details.
2523 self._filenames = filenames
2524 self._index = index
2525 self._buffer_size = buffer_size
2526 self._account_id = _account_id
2527 self._delimiter = delimiter
2528 self._bucket = None
2529 self._bucket_iter = None
2531 def _next_file(self):
2532 """Find next filename.
2534 self._filenames may need to be expanded via listbucket.
2536 Returns:
2537 None if no more files are left; the next filename otherwise.
2539 while True:
2540 if self._bucket_iter:
2541 try:
2542 return self._bucket_iter.next().filename
2543 except StopIteration:
2544 self._bucket_iter = None
2545 self._bucket = None
2546 if self._index >= len(self._filenames):
2547 return
2548 filename = self._filenames[self._index]
2549 self._index += 1
2550 if self._delimiter is None or not filename.endswith(self._delimiter):
2551 return filename
2552 self._bucket = cloudstorage.listbucket(filename,
2553 delimiter=self._delimiter)
2554 self._bucket_iter = iter(self._bucket)
2556 @classmethod
2557 def validate(cls, mapper_spec):
2558 """Validate mapper specification.
2560 Args:
2561 mapper_spec: an instance of model.MapperSpec
2563 Raises:
2564 BadReaderParamsError: if the specification is invalid for any reason such
2565 as missing the bucket name or providing an invalid bucket name.
2567 reader_spec = _get_params(mapper_spec, allow_old=False)
2570 if cls.BUCKET_NAME_PARAM not in reader_spec:
2571 raise errors.BadReaderParamsError(
2572 "%s is required for Google Cloud Storage" %
2573 cls.BUCKET_NAME_PARAM)
2574 try:
2575 cloudstorage.validate_bucket_name(
2576 reader_spec[cls.BUCKET_NAME_PARAM])
2577 except ValueError, error:
2578 raise errors.BadReaderParamsError("Bad bucket name, %s" % (error))
2581 if cls.OBJECT_NAMES_PARAM not in reader_spec:
2582 raise errors.BadReaderParamsError(
2583 "%s is required for Google Cloud Storage" %
2584 cls.OBJECT_NAMES_PARAM)
2585 filenames = reader_spec[cls.OBJECT_NAMES_PARAM]
2586 if not isinstance(filenames, list):
2587 raise errors.BadReaderParamsError(
2588 "Object name list is not a list but a %s" %
2589 filenames.__class__.__name__)
2590 for filename in filenames:
2591 if not isinstance(filename, basestring):
2592 raise errors.BadReaderParamsError(
2593 "Object name is not a string but a %s" %
2594 filename.__class__.__name__)
2595 if cls.DELIMITER_PARAM in reader_spec:
2596 delimiter = reader_spec[cls.DELIMITER_PARAM]
2597 if not isinstance(delimiter, basestring):
2598 raise errors.BadReaderParamsError(
2599 "%s is not a string but a %s" %
2600 (cls.DELIMITER_PARAM, type(delimiter)))
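# Illustrative sketch (not part of this module): a hypothetical
# input_reader parameter block that passes the validation above. The bucket
# and object names are made up; a trailing "*" marks a prefix match, and
# buffer_size/delimiter are optional.
#
#   example_reader_params = {
#       "bucket_name": "my-bucket",
#       "objects": ["logs/2014-01-*", "single/object/name"],
#       "buffer_size": 1024 * 1024,   # optional read buffer size, in bytes
#       "delimiter": "/",             # optional: shallow prefix matching
#   }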
2602 @classmethod
2603 def split_input(cls, mapper_spec):
2604 """Returns a list of input readers.
2606 An equal number of input files are assigned to each shard (+/- 1). If there
2607 are fewer files than shards, fewer than the requested number of shards will
2608 be used. Input files are currently never split (although for some formats
2609 they could be, and may be split in a future implementation).
2611 Args:
2612 mapper_spec: an instance of model.MapperSpec.
2614 Returns:
2615 A list of InputReaders. None when no input data can be found.
2617 reader_spec = _get_params(mapper_spec, allow_old=False)
2618 bucket = reader_spec[cls.BUCKET_NAME_PARAM]
2619 filenames = reader_spec[cls.OBJECT_NAMES_PARAM]
2620 delimiter = reader_spec.get(cls.DELIMITER_PARAM)
2621 account_id = reader_spec.get(cls._ACCOUNT_ID_PARAM)
2622 buffer_size = reader_spec.get(cls.BUFFER_SIZE_PARAM)
2625 all_filenames = []
2626 for filename in filenames:
2627 if filename.endswith("*"):
2628 all_filenames.extend(
2629 [file_stat.filename for file_stat in cloudstorage.listbucket(
2630 "/" + bucket + "/" + filename[:-1], delimiter=delimiter,
2631 _account_id=account_id)])
2632 else:
2633 all_filenames.append("/%s/%s" % (bucket, filename))
2636 readers = []
2637 for shard in range(0, mapper_spec.shard_count):
2638 shard_filenames = all_filenames[shard::mapper_spec.shard_count]
2639 if shard_filenames:
2640 readers.append(cls(
2641 shard_filenames, buffer_size=buffer_size, _account_id=account_id,
2642 delimiter=delimiter))
2643 return readers
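# Illustrative sketch (not part of this module): the slice
# all_filenames[shard::mapper_spec.shard_count] above deals the expanded
# file list out by stride, so each shard receives every shard_count-th
# name (+/- 1 file per shard). With made-up names and 3 shards:
#
#   all_filenames = ["/b/a", "/b/b", "/b/c", "/b/d", "/b/e"]
#   shard_count = 3
#   assignments = [all_filenames[s::shard_count] for s in range(shard_count)]
#   # assignments == [["/b/a", "/b/d"], ["/b/b", "/b/e"], ["/b/c"]]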
2645 @classmethod
2646 def from_json(cls, state):
2647 obj = pickle.loads(state[cls._JSON_PICKLE])
2648 if obj._bucket:
2649 obj._bucket_iter = iter(obj._bucket)
2650 return obj
2652 def to_json(self):
2653 before_iter = self._bucket_iter
2654 self._bucket_iter = None
2655 try:
2656 return {self._JSON_PICKLE: pickle.dumps(self)}
2657 finally:
2658 self._bucket_iter = before_iter
2660 def next(self):
2661 """Returns the next input from this input reader, a block of bytes.
2663 Non-existent files will be logged and skipped. The file might have been
2664 removed after input splitting.
2666 Returns:
2667 The next input from this input reader in the form of a cloudstorage
2668 ReadBuffer that supports a File-like interface (read, readline, seek,
2669 tell, and close). An error may be raised if the file can not be opened.
2671 Raises:
2672 StopIteration: The list of files has been exhausted.
2674 options = {}
2675 if self._buffer_size:
2676 options["read_buffer_size"] = self._buffer_size
2677 if self._account_id:
2678 options["_account_id"] = self._account_id
2679 while True:
2680 filename = self._next_file()
2681 if filename is None:
2682 raise StopIteration()
2683 try:
2684 start_time = time.time()
2685 handle = cloudstorage.open(filename, **options)
2687 ctx = context.get()
2688 if ctx:
2689 operation.counters.Increment(
2690 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
2692 return handle
2693 except cloudstorage.NotFoundError:
2694 logging.warning("File %s may have been removed. Skipping file.",
2695 filename)
2697 def __str__(self):
2699 num_files = len(self._filenames)
2700 if num_files > self._STRING_MAX_FILES_LISTED:
2701 names = "%s...%s + %d not shown" % (
2702 ",".join(self._filenames[0:self._STRING_MAX_FILES_LISTED-1]),
2703 self._filenames[-1],
2704 num_files - self._STRING_MAX_FILES_LISTED)
2705 else:
2706 names = ",".join(self._filenames)
2708 if self._index >= num_files:
2709 status = "EOF"
2710 else:
2711 status = "Next %s (%d of %d)" % (
2712 self._filenames[self._index],
2713 self._index + 1,
2714 num_files)
2715 return "CloudStorage [%s, %s]" % (status, names)
2718 class _GoogleCloudStorageRecordInputReader(_GoogleCloudStorageInputReader):
2719 """Read data from a Google Cloud Storage file using LevelDB format.
2721 See the _GoogleCloudStorageOutputWriter for additional configuration options.
2724 def __getstate__(self):
2725 result = self.__dict__.copy()
2727 if "_record_reader" in result:
2730 result.pop("_record_reader")
2731 return result
2733 def next(self):
2734 """Returns the next input from this input reader, a record.
2736 Returns:
2737 The next input from this input reader in the form of a record read from
2738 a LevelDB file.
2740 Raises:
2741 StopIteration: The ordered set of records has been exhausted.
2743 while True:
2744 if not hasattr(self, "_cur_handle") or self._cur_handle is None:
2746 self._cur_handle = super(_GoogleCloudStorageRecordInputReader,
2747 self).next()
2748 if not hasattr(self, "_record_reader") or self._record_reader is None:
2749 self._record_reader = records.RecordsReader(self._cur_handle)
2751 try:
2752 start_time = time.time()
2753 content = self._record_reader.read()
2755 ctx = context.get()
2756 if ctx:
2757 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
2758 operation.counters.Increment(
2759 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
2760 return content
2762 except EOFError:
2763 self._cur_handle = None
2764 self._record_reader = None
2768 class _ReducerReader(RecordsReader):
2769 """Reader to read KeyValues records files from Files API."""
2771 expand_parameters = True
2773 def __init__(self, filenames, position):
2774 super(_ReducerReader, self).__init__(filenames, position)
2775 self.current_key = None
2776 self.current_values = None
2778 def __iter__(self):
2779 ctx = context.get()
2780 combiner = None
2782 if ctx:
2783 combiner_spec = ctx.mapreduce_spec.mapper.params.get("combiner_spec")
2784 if combiner_spec:
2785 combiner = util.handler_for_name(combiner_spec)
2787 for binary_record in super(_ReducerReader, self).__iter__():
2788 proto = file_service_pb.KeyValues()
2789 proto.ParseFromString(binary_record)
2791 to_yield = None
2792 if self.current_key is not None and self.current_key != proto.key():
2793 to_yield = (self.current_key, self.current_values)
2794 self.current_key = None
2795 self.current_values = None
2797 if self.current_key is None:
2798 self.current_key = proto.key()
2799 self.current_values = []
2801 if combiner:
2802 combiner_result = combiner(
2803 self.current_key, proto.value_list(), self.current_values)
2805 if not util.is_generator(combiner_result):
2806 raise errors.BadCombinerOutputError(
2807 "Combiner %s should yield values instead of returning them (%s)" %
2808 (combiner, combiner_result))
2810 self.current_values = []
2811 for value in combiner_result:
2812 if isinstance(value, operation.Operation):
2813 value(ctx)
2814 else:
2816 self.current_values.append(value)
2821 if not to_yield:
2822 yield ALLOW_CHECKPOINT
2823 else:
2825 self.current_values.extend(proto.value_list())
2827 if to_yield:
2828 yield to_yield
2830 yield ALLOW_CHECKPOINT
2834 if self.current_key is not None:
2835 to_yield = (self.current_key, self.current_values)
2836 self.current_key = None
2837 self.current_values = None
2838 yield to_yield
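# Illustrative sketch (not part of this module): the loop above groups
# consecutive KeyValues records that share a key and emits one
# (key, [values...]) pair per key, flushing the final group once the
# underlying records are exhausted. Stripped of the combiner and checkpoint
# handling, the grouping technique is:
#
#   def group_sorted_pairs(pairs):
#     current_key, current_values = None, []
#     for key, values in pairs:
#       if current_key is not None and key != current_key:
#         yield current_key, current_values
#         current_values = []
#       current_key = key
#       current_values.extend(values)
#     if current_key is not None:
#       yield current_key, current_values
#
#   # list(group_sorted_pairs([("a", [1]), ("a", [2]), ("b", [3])]))
#   #   -> [("a", [1, 2]), ("b", [3])]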
2840 @staticmethod
2841 def encode_data(data):
2842 """Encodes the given data, which may have include raw bytes.
2844 Works around limitations in JSON encoding, which cannot handle raw bytes.
2846 Args:
2847 data: the data to encode.
2849 Returns:
2850 The data encoded.
2852 return base64.b64encode(pickle.dumps(data))
2854 @staticmethod
2855 def decode_data(data):
2856 """Decodes data encoded with the encode_data function."""
2857 return pickle.loads(base64.b64decode(data))
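# Illustrative sketch (not part of this module): encode_data/decode_data
# round-trip arbitrary picklable values, including raw byte strings that
# plain JSON could not carry, through pickle plus base64:
#
#   payload = ("key\x00with\xffbytes", [1, 2, 3])
#   encoded = _ReducerReader.encode_data(payload)   # ASCII-safe string
#   assert _ReducerReader.decode_data(encoded) == payload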
2859 def to_json(self):
2860 """Returns an input shard state for the remaining inputs.
2862 Returns:
2863 A json-izable version of the remaining InputReader.
2865 result = super(_ReducerReader, self).to_json()
2866 result["current_key"] = self.encode_data(self.current_key)
2867 result["current_values"] = self.encode_data(self.current_values)
2868 return result
2870 @classmethod
2871 def from_json(cls, json):
2872 """Creates an instance of the InputReader for the given input shard state.
2874 Args:
2875 json: The InputReader state as a dict-like object.
2877 Returns:
2878 An instance of the InputReader configured using the values of json.
2880 result = super(_ReducerReader, cls).from_json(json)
2881 result.current_key = _ReducerReader.decode_data(json["current_key"])
2882 result.current_values = _ReducerReader.decode_data(json["current_values"])
2883 return result