python/google/appengine/ext/mapreduce/input_readers.py (App Engine SDK, 1.9.30 sync)
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
31 """Defines input readers for MapReduce."""
35 __all__ = [
36 "AbstractDatastoreInputReader",
37 "ALLOW_CHECKPOINT",
38 "BadReaderParamsError",
39 "BlobstoreLineInputReader",
40 "BlobstoreZipInputReader",
41 "BlobstoreZipLineInputReader",
42 "COUNTER_IO_READ_BYTES",
43 "COUNTER_IO_READ_MSEC",
44 "DatastoreEntityInputReader",
45 "DatastoreInputReader",
46 "DatastoreKeyInputReader",
47 "FileInputReader",
48 "RandomStringInputReader",
49 "RawDatastoreInputReader",
50 "Error",
51 "InputReader",
52 "LogInputReader",
53 "NamespaceInputReader",
54 "RecordsReader",
60 import base64
61 import copy
62 import logging
63 import pickle
64 import random
65 import string
66 import StringIO
67 import time
68 import zipfile
70 from google.net.proto import ProtocolBuffer
71 from google.appengine.ext import ndb
73 from google.appengine.api import datastore
74 from google.appengine.api import files
75 from google.appengine.api import logservice
76 from google.appengine.api.files import file_service_pb
77 from google.appengine.api.logservice import log_service_pb
78 from google.appengine.ext import blobstore
79 from google.appengine.ext import db
80 from google.appengine.ext import key_range
81 from google.appengine.ext.db import metadata
82 from google.appengine.ext.mapreduce import context
83 from google.appengine.ext.mapreduce import datastore_range_iterators as db_iters
84 from google.appengine.ext.mapreduce import errors
85 from google.appengine.ext.mapreduce import file_format_parser
86 from google.appengine.ext.mapreduce import file_format_root
87 from google.appengine.ext.mapreduce import json_util
88 from google.appengine.ext.mapreduce import key_ranges
89 from google.appengine.ext.mapreduce import model
90 from google.appengine.ext.mapreduce import namespace_range
91 from google.appengine.ext.mapreduce import operation
92 from google.appengine.ext.mapreduce import property_range
93 from google.appengine.ext.mapreduce import records
94 from google.appengine.ext.mapreduce import util
98 try:
100 from google.appengine.ext import cloudstorage
101 if hasattr(cloudstorage, "_STUB"):
102 cloudstorage = None
103 except ImportError:
104 pass
108 Error = errors.Error
109 BadReaderParamsError = errors.BadReaderParamsError
113 COUNTER_IO_READ_BYTES = "io-read-bytes"
116 COUNTER_IO_READ_MSEC = "io-read-msec"
121 ALLOW_CHECKPOINT = object()
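# NOTE: a minimal sketch of how ALLOW_CHECKPOINT is meant to be consumed; the
# handler loop below is hypothetical. A reader may yield this sentinel instead
# of a real value to signal that it is safe to checkpoint/end the slice here
# even though no input was produced (see _iter_ns_range further down).
#
#   for value in reader:
#     if value is ALLOW_CHECKPOINT:
#       continue  # nothing to map; just a checkpoint opportunity
#     handle(value)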
124 class InputReader(json_util.JsonMixin):
125 """Abstract base class for input readers.
127 InputReaders have the following properties:
128 * They are created by using the split_input method to generate a set of
129 InputReaders from a MapperSpec.
130 * They generate inputs to the mapper via the iterator interface.
131 * After creation, they can be serialized and resumed using the JsonMixin
132 interface.
133 * They are cast to string for a user-readable description; it may be
134 valuable to implement __str__.
140 expand_parameters = False
143 _APP_PARAM = "_app"
144 NAMESPACE_PARAM = "namespace"
145 NAMESPACES_PARAM = "namespaces"
147 def __iter__(self):
148 return self
150 def next(self):
151 """Returns the next input from this input reader as a key, value pair.
153 Returns:
154 The next input from this input reader.
156 raise NotImplementedError("next() not implemented in %s" % self.__class__)
158 @classmethod
159 def from_json(cls, input_shard_state):
160 """Creates an instance of the InputReader for the given input shard state.
162 Args:
163 input_shard_state: The InputReader state as a dict-like object.
165 Returns:
166 An instance of the InputReader configured using the values of json.
168 raise NotImplementedError("from_json() not implemented in %s" % cls)
170 def to_json(self):
171 """Returns an input shard state for the remaining inputs.
173 Returns:
174 A json-izable version of the remaining InputReader.
176 raise NotImplementedError("to_json() not implemented in %s" %
177 self.__class__)
179 @classmethod
180 def split_input(cls, mapper_spec):
181 """Returns a list of input readers.
183 This method creates a list of input readers, each for one shard.
184 It attempts to split inputs among readers evenly.
186 Args:
187 mapper_spec: model.MapperSpec specifies the inputs and additional
188 parameters to define the behavior of input readers.
190 Returns:
191 A list of InputReaders. None or [] when no input data can be found.
193 raise NotImplementedError("split_input() not implemented in %s" % cls)
195 @classmethod
196 def validate(cls, mapper_spec):
197 """Validates mapper spec and all mapper parameters.
199 Input reader parameters are expected to be passed as "input_reader"
200 subdictionary in mapper_spec.params.
202 The pre-1.6.4 API mixes input reader parameters with all other parameters.
203 Thus, to be compatible, input readers check mapper_spec.params as well and
204 issue a warning if the "input_reader" subdictionary is not present.
206 Args:
207 mapper_spec: The MapperSpec for this InputReader.
209 Raises:
210 BadReaderParamsError: required parameters are missing or invalid.
212 if mapper_spec.input_reader_class() != cls:
213 raise BadReaderParamsError("Input reader class mismatch")
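# NOTE: a minimal sketch of a custom InputReader subclass wired to the
# interface above; NumberRangeInputReader and its "count" parameter are
# hypothetical, not part of this module.
#
#   class NumberRangeInputReader(InputReader):
#     COUNT_PARAM = "count"
#
#     def __init__(self, current, end):
#       self._current = current
#       self._end = end
#
#     def next(self):
#       if self._current >= self._end:
#         raise StopIteration()
#       self._current += 1
#       return self._current - 1
#
#     def to_json(self):
#       return {"current": self._current, "end": self._end}
#
#     @classmethod
#     def from_json(cls, json):
#       return cls(json["current"], json["end"])
#
#     @classmethod
#     def validate(cls, mapper_spec):
#       if mapper_spec.input_reader_class() != cls:
#         raise BadReaderParamsError("Input reader class mismatch")
#       if cls.COUNT_PARAM not in _get_params(mapper_spec):
#         raise BadReaderParamsError("Must specify %s" % cls.COUNT_PARAM)
#
#     @classmethod
#     def split_input(cls, mapper_spec):
#       count = int(_get_params(mapper_spec)[cls.COUNT_PARAM])
#       n = mapper_spec.shard_count
#       return [cls(i * count // n, (i + 1) * count // n) for i in range(n)]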
216 def _get_params(mapper_spec, allowed_keys=None, allow_old=True):
217 """Obtain input reader parameters.
219 Utility function for input readers implementation. Fetches parameters
220 from mapreduce specification giving appropriate usage warnings.
222 Args:
223 mapper_spec: The MapperSpec for the job
224 allowed_keys: set of all allowed keys in parameters as strings. If it is not
225 None, then parameters are expected to be in a separate "input_reader"
226 subdictionary of mapper_spec parameters.
227 allow_old: Allow parameters to exist outside of the input_reader
228 subdictionary for compatibility.
230 Returns:
231 mapper parameters as dict
233 Raises:
234 BadReaderParamsError: if parameters are invalid/missing or not allowed.
236 if "input_reader" not in mapper_spec.params:
237 message = ("Input reader's parameters should be specified in "
238 "input_reader subdictionary.")
239 if not allow_old or allowed_keys:
240 raise errors.BadReaderParamsError(message)
241 params = mapper_spec.params
242 params = dict((str(n), v) for n, v in params.iteritems())
243 else:
244 if not isinstance(mapper_spec.params.get("input_reader"), dict):
245 raise errors.BadReaderParamsError(
246 "Input reader parameters should be a dictionary")
247 params = mapper_spec.params.get("input_reader")
248 params = dict((str(n), v) for n, v in params.iteritems())
249 if allowed_keys:
250 params_diff = set(params.keys()) - allowed_keys
251 if params_diff:
252 raise errors.BadReaderParamsError(
253 "Invalid input_reader parameters: %s" % ",".join(params_diff))
254 return params
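# NOTE: a sketch of the expected parameter layout; the model path below is
# hypothetical. Input reader parameters live in an "input_reader"
# subdictionary of the mapper parameters:
#
#   mapper_spec.params = {
#       "input_reader": {
#           "entity_kind": "models.MyModel",
#           "batch_size": 100,
#       },
#       # other components (e.g. the output writer) keep their own subdicts
#   }
#
#   _get_params(mapper_spec, allowed_keys=set(["entity_kind", "batch_size"]))
#   # -> {"entity_kind": "models.MyModel", "batch_size": 100}
#   # any key outside allowed_keys raises BadReaderParamsError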
257 class FileInputReader(InputReader):
258 """Reader to read Files API files of user specified format.
260 This class currently only supports Google Storage files. It will be extended
261 to support blobstore files in the future.
263 Reader Parameters:
264 files: a list of filenames or filename patterns.
265 filename must be of format '/gs/bucket/filename'.
266 filename pattern has format '/gs/bucket/prefix*'.
267 filename pattern will be expanded to filenames with the given prefix.
268 Please see parseGlob in google/appengine/api/files/gs.py, which is included
269 in the App Engine SDK, for supported patterns.
271 Example:
272 ["/gs/bucket1/file1", "/gs/bucket2/*", "/gs/bucket3/p*"]
273 includes "file1", all files under bucket2, and files under bucket3 with
274 a prefix "p" in its name.
276 format: format string determines what your map function gets as its input.
277 format string can be "lines", "bytes", "zip", or a cascade of them plus
278 optional parameters. See file_formats.FORMATS for all supported formats.
279 See file_format_parser._FileFormatParser for format string syntax.
281 Example:
282 "lines": your map function gets files' contents line by line.
283 "bytes": your map function gets files' contents entirely.
284 "zip": InputReader unzips files and feeds your map function each of
285 the archive's member files as a whole.
286 "zip[bytes]: same as above.
287 "zip[lines]": InputReader unzips files and feeds your map function
288 files' contents line by line.
289 "zip[lines(encoding=utf32)]": InputReader unzips files, reads each
290 file with utf32 encoding and feeds your map function line by line.
291 "base64[zip[lines(encoding=utf32)]]: InputReader decodes files with
292 base64 encoding, unzips each file, reads each of them with utf32
293 encoding and feeds your map function line by line.
295 Note that "encoding" only teaches InputReader how to interpret files.
296 The input your map function gets is always a Python str.
300 FILES_PARAM = "files"
301 FORMAT_PARAM = "format"
303 def __init__(self, format_root):
304 """Initialize input reader.
306 Args:
307 format_root: a FileFormatRoot instance.
309 self._file_format_root = format_root
311 def __iter__(self):
312 """Inherit docs."""
313 return self
315 def next(self):
316 """Inherit docs."""
317 ctx = context.get()
318 start_time = time.time()
320 content = self._file_format_root.next().read()
322 if ctx:
323 operation.counters.Increment(
324 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
325 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
327 return content
329 @classmethod
330 def split_input(cls, mapper_spec):
331 """Inherit docs."""
332 params = _get_params(mapper_spec)
335 filenames = []
336 for f in params[cls.FILES_PARAM]:
337 parsedName = files.gs.parseGlob(f)
338 if isinstance(parsedName, tuple):
339 filenames.extend(files.gs.listdir(parsedName[0],
340 {"prefix": parsedName[1]}))
341 else:
342 filenames.append(parsedName)
344 file_format_roots = file_format_root.split(filenames,
345 params[cls.FORMAT_PARAM],
346 mapper_spec.shard_count)
348 if file_format_roots is None:
349 return []
350 return [cls(root) for root in file_format_roots]
352 @classmethod
353 def validate(cls, mapper_spec):
354 """Inherit docs."""
355 if mapper_spec.input_reader_class() != cls:
356 raise BadReaderParamsError("Mapper input reader class mismatch")
359 params = _get_params(mapper_spec)
360 if cls.FILES_PARAM not in params:
361 raise BadReaderParamsError("Must specify %s" % cls.FILES_PARAM)
362 if cls.FORMAT_PARAM not in params:
363 raise BadReaderParamsError("Must specify %s" % cls.FORMAT_PARAM)
365 format_string = params[cls.FORMAT_PARAM]
366 if not isinstance(format_string, basestring):
367 raise BadReaderParamsError("format should be string but is %s" %
368 cls.FORMAT_PARAM)
369 try:
370 file_format_parser.parse(format_string)
371 except ValueError, e:
372 raise BadReaderParamsError(e)
374 paths = params[cls.FILES_PARAM]
375 if not (paths and isinstance(paths, list)):
376 raise BadReaderParamsError("files should be a list of filenames.")
379 try:
380 for path in paths:
381 files.gs.parseGlob(path)
382 except files.InvalidFileNameError:
383 raise BadReaderParamsError("Invalid filename %s." % path)
385 @classmethod
386 def from_json(cls, json):
387 """Inherit docs."""
388 return cls(
389 file_format_root.FileFormatRoot.from_json(json["file_format_root"]))
391 def to_json(self):
392 """Inherit docs."""
393 return {"file_format_root": self._file_format_root.to_json()}
396 class AbstractDatastoreInputReader(InputReader):
397 """Abstract class for datastore input readers."""
400 _BATCH_SIZE = 50
403 _MAX_SHARD_COUNT = 256
408 MAX_NAMESPACES_FOR_KEY_SHARD = 10
411 ENTITY_KIND_PARAM = "entity_kind"
412 KEYS_ONLY_PARAM = "keys_only"
413 BATCH_SIZE_PARAM = "batch_size"
414 KEY_RANGE_PARAM = "key_range"
415 FILTERS_PARAM = "filters"
417 _KEY_RANGE_ITER_CLS = db_iters.AbstractKeyRangeIterator
419 def __init__(self, iterator):
420 """Create new DatastoreInputReader object.
422 This is internal constructor. Use split_input to create readers instead.
424 Args:
425 iterator: an iterator that generates objects for this input reader.
427 self._iter = iterator
429 def __iter__(self):
430 """Yields whatever internal iterator yields."""
431 for o in self._iter:
432 yield o
434 def __str__(self):
435 """Returns the string representation of this InputReader."""
436 return repr(self._iter)
438 def to_json(self):
439 """Serializes input reader to json compatible format.
441 Returns:
442 all the data in json-compatible map.
444 return self._iter.to_json()
446 @classmethod
447 def from_json(cls, json):
448 """Create new DatastoreInputReader from json, encoded by to_json.
450 Args:
451 json: json representation of DatastoreInputReader.
453 Returns:
454 an instance of DatastoreInputReader with all data deserialized from json.
456 return cls(db_iters.RangeIteratorFactory.from_json(json))
458 @classmethod
459 def _get_query_spec(cls, mapper_spec):
460 """Construct a model.QuerySpec from model.MapperSpec."""
461 params = _get_params(mapper_spec)
462 entity_kind = params[cls.ENTITY_KIND_PARAM]
463 filters = params.get(cls.FILTERS_PARAM)
464 app = params.get(cls._APP_PARAM)
465 ns = params.get(cls.NAMESPACE_PARAM)
467 return model.QuerySpec(
468 entity_kind=cls._get_raw_entity_kind(entity_kind),
469 keys_only=bool(params.get(cls.KEYS_ONLY_PARAM, False)),
470 filters=filters,
471 batch_size=int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE)),
472 model_class_path=entity_kind,
473 app=app,
474 ns=ns)
476 @classmethod
477 def split_input(cls, mapper_spec):
478 """Inherit doc."""
479 shard_count = mapper_spec.shard_count
480 query_spec = cls._get_query_spec(mapper_spec)
482 namespaces = None
483 if query_spec.ns is not None:
484 k_ranges = cls._to_key_ranges_by_shard(
485 query_spec.app, [query_spec.ns], shard_count, query_spec)
486 else:
487 ns_keys = namespace_range.get_namespace_keys(
488 query_spec.app, cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
491 if not ns_keys:
492 return
495 elif len(ns_keys) <= cls.MAX_NAMESPACES_FOR_KEY_SHARD:
496 namespaces = [ns_key.name() or "" for ns_key in ns_keys]
497 k_ranges = cls._to_key_ranges_by_shard(
498 query_spec.app, namespaces, shard_count, query_spec)
500 else:
501 ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
502 contiguous=False,
503 can_query=lambda: True,
504 _app=query_spec.app)
505 k_ranges = [key_ranges.KeyRangesFactory.create_from_ns_range(ns_range)
506 for ns_range in ns_ranges]
508 iters = [db_iters.RangeIteratorFactory.create_key_ranges_iterator(
509 r, query_spec, cls._KEY_RANGE_ITER_CLS) for r in k_ranges]
511 return [cls(i) for i in iters]
513 @classmethod
514 def _to_key_ranges_by_shard(cls, app, namespaces, shard_count, query_spec):
515 """Get a list of key_ranges.KeyRanges objects, one for each shard.
517 This method uses the scatter index to split each namespace into pieces
518 and assigns those pieces to shards.
520 Args:
521 app: app_id in str.
522 namespaces: a list of namespaces in str.
523 shard_count: number of shards to split.
524 query_spec: model.QuerySpec.
526 Returns:
527 a list of key_ranges.KeyRanges objects.
529 key_ranges_by_ns = []
532 for namespace in namespaces:
533 ranges = cls._split_ns_by_scatter(
534 shard_count,
535 namespace,
536 query_spec.entity_kind,
537 app)
540 random.shuffle(ranges)
541 key_ranges_by_ns.append(ranges)
546 ranges_by_shard = [[] for _ in range(shard_count)]
547 for ranges in key_ranges_by_ns:
548 for i, k_range in enumerate(ranges):
549 if k_range:
550 ranges_by_shard[i].append(k_range)
552 key_ranges_by_shard = []
553 for ranges in ranges_by_shard:
554 if ranges:
555 key_ranges_by_shard.append(key_ranges.KeyRangesFactory.create_from_list(
556 ranges))
557 return key_ranges_by_shard
559 @classmethod
560 def _split_ns_by_scatter(cls,
561 shard_count,
562 namespace,
563 raw_entity_kind,
564 app):
565 """Split a namespace by scatter index into key_range.KeyRange.
567 TODO: Power this with key_range.KeyRange.compute_split_points.
569 Args:
570 shard_count: number of shards.
571 namespace: namespace name to split. str.
572 raw_entity_kind: low level datastore API entity kind.
573 app: app id in str.
575 Returns:
576 A list of key_range.KeyRange objects. If there are not enough entities to
577 split into the requested number of shards, the returned list will contain
578 KeyRanges ordered lexicographically with any Nones appearing at the end.
580 if shard_count == 1:
582 return [key_range.KeyRange(namespace=namespace, _app=app)]
584 ds_query = datastore.Query(kind=raw_entity_kind,
585 namespace=namespace,
586 _app=app,
587 keys_only=True)
588 ds_query.Order("__scatter__")
589 oversampling_factor = 32
590 random_keys = ds_query.Get(shard_count * oversampling_factor)
592 if not random_keys:
595 return ([key_range.KeyRange(namespace=namespace, _app=app)] +
596 [None] * (shard_count - 1))
598 random_keys.sort()
600 if len(random_keys) >= shard_count:
602 random_keys = cls._choose_split_points(random_keys, shard_count)
604 k_ranges = []
606 k_ranges.append(key_range.KeyRange(
607 key_start=None,
608 key_end=random_keys[0],
609 direction=key_range.KeyRange.ASC,
610 include_start=False,
611 include_end=False,
612 namespace=namespace,
613 _app=app))
615 for i in range(0, len(random_keys) - 1):
616 k_ranges.append(key_range.KeyRange(
617 key_start=random_keys[i],
618 key_end=random_keys[i+1],
619 direction=key_range.KeyRange.ASC,
620 include_start=True,
621 include_end=False,
622 namespace=namespace,
623 _app=app))
625 k_ranges.append(key_range.KeyRange(
626 key_start=random_keys[-1],
627 key_end=None,
628 direction=key_range.KeyRange.ASC,
629 include_start=True,
630 include_end=False,
631 namespace=namespace,
632 _app=app))
634 if len(k_ranges) < shard_count:
636 k_ranges += [None] * (shard_count - len(k_ranges))
637 return k_ranges
639 @classmethod
640 def _choose_split_points(cls, sorted_keys, shard_count):
641 """Returns the best split points given a random set of datastore.Keys."""
642 assert len(sorted_keys) >= shard_count
643 index_stride = len(sorted_keys) / float(shard_count)
644 return [sorted_keys[int(round(index_stride * i))]
645 for i in range(1, shard_count)]
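# NOTE: a worked example of _choose_split_points (values are illustrative).
# With 8 sorted scatter keys [k0..k7] and shard_count=4, index_stride is
# 8 / 4.0 = 2.0, so the chosen split points are sorted_keys[2], sorted_keys[4]
# and sorted_keys[6]: shard_count - 1 boundaries dividing the sample into
# four roughly equal buckets.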
647 @classmethod
648 def validate(cls, mapper_spec):
649 """Inherit docs."""
650 params = _get_params(mapper_spec)
651 if cls.ENTITY_KIND_PARAM not in params:
652 raise BadReaderParamsError("Missing input reader parameter 'entity_kind'")
653 if cls.BATCH_SIZE_PARAM in params:
654 try:
655 batch_size = int(params[cls.BATCH_SIZE_PARAM])
656 if batch_size < 1:
657 raise BadReaderParamsError("Bad batch size: %s" % batch_size)
658 except ValueError, e:
659 raise BadReaderParamsError("Bad batch size: %s" % e)
660 try:
661 bool(params.get(cls.KEYS_ONLY_PARAM, False))
662 except:
663 raise BadReaderParamsError("keys_only expects a boolean value but got %s",
664 params[cls.KEYS_ONLY_PARAM])
665 if cls.NAMESPACE_PARAM in params:
666 if not isinstance(params[cls.NAMESPACE_PARAM],
667 (str, unicode, type(None))):
668 raise BadReaderParamsError(
669 "Expected a single namespace string")
670 if cls.NAMESPACES_PARAM in params:
671 raise BadReaderParamsError("Multiple namespaces are no longer supported")
672 if cls.FILTERS_PARAM in params:
673 filters = params[cls.FILTERS_PARAM]
674 if not isinstance(filters, list):
675 raise BadReaderParamsError("Expected list for filters parameter")
676 for f in filters:
677 if not isinstance(f, (tuple, list)):
678 raise BadReaderParamsError("Filter should be a tuple or list: %s", f)
679 if len(f) != 3:
680 raise BadReaderParamsError("Filter should be a 3-tuple: %s", f)
681 prop, op, _ = f
682 if not isinstance(prop, basestring):
683 raise BadReaderParamsError("Property should be string: %s", prop)
684 if not isinstance(op, basestring):
685 raise BadReaderParamsError("Operator should be string: %s", op)
687 @classmethod
688 def _get_raw_entity_kind(cls, entity_kind_or_model_classpath):
689 """Returns the entity kind to use with low level datastore calls.
691 Args:
692 entity_kind_or_model_classpath: user specified entity kind or model
693 classpath.
695 Returns:
696 the entity kind in str to use with low level datastore calls.
698 return entity_kind_or_model_classpath
701 class RawDatastoreInputReader(AbstractDatastoreInputReader):
702 """Iterates over an entity kind and yields datastore.Entity."""
704 _KEY_RANGE_ITER_CLS = db_iters.KeyRangeEntityIterator
706 @classmethod
707 def validate(cls, mapper_spec):
708 """Inherit docs."""
709 super(RawDatastoreInputReader, cls).validate(mapper_spec)
710 params = _get_params(mapper_spec)
711 entity_kind = params[cls.ENTITY_KIND_PARAM]
712 if "." in entity_kind:
713 logging.warning(
714 ". detected in entity kind %s specified for reader %s."
715 "Assuming entity kind contains the dot.",
716 entity_kind, cls.__name__)
717 if cls.FILTERS_PARAM in params:
718 filters = params[cls.FILTERS_PARAM]
719 for f in filters:
720 if f[1] != "=":
721 raise BadReaderParamsError(
722 "Only equality filters are supported: %s", f)
725 class DatastoreInputReader(AbstractDatastoreInputReader):
726 """Iterates over a Model and yields model instances.
728 Supports both db.Model and ndb.Model.
731 _KEY_RANGE_ITER_CLS = db_iters.KeyRangeModelIterator
733 @classmethod
734 def _get_raw_entity_kind(cls, model_classpath):
735 entity_type = util.for_name(model_classpath)
736 if isinstance(entity_type, db.Model):
737 return entity_type.kind()
738 elif isinstance(entity_type, (ndb.Model, ndb.MetaModel)):
740 return entity_type._get_kind()
741 else:
742 return util.get_short_name(model_classpath)
744 @classmethod
745 def validate(cls, mapper_spec):
746 """Inherit docs."""
747 super(DatastoreInputReader, cls).validate(mapper_spec)
748 params = _get_params(mapper_spec)
749 entity_kind = params[cls.ENTITY_KIND_PARAM]
751 try:
752 model_class = util.for_name(entity_kind)
753 except ImportError, e:
754 raise BadReaderParamsError("Bad entity kind: %s" % e)
755 if cls.FILTERS_PARAM in params:
756 filters = params[cls.FILTERS_PARAM]
757 if issubclass(model_class, db.Model):
758 cls._validate_filters(filters, model_class)
759 else:
760 cls._validate_filters_ndb(filters, model_class)
761 property_range.PropertyRange(filters, entity_kind)
763 @classmethod
764 def _validate_filters(cls, filters, model_class):
765 """Validate user supplied filters.
767 Validate filters are on existing properties and filter values
768 have valid semantics.
770 Args:
771 filters: user supplied filters. Each filter should be a list or tuple of
772 format (<property_name_as_str>, <query_operator_as_str>,
773 <value_of_certain_type>). Value type is up to the property's type.
774 model_class: the db.Model class for the entity type to apply filters on.
776 Raises:
777 BadReaderParamsError: if any filter is invalid in any way.
779 if not filters:
780 return
782 properties = model_class.properties()
784 for f in filters:
785 prop, _, val = f
786 if prop not in properties:
787 raise errors.BadReaderParamsError(
788 "Property %s is not defined for entity type %s",
789 prop, model_class.kind())
793 try:
794 properties[prop].validate(val)
795 except db.BadValueError, e:
796 raise errors.BadReaderParamsError(e)
798 @classmethod
800 def _validate_filters_ndb(cls, filters, model_class):
801 """Validate ndb.Model filters."""
802 if not filters:
803 return
805 properties = model_class._properties
807 for f in filters:
808 prop, _, val = f
809 if prop not in properties:
810 raise errors.BadReaderParamsError(
811 "Property %s is not defined for entity type %s",
812 prop, model_class._get_kind())
816 try:
817 properties[prop]._do_validate(val)
818 except db.BadValueError, e:
819 raise errors.BadReaderParamsError(e)
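# NOTE: a sketch of the filter format both validators above expect; property
# names and values are hypothetical. Each filter is a
# (property_name, operator, value) 3-tuple, and inequality filters are
# expected to target a single property so that property_range.PropertyRange
# can shard on it (see split_input below).
#
#   filters = [("category", "=", "news"),
#              ("created", ">=", datetime.datetime(2013, 1, 1)),
#              ("created", "<", datetime.datetime(2014, 1, 1))]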
821 @classmethod
822 def split_input(cls, mapper_spec):
823 """Inherit docs."""
824 shard_count = mapper_spec.shard_count
825 query_spec = cls._get_query_spec(mapper_spec)
827 if not property_range.should_shard_by_property_range(query_spec.filters):
828 return super(DatastoreInputReader, cls).split_input(mapper_spec)
830 p_range = property_range.PropertyRange(query_spec.filters,
831 query_spec.model_class_path)
832 p_ranges = p_range.split(shard_count)
835 if query_spec.ns:
836 ns_range = namespace_range.NamespaceRange(
837 namespace_start=query_spec.ns,
838 namespace_end=query_spec.ns,
839 _app=query_spec.app)
840 ns_ranges = [copy.copy(ns_range) for _ in p_ranges]
841 else:
842 ns_keys = namespace_range.get_namespace_keys(
843 query_spec.app, cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
844 if not ns_keys:
845 return
848 if len(ns_keys) <= cls.MAX_NAMESPACES_FOR_KEY_SHARD:
849 ns_ranges = [namespace_range.NamespaceRange(_app=query_spec.app)
850 for _ in p_ranges]
852 else:
853 ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
854 contiguous=False,
855 can_query=lambda: True,
856 _app=query_spec.app)
857 p_ranges = [copy.copy(p_range) for _ in ns_ranges]
859 assert len(p_ranges) == len(ns_ranges)
861 iters = [
862 db_iters.RangeIteratorFactory.create_property_range_iterator(
863 p, ns, query_spec) for p, ns in zip(p_ranges, ns_ranges)]
864 return [cls(i) for i in iters]
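# NOTE: a sketch of an "input_reader" block for DatastoreInputReader and a
# mapper handler; the model classpath, namespace and handler below are
# hypothetical.
#
#   "input_reader": {
#       "entity_kind": "myapp.models.Greeting",  # importable db/ndb model path
#       "namespace": "tenant-1",                  # optional, single namespace
#       "batch_size": 200,
#   }
#
#   def process(entity):
#     # the handler receives model instances (db.Model or ndb.Model)
#     yield operation.counters.Increment("greetings-seen")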
867 class DatastoreKeyInputReader(RawDatastoreInputReader):
868 """Iterate over an entity kind and yields datastore.Key."""
870 _KEY_RANGE_ITER_CLS = db_iters.KeyRangeKeyIterator
874 DatastoreEntityInputReader = RawDatastoreInputReader
879 class _OldAbstractDatastoreInputReader(InputReader):
880 """Abstract base class for classes that iterate over datastore entities.
882 Concrete subclasses must implement _iter_key_range(self, k_range). See the
883 docstring for that method for details.
887 _BATCH_SIZE = 50
890 _MAX_SHARD_COUNT = 256
893 _OVERSAMPLING_FACTOR = 32
898 MAX_NAMESPACES_FOR_KEY_SHARD = 10
901 ENTITY_KIND_PARAM = "entity_kind"
902 KEYS_ONLY_PARAM = "keys_only"
903 BATCH_SIZE_PARAM = "batch_size"
904 KEY_RANGE_PARAM = "key_range"
905 NAMESPACE_RANGE_PARAM = "namespace_range"
906 CURRENT_KEY_RANGE_PARAM = "current_key_range"
907 FILTERS_PARAM = "filters"
913 def __init__(self,
914 entity_kind,
915 key_ranges=None,
916 ns_range=None,
917 batch_size=_BATCH_SIZE,
918 current_key_range=None,
919 filters=None):
920 """Create new AbstractDatastoreInputReader object.
922 This is internal constructor. Use split_query in a concrete class instead.
924 Args:
925 entity_kind: entity kind as string.
926 key_ranges: a sequence of key_range.KeyRange instances to process. Only
927 one of key_ranges or ns_range can be non-None.
928 ns_range: a namespace_range.NamespaceRange to process. Only one of
929 key_ranges or ns_range can be non-None.
930 batch_size: size of read batch as int.
931 current_key_range: the current key_range.KeyRange being processed.
932 filters: optional list of filters to apply to the query. Each filter is
933 a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
934 User filters are applied first.
936 assert key_ranges is not None or ns_range is not None, (
937 "must specify one of 'key_ranges' or 'ns_range'")
938 assert key_ranges is None or ns_range is None, (
939 "can't specify both 'key_ranges ' and 'ns_range'")
941 self._entity_kind = entity_kind
944 self._key_ranges = key_ranges and list(reversed(key_ranges))
946 self._ns_range = ns_range
947 self._batch_size = int(batch_size)
948 self._current_key_range = current_key_range
949 self._filters = filters
951 @classmethod
952 def _get_raw_entity_kind(cls, entity_kind):
953 if "." in entity_kind:
954 logging.warning(
955 ". detected in entity kind %s specified for reader %s."
956 "Assuming entity kind contains the dot.",
957 entity_kind, cls.__name__)
958 return entity_kind
960 def __iter__(self):
961 """Iterates over the given KeyRanges or NamespaceRange.
963 This method iterates over the given KeyRanges or NamespaceRange and sets
964 self._current_key_range to the KeyRange currently being processed. It
965 then delegates to the _iter_key_range method to yield the actual
966 results.
968 Yields:
969 Forwards the objects yielded by the subclass's concrete _iter_key_range()
970 method. The caller must consume the result yielded because self.to_json()
971 will not include it.
973 if self._key_ranges is not None:
974 for o in self._iter_key_ranges():
975 yield o
976 elif self._ns_range is not None:
977 for o in self._iter_ns_range():
978 yield o
979 else:
980 assert False, "self._key_ranges and self._ns_range are both None"
982 def _iter_key_ranges(self):
983 """Iterates over self._key_ranges, delegating to self._iter_key_range()."""
984 while True:
985 if self._current_key_range is None:
986 if self._key_ranges:
987 self._current_key_range = self._key_ranges.pop()
990 continue
991 else:
992 break
994 for key, o in self._iter_key_range(
995 copy.deepcopy(self._current_key_range)):
998 self._current_key_range.advance(key)
999 yield o
1000 self._current_key_range = None
1002 def _iter_ns_range(self):
1003 """Iterates over self._ns_range, delegating to self._iter_key_range()."""
1004 while True:
1005 if self._current_key_range is None:
1006 query = self._ns_range.make_datastore_query()
1007 namespace_result = query.Get(1)
1008 if not namespace_result:
1009 break
1011 namespace = namespace_result[0].name() or ""
1012 self._current_key_range = key_range.KeyRange(
1013 namespace=namespace, _app=self._ns_range.app)
1014 yield ALLOW_CHECKPOINT
1016 for key, o in self._iter_key_range(
1017 copy.deepcopy(self._current_key_range)):
1020 self._current_key_range.advance(key)
1021 yield o
1023 if (self._ns_range.is_single_namespace or
1024 self._current_key_range.namespace == self._ns_range.namespace_end):
1025 break
1026 self._ns_range = self._ns_range.with_start_after(
1027 self._current_key_range.namespace)
1028 self._current_key_range = None
1030 def _iter_key_range(self, k_range):
1031 """Yields a db.Key and the value that should be yielded by self.__iter__().
1033 Args:
1034 k_range: The key_range.KeyRange to iterate over.
1036 Yields:
1037 A 2-tuple containing the last db.Key processed and the value that should
1038 be yielded by __iter__. The returned db.Key will be used to determine the
1039 InputReader's current position in self._current_key_range.
1041 raise NotImplementedError("_iter_key_range() not implemented in %s" %
1042 self.__class__)
1044 def __str__(self):
1045 """Returns the string representation of this InputReader."""
1046 if self._ns_range is None:
1047 return repr(self._key_ranges)
1048 else:
1049 return repr(self._ns_range)
1051 @classmethod
1052 def _choose_split_points(cls, sorted_keys, shard_count):
1053 """Returns the best split points given a random set of db.Keys."""
1054 assert len(sorted_keys) >= shard_count
1055 index_stride = len(sorted_keys) / float(shard_count)
1056 return [sorted_keys[int(round(index_stride * i))]
1057 for i in range(1, shard_count)]
1061 @classmethod
1062 def _split_input_from_namespace(cls, app, namespace, entity_kind,
1063 shard_count):
1064 """Helper for _split_input_from_params.
1066 If there are not enough Entities to make all of the given shards, the
1067 returned list of KeyRanges will include Nones. The returned list will
1068 contain KeyRanges ordered lexicographically with any Nones appearing at the
1069 end.
1071 Args:
1072 app: the app.
1073 namespace: the namespace.
1074 entity_kind: entity kind as string.
1075 shard_count: the number of shards.
1077 Returns:
1078 KeyRange objects.
1081 raw_entity_kind = cls._get_raw_entity_kind(entity_kind)
1082 if shard_count == 1:
1084 return [key_range.KeyRange(namespace=namespace, _app=app)]
1086 ds_query = datastore.Query(kind=raw_entity_kind,
1087 namespace=namespace,
1088 _app=app,
1089 keys_only=True)
1090 ds_query.Order("__scatter__")
1091 random_keys = ds_query.Get(shard_count * cls._OVERSAMPLING_FACTOR)
1093 if not random_keys:
1096 return ([key_range.KeyRange(namespace=namespace, _app=app)] +
1097 [None] * (shard_count - 1))
1099 random_keys.sort()
1101 if len(random_keys) >= shard_count:
1103 random_keys = cls._choose_split_points(random_keys, shard_count)
1106 key_ranges = []
1108 key_ranges.append(key_range.KeyRange(
1109 key_start=None,
1110 key_end=random_keys[0],
1111 direction=key_range.KeyRange.ASC,
1112 include_start=False,
1113 include_end=False,
1114 namespace=namespace,
1115 _app=app))
1117 for i in range(0, len(random_keys) - 1):
1118 key_ranges.append(key_range.KeyRange(
1119 key_start=random_keys[i],
1120 key_end=random_keys[i+1],
1121 direction=key_range.KeyRange.ASC,
1122 include_start=True,
1123 include_end=False,
1124 namespace=namespace,
1125 _app=app))
1127 key_ranges.append(key_range.KeyRange(
1128 key_start=random_keys[-1],
1129 key_end=None,
1130 direction=key_range.KeyRange.ASC,
1131 include_start=True,
1132 include_end=False,
1133 namespace=namespace,
1134 _app=app))
1136 if len(key_ranges) < shard_count:
1138 key_ranges += [None] * (shard_count - len(key_ranges))
1140 return key_ranges
1142 @classmethod
1143 def _split_input_from_params(cls, app, namespaces, entity_kind_name,
1144 params, shard_count):
1145 """Return input reader objects. Helper for split_input."""
1147 key_ranges = []
1148 for namespace in namespaces:
1149 key_ranges.extend(
1150 cls._split_input_from_namespace(app,
1151 namespace,
1152 entity_kind_name,
1153 shard_count))
1158 shared_ranges = [[] for _ in range(shard_count)]
1159 for i, k_range in enumerate(key_ranges):
1160 shared_ranges[i % shard_count].append(k_range)
1161 batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
1163 return [cls(entity_kind_name,
1164 key_ranges=key_ranges,
1165 ns_range=None,
1166 batch_size=batch_size)
1167 for key_ranges in shared_ranges if key_ranges]
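# NOTE: a worked example of the round-robin assignment above (numbers are
# illustrative). With 2 namespaces each split into 4 KeyRanges (8 total) and
# shard_count=4, key_ranges[i] is appended to shared_ranges[i % 4], so every
# shard ends up with 2 ranges, one per namespace.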
1169 @classmethod
1170 def validate(cls, mapper_spec):
1171 """Validates mapper spec and all mapper parameters.
1173 Args:
1174 mapper_spec: The MapperSpec for this InputReader.
1176 Raises:
1177 BadReaderParamsError: required parameters are missing or invalid.
1179 if mapper_spec.input_reader_class() != cls:
1180 raise BadReaderParamsError("Input reader class mismatch")
1181 params = _get_params(mapper_spec)
1182 if cls.ENTITY_KIND_PARAM not in params:
1183 raise BadReaderParamsError("Missing mapper parameter 'entity_kind'")
1184 if cls.BATCH_SIZE_PARAM in params:
1185 try:
1186 batch_size = int(params[cls.BATCH_SIZE_PARAM])
1187 if batch_size < 1:
1188 raise BadReaderParamsError("Bad batch size: %s" % batch_size)
1189 except ValueError, e:
1190 raise BadReaderParamsError("Bad batch size: %s" % e)
1191 if cls.NAMESPACE_PARAM in params:
1192 if not isinstance(params[cls.NAMESPACE_PARAM],
1193 (str, unicode, type(None))):
1194 raise BadReaderParamsError(
1195 "Expected a single namespace string")
1196 if cls.NAMESPACES_PARAM in params:
1197 raise BadReaderParamsError("Multiple namespaces are no longer supported")
1198 if cls.FILTERS_PARAM in params:
1199 filters = params[cls.FILTERS_PARAM]
1200 if not isinstance(filters, list):
1201 raise BadReaderParamsError("Expected list for filters parameter")
1202 for f in filters:
1203 if not isinstance(f, (tuple, list)):
1204 raise BadReaderParamsError("Filter should be a tuple or list: %s", f)
1205 if len(f) != 3:
1206 raise BadReaderParamsError("Filter should be a 3-tuple: %s", f)
1207 if not isinstance(f[0], basestring):
1208 raise BadReaderParamsError("First element should be string: %s", f)
1209 if f[1] != "=":
1210 raise BadReaderParamsError(
1211 "Only equality filters are supported: %s", f)
1213 @classmethod
1214 def split_input(cls, mapper_spec):
1215 """Splits query into shards without fetching query results.
1217 Tries as best it can to split the whole query result set into equal
1218 shards. Due to the difficulty of making a perfect split, the resulting
1219 shards' sizes might differ significantly from each other.
1221 Args:
1222 mapper_spec: MapperSpec with params containing 'entity_kind'.
1223 May have 'namespace' in the params as a string containing a single
1224 namespace. If specified then the input reader will only yield values
1225 in the given namespace. If 'namespace' is not given then values from
1226 all namespaces will be yielded. May also have 'batch_size' in the params
1227 to specify the number of entities to process in each batch.
1229 Returns:
1230 A list of InputReader objects. If the query results are empty then the
1231 empty list will be returned. Otherwise, the list will always have a length
1232 equal to number_of_shards but may be padded with Nones if there are too
1233 few results for effective sharding.
1235 params = _get_params(mapper_spec)
1236 entity_kind_name = params[cls.ENTITY_KIND_PARAM]
1237 batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
1238 shard_count = mapper_spec.shard_count
1239 namespace = params.get(cls.NAMESPACE_PARAM)
1240 app = params.get(cls._APP_PARAM)
1241 filters = params.get(cls.FILTERS_PARAM)
1243 if namespace is None:
1255 namespace_query = datastore.Query("__namespace__",
1256 keys_only=True,
1257 _app=app)
1258 namespace_keys = namespace_query.Get(
1259 limit=cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
1261 if len(namespace_keys) > cls.MAX_NAMESPACES_FOR_KEY_SHARD:
1262 ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
1263 contiguous=True,
1264 _app=app)
1265 return [cls(entity_kind_name,
1266 key_ranges=None,
1267 ns_range=ns_range,
1268 batch_size=batch_size,
1269 filters=filters)
1270 for ns_range in ns_ranges]
1271 elif not namespace_keys:
1272 return [cls(entity_kind_name,
1273 key_ranges=None,
1274 ns_range=namespace_range.NamespaceRange(_app=app),
1275 batch_size=shard_count,
1276 filters=filters)]
1277 else:
1278 namespaces = [namespace_key.name() or ""
1279 for namespace_key in namespace_keys]
1280 else:
1281 namespaces = [namespace]
1283 readers = cls._split_input_from_params(
1284 app, namespaces, entity_kind_name, params, shard_count)
1285 if filters:
1286 for reader in readers:
1287 reader._filters = filters
1288 return readers
1290 def to_json(self):
1291 """Serializes all the data in this query range into json form.
1293 Returns:
1294 all the data in json-compatible map.
1296 if self._key_ranges is None:
1297 key_ranges_json = None
1298 else:
1299 key_ranges_json = []
1300 for k in self._key_ranges:
1301 if k:
1302 key_ranges_json.append(k.to_json())
1303 else:
1304 key_ranges_json.append(None)
1306 if self._ns_range is None:
1307 namespace_range_json = None
1308 else:
1309 namespace_range_json = self._ns_range.to_json_object()
1311 if self._current_key_range is None:
1312 current_key_range_json = None
1313 else:
1314 current_key_range_json = self._current_key_range.to_json()
1316 json_dict = {self.KEY_RANGE_PARAM: key_ranges_json,
1317 self.NAMESPACE_RANGE_PARAM: namespace_range_json,
1318 self.CURRENT_KEY_RANGE_PARAM: current_key_range_json,
1319 self.ENTITY_KIND_PARAM: self._entity_kind,
1320 self.BATCH_SIZE_PARAM: self._batch_size,
1321 self.FILTERS_PARAM: self._filters}
1322 return json_dict
1324 @classmethod
1325 def from_json(cls, json):
1326 """Create new DatastoreInputReader from the json, encoded by to_json.
1328 Args:
1329 json: json map representation of DatastoreInputReader.
1331 Returns:
1332 an instance of DatastoreInputReader with all data deserialized from json.
1334 if json[cls.KEY_RANGE_PARAM] is None:
1336 key_ranges = None
1337 else:
1338 key_ranges = []
1339 for k in json[cls.KEY_RANGE_PARAM]:
1340 if k:
1341 key_ranges.append(key_range.KeyRange.from_json(k))
1342 else:
1343 key_ranges.append(None)
1345 if json[cls.NAMESPACE_RANGE_PARAM] is None:
1346 ns_range = None
1347 else:
1348 ns_range = namespace_range.NamespaceRange.from_json_object(
1349 json[cls.NAMESPACE_RANGE_PARAM])
1351 if json[cls.CURRENT_KEY_RANGE_PARAM] is None:
1352 current_key_range = None
1353 else:
1354 current_key_range = key_range.KeyRange.from_json(
1355 json[cls.CURRENT_KEY_RANGE_PARAM])
1357 return cls(
1358 json[cls.ENTITY_KIND_PARAM],
1359 key_ranges,
1360 ns_range,
1361 json[cls.BATCH_SIZE_PARAM],
1362 current_key_range,
1363 filters=json.get(cls.FILTERS_PARAM))
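# NOTE: a sketch of the shard state produced by to_json() above; the entity
# kind and filter are hypothetical.
#
#   {"key_range": [<KeyRange.to_json()>, ...] or None,
#    "namespace_range": <NamespaceRange.to_json_object()> or None,
#    "current_key_range": <KeyRange.to_json()> or None,
#    "entity_kind": "models.MyModel",
#    "batch_size": 50,
#    "filters": [("prop", "=", "value")] or None}
#
# from_json() reverses this mapping, so to_json/from_json round-trips the
# reader's position between slices.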
1366 class BlobstoreLineInputReader(InputReader):
1367 """Input reader for a newline delimited blob in Blobstore."""
1370 _BLOB_BUFFER_SIZE = 64000
1373 _MAX_SHARD_COUNT = 256
1376 _MAX_BLOB_KEYS_COUNT = 246
1379 BLOB_KEYS_PARAM = "blob_keys"
1382 INITIAL_POSITION_PARAM = "initial_position"
1383 END_POSITION_PARAM = "end_position"
1384 BLOB_KEY_PARAM = "blob_key"
1386 def __init__(self, blob_key, start_position, end_position):
1387 """Initializes this instance with the given blob key and character range.
1389 This BlobstoreLineInputReader will read from the first record starting
1390 strictly after start_position until the first record ending at or after
1391 end_position (exclusive). As an exception, if start_position is 0, then
1392 this InputReader starts reading at the first record.
1394 Args:
1395 blob_key: the BlobKey that this input reader is processing.
1396 start_position: the position to start reading at.
1397 end_position: a position in the last record to read.
1399 self._blob_key = blob_key
1400 self._blob_reader = blobstore.BlobReader(blob_key,
1401 self._BLOB_BUFFER_SIZE,
1402 start_position)
1403 self._end_position = end_position
1404 self._has_iterated = False
1405 self._read_before_start = bool(start_position)
1407 def next(self):
1408 """Returns the next input from as an (offset, line) tuple."""
1409 self._has_iterated = True
1411 if self._read_before_start:
1412 self._blob_reader.readline()
1413 self._read_before_start = False
1414 start_position = self._blob_reader.tell()
1416 if start_position > self._end_position:
1417 raise StopIteration()
1419 line = self._blob_reader.readline()
1421 if not line:
1422 raise StopIteration()
1424 return start_position, line.rstrip("\n")
1426 def to_json(self):
1427 """Returns an json-compatible input shard spec for remaining inputs."""
1428 new_pos = self._blob_reader.tell()
1429 if self._has_iterated:
1430 new_pos -= 1
1431 return {self.BLOB_KEY_PARAM: self._blob_key,
1432 self.INITIAL_POSITION_PARAM: new_pos,
1433 self.END_POSITION_PARAM: self._end_position}
1435 def __str__(self):
1436 """Returns the string representation of this BlobstoreLineInputReader."""
1437 return "blobstore.BlobKey(%r):[%d, %d]" % (
1438 self._blob_key, self._blob_reader.tell(), self._end_position)
1440 @classmethod
1441 def from_json(cls, json):
1442 """Instantiates an instance of this InputReader for the given shard spec."""
1443 return cls(json[cls.BLOB_KEY_PARAM],
1444 json[cls.INITIAL_POSITION_PARAM],
1445 json[cls.END_POSITION_PARAM])
1447 @classmethod
1448 def validate(cls, mapper_spec):
1449 """Validates mapper spec and all mapper parameters.
1451 Args:
1452 mapper_spec: The MapperSpec for this InputReader.
1454 Raises:
1455 BadReaderParamsError: required parameters are missing or invalid.
1457 if mapper_spec.input_reader_class() != cls:
1458 raise BadReaderParamsError("Mapper input reader class mismatch")
1459 params = _get_params(mapper_spec)
1460 if cls.BLOB_KEYS_PARAM not in params:
1461 raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
1462 blob_keys = params[cls.BLOB_KEYS_PARAM]
1463 if isinstance(blob_keys, basestring):
1466 blob_keys = blob_keys.split(",")
1467 if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
1468 raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
1469 if not blob_keys:
1470 raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
1471 for blob_key in blob_keys:
1472 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1473 if not blob_info:
1474 raise BadReaderParamsError("Could not find blobinfo for key %s" %
1475 blob_key)
1477 @classmethod
1478 def split_input(cls, mapper_spec):
1479 """Returns a list of shard_count input_spec_shards for input_spec.
1481 Args:
1482 mapper_spec: The mapper specification to split from. Must contain
1483 'blob_keys' parameter with one or more blob keys.
1485 Returns:
1486 A list of BlobstoreLineInputReaders corresponding to the specified shards.
1488 params = _get_params(mapper_spec)
1489 blob_keys = params[cls.BLOB_KEYS_PARAM]
1490 if isinstance(blob_keys, basestring):
1493 blob_keys = blob_keys.split(",")
1495 blob_sizes = {}
1496 for blob_key in blob_keys:
1497 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1498 blob_sizes[blob_key] = blob_info.size
1500 shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
1501 shards_per_blob = shard_count // len(blob_keys)
1502 if shards_per_blob == 0:
1503 shards_per_blob = 1
1505 chunks = []
1506 for blob_key, blob_size in blob_sizes.items():
1507 blob_chunk_size = blob_size // shards_per_blob
1508 for i in xrange(shards_per_blob - 1):
1509 chunks.append(BlobstoreLineInputReader.from_json(
1510 {cls.BLOB_KEY_PARAM: blob_key,
1511 cls.INITIAL_POSITION_PARAM: blob_chunk_size * i,
1512 cls.END_POSITION_PARAM: blob_chunk_size * (i + 1)}))
1513 chunks.append(BlobstoreLineInputReader.from_json(
1514 {cls.BLOB_KEY_PARAM: blob_key,
1515 cls.INITIAL_POSITION_PARAM: blob_chunk_size * (shards_per_blob - 1),
1516 cls.END_POSITION_PARAM: blob_size}))
1517 return chunks
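# NOTE: a worked example of the split above (sizes are illustrative). With
# one blob of 1,000,000 bytes and shard_count=4, shards_per_blob is 4 and
# blob_chunk_size is 250,000, giving readers over byte ranges [0, 250000),
# [250000, 500000), [500000, 750000) and [750000, 1000000]. Readers with a
# non-zero start skip their first partial line and every reader finishes the
# line that crosses its end_position, so each line is mapped exactly once.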
1520 class BlobstoreZipInputReader(InputReader):
1521 """Input reader for files from a zip archive stored in the Blobstore.
1523 Each instance of the reader will read the TOC, from the end of the zip file,
1524 and then only the contained files which it is responsible for.
1528 _MAX_SHARD_COUNT = 256
1531 BLOB_KEY_PARAM = "blob_key"
1532 START_INDEX_PARAM = "start_index"
1533 END_INDEX_PARAM = "end_index"
1535 def __init__(self, blob_key, start_index, end_index,
1536 _reader=blobstore.BlobReader):
1537 """Initializes this instance with the given blob key and file range.
1539 This BlobstoreZipInputReader will read from the file with index start_index
1540 up to but not including the file with index end_index.
1542 Args:
1543 blob_key: the BlobKey that this input reader is processing.
1544 start_index: the index of the first file to read.
1545 end_index: the index of the first file that will not be read.
1546 _reader: a callable that returns a file-like object for reading blobs.
1547 Used for dependency injection.
1549 self._blob_key = blob_key
1550 self._start_index = start_index
1551 self._end_index = end_index
1552 self._reader = _reader
1553 self._zip = None
1554 self._entries = None
1556 def next(self):
1557 """Returns the next input from this input reader as (ZipInfo, opener) tuple.
1559 Returns:
1560 The next input from this input reader, in the form of a 2-tuple.
1561 The first element of the tuple is a zipfile.ZipInfo object.
1562 The second element of the tuple is a zero-argument function that, when
1563 called, returns the complete body of the file.
1565 if not self._zip:
1566 self._zip = zipfile.ZipFile(self._reader(self._blob_key))
1568 self._entries = self._zip.infolist()[self._start_index:self._end_index]
1569 self._entries.reverse()
1570 if not self._entries:
1571 raise StopIteration()
1572 entry = self._entries.pop()
1573 self._start_index += 1
1574 return (entry, lambda: self._read(entry))
1576 def _read(self, entry):
1577 """Read entry content.
1579 Args:
1580 entry: zip file entry as zipfile.ZipInfo.
1581 Returns:
1582 Entry content as string.
1584 start_time = time.time()
1585 content = self._zip.read(entry.filename)
1587 ctx = context.get()
1588 if ctx:
1589 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
1590 operation.counters.Increment(
1591 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
1593 return content
1595 @classmethod
1596 def from_json(cls, json):
1597 """Creates an instance of the InputReader for the given input shard state.
1599 Args:
1600 json: The InputReader state as a dict-like object.
1602 Returns:
1603 An instance of the InputReader configured using the values of json.
1605 return cls(json[cls.BLOB_KEY_PARAM],
1606 json[cls.START_INDEX_PARAM],
1607 json[cls.END_INDEX_PARAM])
1609 def to_json(self):
1610 """Returns an input shard state for the remaining inputs.
1612 Returns:
1613 A json-izable version of the remaining InputReader.
1615 return {self.BLOB_KEY_PARAM: self._blob_key,
1616 self.START_INDEX_PARAM: self._start_index,
1617 self.END_INDEX_PARAM: self._end_index}
1619 def __str__(self):
1620 """Returns the string representation of this BlobstoreZipInputReader."""
1621 return "blobstore.BlobKey(%r):[%d, %d]" % (
1622 self._blob_key, self._start_index, self._end_index)
1624 @classmethod
1625 def validate(cls, mapper_spec):
1626 """Validates mapper spec and all mapper parameters.
1628 Args:
1629 mapper_spec: The MapperSpec for this InputReader.
1631 Raises:
1632 BadReaderParamsError: required parameters are missing or invalid.
1634 if mapper_spec.input_reader_class() != cls:
1635 raise BadReaderParamsError("Mapper input reader class mismatch")
1636 params = _get_params(mapper_spec)
1637 if cls.BLOB_KEY_PARAM not in params:
1638 raise BadReaderParamsError("Must specify 'blob_key' for mapper input")
1639 blob_key = params[cls.BLOB_KEY_PARAM]
1640 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1641 if not blob_info:
1642 raise BadReaderParamsError("Could not find blobinfo for key %s" %
1643 blob_key)
1645 @classmethod
1646 def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
1647 """Returns a list of input shard states for the input spec.
1649 Args:
1650 mapper_spec: The MapperSpec for this InputReader. Must contain
1651 'blob_key' parameter with one blob key.
1652 _reader: a callable that returns a file-like object for reading blobs.
1653 Used for dependency injection.
1655 Returns:
1656 A list of InputReaders spanning files within the zip.
1658 params = _get_params(mapper_spec)
1659 blob_key = params[cls.BLOB_KEY_PARAM]
1660 zip_input = zipfile.ZipFile(_reader(blob_key))
1661 zfiles = zip_input.infolist()
1662 total_size = sum(x.file_size for x in zfiles)
1663 num_shards = min(mapper_spec.shard_count, cls._MAX_SHARD_COUNT)
1664 size_per_shard = total_size // num_shards
1668 shard_start_indexes = [0]
1669 current_shard_size = 0
1670 for i, fileinfo in enumerate(zfiles):
1671 current_shard_size += fileinfo.file_size
1672 if current_shard_size >= size_per_shard:
1673 shard_start_indexes.append(i + 1)
1674 current_shard_size = 0
1676 if shard_start_indexes[-1] != len(zfiles):
1677 shard_start_indexes.append(len(zfiles))
1679 return [cls(blob_key, start_index, end_index, _reader)
1680 for start_index, end_index
1681 in zip(shard_start_indexes, shard_start_indexes[1:])]
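# NOTE: a sketch of a mapper handler for BlobstoreZipInputReader; the handler
# name and file filter are hypothetical. Each input is a
# (zipfile.ZipInfo, opener) tuple and the member body is only read when the
# zero-argument opener is called.
#
#   def process((zipinfo, opener)):
#     if zipinfo.filename.endswith(".csv"):
#       body = opener()  # complete member file contents as a str
#       for row in body.splitlines():
#         handle_row(zipinfo.filename, row)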
1684 class BlobstoreZipLineInputReader(InputReader):
1685 """Input reader for newline delimited files in zip archives from Blobstore.
1687 This has the same external interface as the BlobstoreLineInputReader, in that
1688 it takes a list of blobs as its input and yields lines to the reader.
1689 However the blobs themselves are expected to be zip archives of line delimited
1690 files instead of the files themselves.
1692 This is useful because many line delimited files benefit greatly from compression.
1696 _MAX_SHARD_COUNT = 256
1699 _MAX_BLOB_KEYS_COUNT = 246
1702 BLOB_KEYS_PARAM = "blob_keys"
1705 BLOB_KEY_PARAM = "blob_key"
1706 START_FILE_INDEX_PARAM = "start_file_index"
1707 END_FILE_INDEX_PARAM = "end_file_index"
1708 OFFSET_PARAM = "offset"
1710 def __init__(self, blob_key, start_file_index, end_file_index, offset,
1711 _reader=blobstore.BlobReader):
1712 """Initializes this instance with the given blob key and file range.
1714 This BlobstoreZipLineInputReader will read from the file with index
1715 start_file_index up to but not including the file with index end_file_index.
1716 It will return lines starting at offset within file[start_file_index].
1718 Args:
1719 blob_key: the BlobKey that this input reader is processing.
1720 start_file_index: the index of the first file to read within the zip.
1721 end_file_index: the index of the first file that will not be read.
1722 offset: the byte offset within blob_key.zip[start_file_index] to start
1723 reading. The reader will continue to the end of the file.
1724 _reader: a callable that returns a file-like object for reading blobs.
1725 Used for dependency injection.
1727 self._blob_key = blob_key
1728 self._start_file_index = start_file_index
1729 self._end_file_index = end_file_index
1730 self._initial_offset = offset
1731 self._reader = _reader
1732 self._zip = None
1733 self._entries = None
1734 self._filestream = None
1736 @classmethod
1737 def validate(cls, mapper_spec):
1738 """Validates mapper spec and all mapper parameters.
1740 Args:
1741 mapper_spec: The MapperSpec for this InputReader.
1743 Raises:
1744 BadReaderParamsError: required parameters are missing or invalid.
1746 if mapper_spec.input_reader_class() != cls:
1747 raise BadReaderParamsError("Mapper input reader class mismatch")
1748 params = _get_params(mapper_spec)
1749 if cls.BLOB_KEYS_PARAM not in params:
1750 raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
1752 blob_keys = params[cls.BLOB_KEYS_PARAM]
1753 if isinstance(blob_keys, basestring):
1756 blob_keys = blob_keys.split(",")
1757 if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
1758 raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
1759 if not blob_keys:
1760 raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
1761 for blob_key in blob_keys:
1762 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1763 if not blob_info:
1764 raise BadReaderParamsError("Could not find blobinfo for key %s" %
1765 blob_key)
1767 @classmethod
1768 def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
1769 """Returns a list of input readers for the input spec.
1771 Args:
1772 mapper_spec: The MapperSpec for this InputReader. Must contain
1773 'blob_keys' parameter with one or more blob keys.
1774 _reader: a callable that returns a file-like object for reading blobs.
1775 Used for dependency injection.
1777 Returns:
1778 A list of InputReaders spanning the subfiles within the blobs.
1779 There will be at least one reader per blob, but it will otherwise
1780 attempt to keep the expanded size even.
1782 params = _get_params(mapper_spec)
1783 blob_keys = params[cls.BLOB_KEYS_PARAM]
1784 if isinstance(blob_keys, basestring):
1787 blob_keys = blob_keys.split(",")
1789 blob_files = {}
1790 total_size = 0
1791 for blob_key in blob_keys:
1792 zip_input = zipfile.ZipFile(_reader(blob_key))
1793 blob_files[blob_key] = zip_input.infolist()
1794 total_size += sum(x.file_size for x in blob_files[blob_key])
1796 shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
1802 size_per_shard = total_size // shard_count
1804 readers = []
1805 for blob_key in blob_keys:
1806 bfiles = blob_files[blob_key]
1807 current_shard_size = 0
1808 start_file_index = 0
1809 next_file_index = 0
1810 for fileinfo in bfiles:
1811 next_file_index += 1
1812 current_shard_size += fileinfo.file_size
1813 if current_shard_size >= size_per_shard:
1814 readers.append(cls(blob_key, start_file_index, next_file_index, 0,
1815 _reader))
1816 current_shard_size = 0
1817 start_file_index = next_file_index
1818 if current_shard_size != 0:
1819 readers.append(cls(blob_key, start_file_index, next_file_index, 0,
1820 _reader))
1822 return readers
1824 def next(self):
1825 """Returns the next line from this input reader as (lineinfo, line) tuple.
1827 Returns:
1828 The next input from this input reader, in the form of a 2-tuple.
1829 The first element of the tuple describes the source, it is itself
1830 a tuple (blobkey, filenumber, byteoffset).
1831 The second element of the tuple is the line found at that offset.
1833 if not self._filestream:
1834 if not self._zip:
1835 self._zip = zipfile.ZipFile(self._reader(self._blob_key))
1837 self._entries = self._zip.infolist()[self._start_file_index:
1838 self._end_file_index]
1839 self._entries.reverse()
1840 if not self._entries:
1841 raise StopIteration()
1842 entry = self._entries.pop()
1843 value = self._zip.read(entry.filename)
1844 self._filestream = StringIO.StringIO(value)
1845 if self._initial_offset:
1846 self._filestream.seek(self._initial_offset)
1847 self._filestream.readline()
1849 start_position = self._filestream.tell()
1850 line = self._filestream.readline()
1852 if not line:
1854 self._filestream.close()
1855 self._filestream = None
1856 self._start_file_index += 1
1857 self._initial_offset = 0
1858 return self.next()
1860 return ((self._blob_key, self._start_file_index, start_position),
1861 line.rstrip("\n"))
1863 def _next_offset(self):
1864 """Return the offset of the next line to read."""
1865 if self._filestream:
1866 offset = self._filestream.tell()
1867 if offset:
1868 offset -= 1
1869 else:
1870 offset = self._initial_offset
1872 return offset
1874 def to_json(self):
1875 """Returns an input shard state for the remaining inputs.
1877 Returns:
1878 A json-izable version of the remaining InputReader.
1881 return {self.BLOB_KEY_PARAM: self._blob_key,
1882 self.START_FILE_INDEX_PARAM: self._start_file_index,
1883 self.END_FILE_INDEX_PARAM: self._end_file_index,
1884 self.OFFSET_PARAM: self._next_offset()}
1886 @classmethod
1887 def from_json(cls, json, _reader=blobstore.BlobReader):
1888 """Creates an instance of the InputReader for the given input shard state.
1890 Args:
1891 json: The InputReader state as a dict-like object.
1892 _reader: For dependency injection.
1894 Returns:
1895 An instance of the InputReader configured using the values of json.
1897 return cls(json[cls.BLOB_KEY_PARAM],
1898 json[cls.START_FILE_INDEX_PARAM],
1899 json[cls.END_FILE_INDEX_PARAM],
1900 json[cls.OFFSET_PARAM],
1901 _reader)
1903 def __str__(self):
1904 """Returns the string representation of this reader.
1906 Returns:
1907 string blobkey:[start file num, end file num]:current offset.
1909 return "blobstore.BlobKey(%r):[%d, %d]:%d" % (
1910 self._blob_key, self._start_file_index, self._end_file_index,
1911 self._next_offset())
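# Illustrative sketch (not part of the original module): the framework
# checkpoints a shard by round-tripping the reader through to_json/from_json;
# because to_json records _next_offset(), a resumed reader picks up at the
# first line it has not yet returned:
#
#   state = reader.to_json()
#   # ... persist state, end the slice ...
#   reader = BlobstoreZipLineInputReader.from_json(state)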
1914 class RandomStringInputReader(InputReader):
1915 """RandomStringInputReader generates random strings as output.
1917 Primary usage is to populate output with testing entries.
1921 COUNT = "count"
1923 STRING_LENGTH = "string_length"
1925 DEFAULT_STRING_LENGTH = 10
1927 def __init__(self, count, string_length):
1928 """Initialize input reader.
1930 Args:
1931 count: number of entries this shard should generate.
1932 string_length: the length of generated random strings.
1934 self._count = count
1935 self._string_length = string_length
1937 def __iter__(self):
1938 ctx = context.get()
1940 while self._count:
1941 self._count -= 1
1942 start_time = time.time()
1943 content = "".join(random.choice(string.ascii_lowercase)
1944 for _ in range(self._string_length))
1945 if ctx:
1946 operation.counters.Increment(
1947 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
1948 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
1949 yield content
1951 @classmethod
1952 def split_input(cls, mapper_spec):
1953 params = _get_params(mapper_spec)
1954 count = params[cls.COUNT]
1955 string_length = cls.DEFAULT_STRING_LENGTH
1956 if cls.STRING_LENGTH in params:
1957 string_length = params[cls.STRING_LENGTH]
1959 shard_count = mapper_spec.shard_count
1960 count_per_shard = count // shard_count
1962 mr_input_readers = [
1963 cls(count_per_shard, string_length) for _ in range(shard_count)]
1965 left = count - count_per_shard*shard_count
1966 if left > 0:
1967 mr_input_readers.append(cls(left, string_length))
1969 return mr_input_readers
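# Illustrative sketch (not part of the original module): with count == 25 and
# shard_count == 4, count_per_shard == 6, so four readers generate 6 strings
# each and a fifth reader picks up the remaining 25 - 24 == 1 string.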
1971 @classmethod
1972 def validate(cls, mapper_spec):
1973 if mapper_spec.input_reader_class() != cls:
1974 raise BadReaderParamsError("Mapper input reader class mismatch")
1976 params = _get_params(mapper_spec)
1977 if cls.COUNT not in params:
1978 raise BadReaderParamsError("Must specify %s" % cls.COUNT)
1979 if not isinstance(params[cls.COUNT], int):
1980 raise BadReaderParamsError("%s should be an int but is %s" %
1981 (cls.COUNT, type(params[cls.COUNT])))
1982 if params[cls.COUNT] <= 0:
1983 raise BadReaderParamsError("%s should be a positive int" % cls.COUNT)
1984 if cls.STRING_LENGTH in params and not (
1985 isinstance(params[cls.STRING_LENGTH], int) and
1986 params[cls.STRING_LENGTH] > 0):
1987 raise BadReaderParamsError("%s should be a positive int but is %s" %
1988 (cls.STRING_LENGTH, params[cls.STRING_LENGTH]))
1989 if (not isinstance(mapper_spec.shard_count, int) or
1990 mapper_spec.shard_count <= 0):
1991 raise BadReaderParamsError(
1992 "shard_count should be a positive int but is %s" %
1993 mapper_spec.shard_count)
1995 @classmethod
1996 def from_json(cls, json):
1997 return cls(json[cls.COUNT], json[cls.STRING_LENGTH])
1999 def to_json(self):
2000 return {self.COUNT: self._count, self.STRING_LENGTH: self._string_length}
2009 class NamespaceInputReader(InputReader):
2010 """An input reader to iterate over namespaces.
2012 This reader yields namespace names as strings.
2013 It will always produce only one shard.
2016 NAMESPACE_RANGE_PARAM = "namespace_range"
2017 BATCH_SIZE_PARAM = "batch_size"
2018 _BATCH_SIZE = 10
2020 def __init__(self, ns_range, batch_size=_BATCH_SIZE):
2021 self.ns_range = ns_range
2022 self._batch_size = batch_size
2024 def to_json(self):
2025 """Serializes all the data in this query range into json form.
2027 Returns:
2028 all the data in a json-compatible map.
2030 return {self.NAMESPACE_RANGE_PARAM: self.ns_range.to_json_object(),
2031 self.BATCH_SIZE_PARAM: self._batch_size}
2033 @classmethod
2034 def from_json(cls, json):
2035 """Create new DatastoreInputReader from the json, encoded by to_json.
2037 Args:
2038 json: json map representation of DatastoreInputReader.
2040 Returns:
2041 an instance of DatastoreInputReader with all data deserialized from json.
2043 return cls(
2044 namespace_range.NamespaceRange.from_json_object(
2045 json[cls.NAMESPACE_RANGE_PARAM]),
2046 json[cls.BATCH_SIZE_PARAM])
2048 @classmethod
2049 def validate(cls, mapper_spec):
2050 """Validates mapper spec.
2052 Args:
2053 mapper_spec: The MapperSpec for this InputReader.
2055 Raises:
2056 BadReaderParamsError: required parameters are missing or invalid.
2058 if mapper_spec.input_reader_class() != cls:
2059 raise BadReaderParamsError("Input reader class mismatch")
2060 params = _get_params(mapper_spec)
2061 if cls.BATCH_SIZE_PARAM in params:
2062 try:
2063 batch_size = int(params[cls.BATCH_SIZE_PARAM])
2064 if batch_size < 1:
2065 raise BadReaderParamsError("Bad batch size: %s" % batch_size)
2066 except ValueError, e:
2067 raise BadReaderParamsError("Bad batch size: %s" % e)
2069 @classmethod
2070 def split_input(cls, mapper_spec):
2071 """Returns a list of input readers for the input spec.
2073 Args:
2074 mapper_spec: The MapperSpec for this InputReader.
2076 Returns:
2077 A list of InputReaders.
2079 batch_size = int(_get_params(mapper_spec).get(
2080 cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
2081 shard_count = mapper_spec.shard_count
2082 namespace_ranges = namespace_range.NamespaceRange.split(shard_count,
2083 contiguous=True)
2084 return [NamespaceInputReader(ns_range, batch_size)
2085 for ns_range in namespace_ranges]
2087 def __iter__(self):
2088 while True:
2089 keys = self.ns_range.make_datastore_query().Get(limit=self._batch_size)
2090 if not keys:
2091 break
2093 for key in keys:
2094 namespace = metadata.Namespace.key_to_namespace(key)
2095 self.ns_range = self.ns_range.with_start_after(namespace)
2096 yield namespace
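# Illustrative sketch (not part of the original module): each yielded value is
# a plain namespace name, and ns_range is advanced past it before the yield,
# so a reader serialized mid-iteration resumes after the last namespace seen:
#
#   for namespace in reader:
#     # namespace is a string such as "" (the default namespace)
#     ...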
2098 def __str__(self):
2099 return repr(self.ns_range)
2102 class RecordsReader(InputReader):
2103 """Reader to read a list of Files API file in records format.
2105 The number of input shards can be specified by the SHARDS_PARAM
2106 mapper parameter. Input files cannot be split, so there will be at most
2107 one shard per file. Also the number of shards will not be reduced based on
2108 the number of input files, so shards in always equals shards out.
2111 FILE_PARAM = "file"
2112 FILES_PARAM = "files"
2114 def __init__(self, filenames, position):
2115 """Constructor.
2117 Args:
2118 filenames: list of filenames.
2119 position: file position to start reading from as int.
2121 self._filenames = filenames
2122 if self._filenames:
2123 self._reader = records.RecordsReader(
2124 files.BufferedFile(self._filenames[0]))
2125 self._reader.seek(position)
2126 else:
2127 self._reader = None
2129 def __iter__(self):
2130 """Iterate over records in file.
2132 Yields:
2133 Records as strings.
2135 ctx = context.get()
2137 while self._reader:
2138 try:
2139 start_time = time.time()
2140 record = self._reader.read()
2141 if ctx:
2142 operation.counters.Increment(
2143 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
2144 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(record))(ctx)
2145 yield record
2146 except (files.ExistenceError), e:
2147 raise errors.FailJobError("ExistenceError: %s" % e)
2148 except (files.UnknownError), e:
2149 raise errors.RetrySliceError("UnknownError: %s" % e)
2150 except EOFError:
2151 self._filenames.pop(0)
2152 if not self._filenames:
2153 self._reader = None
2154 else:
2155 self._reader = records.RecordsReader(
2156 files.BufferedFile(self._filenames[0]))
2158 @classmethod
2159 def from_json(cls, json):
2160 """Creates an instance of the InputReader for the given input shard state.
2162 Args:
2163 json: The InputReader state as a dict-like object.
2165 Returns:
2166 An instance of the InputReader configured using the values of json.
2168 return cls(json["filenames"], json["position"])
2170 def to_json(self):
2171 """Returns an input shard state for the remaining inputs.
2173 Returns:
2174 A json-izable version of the remaining InputReader.
2176 result = {
2177 "filenames": self._filenames,
2178 "position": 0,
2179 }
2180 if self._reader:
2181 result["position"] = self._reader.tell()
2182 return result
2184 @classmethod
2185 def split_input(cls, mapper_spec):
2186 """Returns a list of input readers for the input spec.
2188 Args:
2189 mapper_spec: The MapperSpec for this InputReader.
2191 Returns:
2192 A list of InputReaders.
2194 params = _get_params(mapper_spec)
2195 shard_count = mapper_spec.shard_count
2197 if cls.FILES_PARAM in params:
2198 filenames = params[cls.FILES_PARAM]
2199 if isinstance(filenames, basestring):
2200 filenames = filenames.split(",")
2201 else:
2202 filenames = [params[cls.FILE_PARAM]]
2204 batch_list = [[] for _ in xrange(shard_count)]
2205 for index, _ in enumerate(filenames):
2207 batch_list[index % shard_count].append(filenames[index])
2210 batch_list.sort(reverse=True, key=len)
2211 return [cls(batch, 0) for batch in batch_list]
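# Illustrative sketch (not part of the original module): with five files
# ["f0", "f1", "f2", "f3", "f4"] and shard_count == 3, the modulo assignment
# above yields batch_list == [["f0", "f3"], ["f1", "f4"], ["f2"]]; sorting by
# length in descending order places the fuller batches first.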
2213 @classmethod
2214 def validate(cls, mapper_spec):
2215 """Validates mapper spec and all mapper parameters.
2217 Args:
2218 mapper_spec: The MapperSpec for this InputReader.
2220 Raises:
2221 BadReaderParamsError: required parameters are missing or invalid.
2223 if mapper_spec.input_reader_class() != cls:
2224 raise errors.BadReaderParamsError("Input reader class mismatch")
2225 params = _get_params(mapper_spec)
2226 if (cls.FILES_PARAM not in params and
2227 cls.FILE_PARAM not in params):
2228 raise BadReaderParamsError(
2229 "Must specify '%s' or '%s' parameter for mapper input" %
2230 (cls.FILES_PARAM, cls.FILE_PARAM))
2232 def __str__(self):
2233 position = 0
2234 if self._reader:
2235 position = self._reader.tell()
2236 return "%s:%s" % (self._filenames, position)
2239 class LogInputReader(InputReader):
2240 """Input reader for a time range of logs via the Logs Reader API.
2242 The number of input shards may be specified by the SHARDS_PARAM mapper
2243 parameter. A starting and ending time (in seconds since the Unix epoch) are
2244 required to generate time ranges over which to shard the input.
2247 START_TIME_PARAM = "start_time"
2248 END_TIME_PARAM = "end_time"
2249 MINIMUM_LOG_LEVEL_PARAM = "minimum_log_level"
2250 INCLUDE_INCOMPLETE_PARAM = "include_incomplete"
2251 INCLUDE_APP_LOGS_PARAM = "include_app_logs"
2252 VERSION_IDS_PARAM = "version_ids"
2253 MODULE_VERSIONS_PARAM = "module_versions"
2256 _OFFSET_PARAM = "offset"
2257 _PROTOTYPE_REQUEST_PARAM = "prototype_request"
2259 _PARAMS = frozenset([START_TIME_PARAM, END_TIME_PARAM, _OFFSET_PARAM,
2260 MINIMUM_LOG_LEVEL_PARAM, INCLUDE_INCOMPLETE_PARAM,
2261 INCLUDE_APP_LOGS_PARAM, VERSION_IDS_PARAM,
2262 MODULE_VERSIONS_PARAM, _PROTOTYPE_REQUEST_PARAM])
2263 _KWARGS = frozenset([_OFFSET_PARAM, _PROTOTYPE_REQUEST_PARAM])
2265 def __init__(self,
2266 start_time=None,
2267 end_time=None,
2268 minimum_log_level=None,
2269 include_incomplete=False,
2270 include_app_logs=False,
2271 version_ids=None,
2272 module_versions=None,
2273 **kwargs):
2274 """Constructor.
2276 Args:
2277 start_time: The earliest request completion or last-update time of logs
2278 that should be mapped over, in seconds since the Unix epoch.
2279 end_time: The latest request completion or last-update time that logs
2280 should be mapped over, in seconds since the Unix epoch.
2281 minimum_log_level: An application log level which serves as a filter on
2282 the requests mapped over--requests with no application log at or above
2283 the specified level will be omitted, even if include_app_logs is False.
2284 include_incomplete: Whether or not to include requests that have started
2285 but not yet finished, as a boolean. Defaults to False.
2286 include_app_logs: Whether or not to include application level logs in the
2287 mapped logs, as a boolean. Defaults to False.
2288 version_ids: A list of version ids whose logs should be read. This cannot
2289 be used with module_versions.
2290 module_versions: A list of tuples containing a module and version id
2291 whose logs should be read. This cannot be used with version_ids.
2292 **kwargs: A dictionary of keywords associated with this input reader.
2294 InputReader.__init__(self)
2298 self.__params = dict(kwargs)
2300 if start_time is not None:
2301 self.__params[self.START_TIME_PARAM] = start_time
2302 if end_time is not None:
2303 self.__params[self.END_TIME_PARAM] = end_time
2304 if minimum_log_level is not None:
2305 self.__params[self.MINIMUM_LOG_LEVEL_PARAM] = minimum_log_level
2306 if include_incomplete is not None:
2307 self.__params[self.INCLUDE_INCOMPLETE_PARAM] = include_incomplete
2308 if include_app_logs is not None:
2309 self.__params[self.INCLUDE_APP_LOGS_PARAM] = include_app_logs
2310 if version_ids:
2311 self.__params[self.VERSION_IDS_PARAM] = version_ids
2312 if module_versions:
2313 self.__params[self.MODULE_VERSIONS_PARAM] = module_versions
2316 if self._PROTOTYPE_REQUEST_PARAM in self.__params:
2317 prototype_request = log_service_pb.LogReadRequest(
2318 self.__params[self._PROTOTYPE_REQUEST_PARAM])
2319 self.__params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request
2321 def __iter__(self):
2322 """Iterates over logs in a given range of time.
2324 Yields:
2325 A RequestLog containing all the information for a single request.
2327 for log in logservice.fetch(**self.__params):
2328 self.__params[self._OFFSET_PARAM] = log.offset
2329 yield log
2331 @classmethod
2332 def from_json(cls, json):
2333 """Creates an instance of the InputReader for the given input shard's state.
2335 Args:
2336 json: The InputReader state as a dict-like object.
2338 Returns:
2339 An instance of the InputReader configured using the given JSON parameters.
2342 params = dict((str(k), v) for k, v in json.iteritems()
2343 if k in cls._PARAMS)
2348 if cls._OFFSET_PARAM in params:
2349 params[cls._OFFSET_PARAM] = base64.b64decode(params[cls._OFFSET_PARAM])
2350 return cls(**params)
2352 def to_json(self):
2353 """Returns an input shard state for the remaining inputs.
2355 Returns:
2356 A JSON serializable version of the remaining input to read.
2359 params = dict(self.__params)
2360 if self._PROTOTYPE_REQUEST_PARAM in params:
2361 prototype_request = params[self._PROTOTYPE_REQUEST_PARAM]
2362 params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request.Encode()
2363 if self._OFFSET_PARAM in params:
2364 params[self._OFFSET_PARAM] = base64.b64encode(params[self._OFFSET_PARAM])
2365 return params
2367 @classmethod
2368 def split_input(cls, mapper_spec):
2369 """Returns a list of input readers for the given input specification.
2371 Args:
2372 mapper_spec: The MapperSpec for this InputReader.
2374 Returns:
2375 A list of InputReaders.
2377 params = _get_params(mapper_spec)
2378 shard_count = mapper_spec.shard_count
2381 start_time = params[cls.START_TIME_PARAM]
2382 end_time = params[cls.END_TIME_PARAM]
2383 seconds_per_shard = (end_time - start_time) / shard_count
2386 shards = []
2387 for _ in xrange(shard_count - 1):
2388 params[cls.END_TIME_PARAM] = (params[cls.START_TIME_PARAM] +
2389 seconds_per_shard)
2390 shards.append(LogInputReader(**params))
2391 params[cls.START_TIME_PARAM] = params[cls.END_TIME_PARAM]
2394 params[cls.END_TIME_PARAM] = end_time
2395 return shards + [LogInputReader(**params)]
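# Illustrative sketch (not part of the original module): with start_time == 0,
# end_time == 3600 and shard_count == 3, seconds_per_shard == 1200, so the
# returned readers cover [0, 1200), [1200, 2400) and [2400, 3600]; the last
# reader always ends at the caller's original end_time.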
2397 @classmethod
2398 def validate(cls, mapper_spec):
2399 """Validates the mapper's specification and all necessary parameters.
2401 Args:
2402 mapper_spec: The MapperSpec to be used with this InputReader.
2404 Raises:
2405 BadReaderParamsError: If the user fails to specify both a starting time
2406 and an ending time, or if the starting time is later than the ending
2407 time.
2409 if mapper_spec.input_reader_class() != cls:
2410 raise errors.BadReaderParamsError("Input reader class mismatch")
2412 params = _get_params(mapper_spec, allowed_keys=cls._PARAMS)
2413 if (cls.VERSION_IDS_PARAM not in params and
2414 cls.MODULE_VERSIONS_PARAM not in params):
2415 raise errors.BadReaderParamsError("Must specify a list of version ids or "
2416 "module/version ids for mapper input")
2417 if (cls.VERSION_IDS_PARAM in params and
2418 cls.MODULE_VERSIONS_PARAM in params):
2419 raise errors.BadReaderParamsError("Can not supply both version ids or "
2420 "module/version ids. Use only one.")
2421 if (cls.START_TIME_PARAM not in params or
2422 params[cls.START_TIME_PARAM] is None):
2423 raise errors.BadReaderParamsError("Must specify a starting time for "
2424 "mapper input")
2425 if cls.END_TIME_PARAM not in params or params[cls.END_TIME_PARAM] is None:
2426 params[cls.END_TIME_PARAM] = time.time()
2428 if params[cls.START_TIME_PARAM] >= params[cls.END_TIME_PARAM]:
2429 raise errors.BadReaderParamsError("The starting time cannot be later "
2430 "than or the same as the ending time.")
2432 if cls._PROTOTYPE_REQUEST_PARAM in params:
2433 try:
2434 params[cls._PROTOTYPE_REQUEST_PARAM] = log_service_pb.LogReadRequest(
2435 params[cls._PROTOTYPE_REQUEST_PARAM])
2436 except (TypeError, ProtocolBuffer.ProtocolBufferDecodeError):
2437 raise errors.BadReaderParamsError("The prototype request must be "
2438 "parseable as a LogReadRequest.")
2443 try:
2444 logservice.fetch(**params)
2445 except logservice.InvalidArgumentError, e:
2446 raise errors.BadReaderParamsError("One or more parameters are not valid "
2447 "inputs to logservice.fetch(): %s" % e)
2449 def __str__(self):
2450 """Returns the string representation of this LogInputReader."""
2451 params = []
2452 for key in sorted(self.__params.keys()):
2453 value = self.__params[key]
2454 if key == self._PROTOTYPE_REQUEST_PARAM:
2455 params.append("%s='%s'" % (key, value))
2456 elif key == self._OFFSET_PARAM:
2457 params.append("%s='%s'" % (key, value))
2458 else:
2459 params.append("%s=%s" % (key, value))
2461 return "LogInputReader(%s)" % ", ".join(params)
2464 class _GoogleCloudStorageInputReader(InputReader):
2465 """Input reader from Google Cloud Storage using the cloudstorage library.
2467 This class is expected to be subclassed with a reader that understands
2468 user-level records.
2470 Required configuration in the mapper_spec.input_reader dictionary.
2471 BUCKET_NAME_PARAM: name of the bucket to use (with no extra delimiters or
2472 suffixes such as directories).
2473 OBJECT_NAMES_PARAM: a list of object names or prefixes. All objects must be
2474 in the BUCKET_NAME_PARAM bucket. If the name ends with a * it will be
2475 treated as a prefix and all objects with matching names will be read.
2476 Entries should not start with a slash unless that is part of the object's
2477 name. An example list could be:
2478 ["my-1st-input-file", "directory/my-2nd-file", "some/other/dir/input-*"]
2479 To retrieve all files, "*" will match every object in the bucket. If a file
2480 is listed twice or is covered by multiple prefixes it will be read twice;
2481 there is no deduplication.
2483 Optional configuration in the mapper_spec.input_reader dictionary.
2484 BUFFER_SIZE_PARAM: the size of the read buffer for each file handle.
2485 DELIMITER_PARAM: if specified, turns on the shallow splitting mode.
2486 The delimiter is used as a path separator to designate directory
2487 hierarchy. Matching of prefixes from OBJECT_NAMES_PARAM
2488 will stop at the first directory instead of matching
2489 all files under the directory. This allows MR to process buckets with
2490 hundreds of thousands of files.
2494 BUCKET_NAME_PARAM = "bucket_name"
2495 OBJECT_NAMES_PARAM = "objects"
2496 BUFFER_SIZE_PARAM = "buffer_size"
2497 DELIMITER_PARAM = "delimiter"
2500 _ACCOUNT_ID_PARAM = "account_id"
2503 _JSON_PICKLE = "pickle"
2504 _STRING_MAX_FILES_LISTED = 10
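# Illustrative sketch (not part of the original module): a hypothetical
# mapper_spec.input_reader configuration using the parameter names above
# (the bucket and object names are made up):
#
#   input_reader_params = {
#       "bucket_name": "my-bucket",
#       "objects": ["logs/2014-01-*", "one-off/data.txt"],
#       "buffer_size": 1024 * 1024,
#       "delimiter": "/",
#   }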
2512 def __init__(self, filenames, index=0, buffer_size=None, _account_id=None,
2513 delimiter=None):
2514 """Initialize a GoogleCloudStorageInputReader instance.
2516 Args:
2517 filenames: A list of Google Cloud Storage filenames of the form
2518 '/bucket/objectname'.
2519 index: Index of the next filename to read.
2520 buffer_size: The size of the read buffer, None to use default.
2521 _account_id: Internal use only. See cloudstorage documentation.
2522 delimiter: Delimiter used as path separator. See class doc for details.
2524 self._filenames = filenames
2525 self._index = index
2526 self._buffer_size = buffer_size
2527 self._account_id = _account_id
2528 self._delimiter = delimiter
2529 self._bucket = None
2530 self._bucket_iter = None
2532 def _next_file(self):
2533 """Find next filename.
2535 self._filenames may need to be expanded via listbucket.
2537 Returns:
2538 None if no more file is left. Filename otherwise.
2540 while True:
2541 if self._bucket_iter:
2542 try:
2543 return self._bucket_iter.next().filename
2544 except StopIteration:
2545 self._bucket_iter = None
2546 self._bucket = None
2547 if self._index >= len(self._filenames):
2548 return
2549 filename = self._filenames[self._index]
2550 self._index += 1
2551 if self._delimiter is None or not filename.endswith(self._delimiter):
2552 return filename
2553 self._bucket = cloudstorage.listbucket(filename,
2554 delimiter=self._delimiter)
2555 self._bucket_iter = iter(self._bucket)
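# Illustrative sketch (not part of the original module): with delimiter == "/",
# a filename such as "/my-bucket/logs/" (note the trailing delimiter) is not
# returned directly; it is expanded through cloudstorage.listbucket(...,
# delimiter="/"), and the entries of that shallow listing are yielded one by
# one before the next entry in self._filenames is considered.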
2557 @classmethod
2558 def get_params(cls, mapper_spec, allowed_keys=None, allow_old=True):
2559 params = _get_params(mapper_spec, allowed_keys, allow_old)
2562 if (mapper_spec.params.get(cls.BUCKET_NAME_PARAM) is not None and
2563 params.get(cls.BUCKET_NAME_PARAM) is None):
2564 params[cls.BUCKET_NAME_PARAM] = mapper_spec.params[cls.BUCKET_NAME_PARAM]
2565 return params
2567 @classmethod
2568 def validate(cls, mapper_spec):
2569 """Validate mapper specification.
2571 Args:
2572 mapper_spec: an instance of model.MapperSpec
2574 Raises:
2575 BadReaderParamsError: if the specification is invalid for any reason such
2576 as missing the bucket name or providing an invalid bucket name.
2578 reader_spec = cls.get_params(mapper_spec, allow_old=False)
2581 if cls.BUCKET_NAME_PARAM not in reader_spec:
2582 raise errors.BadReaderParamsError(
2583 "%s is required for Google Cloud Storage" %
2584 cls.BUCKET_NAME_PARAM)
2585 try:
2586 cloudstorage.validate_bucket_name(
2587 reader_spec[cls.BUCKET_NAME_PARAM])
2588 except ValueError, error:
2589 raise errors.BadReaderParamsError("Bad bucket name, %s" % (error))
2592 if cls.OBJECT_NAMES_PARAM not in reader_spec:
2593 raise errors.BadReaderParamsError(
2594 "%s is required for Google Cloud Storage" %
2595 cls.OBJECT_NAMES_PARAM)
2596 filenames = reader_spec[cls.OBJECT_NAMES_PARAM]
2597 if not isinstance(filenames, list):
2598 raise errors.BadReaderParamsError(
2599 "Object name list is not a list but a %s" %
2600 filenames.__class__.__name__)
2601 for filename in filenames:
2602 if not isinstance(filename, basestring):
2603 raise errors.BadReaderParamsError(
2604 "Object name is not a string but a %s" %
2605 filename.__class__.__name__)
2606 if cls.DELIMITER_PARAM in reader_spec:
2607 delimiter = reader_spec[cls.DELIMITER_PARAM]
2608 if not isinstance(delimiter, basestring):
2609 raise errors.BadReaderParamsError(
2610 "%s is not a string but a %s" %
2611 (cls.DELIMITER_PARAM, type(delimiter)))
2613 @classmethod
2614 def split_input(cls, mapper_spec):
2615 """Returns a list of input readers.
2617 An equal number of input files are assigned to each shard (+/- 1). If there
2618 are fewer files than shards, fewer than the requested number of shards will
2619 be used. Input files are currently never split (although for some formats
2620 they could be, and may be split in a future implementation).
2622 Args:
2623 mapper_spec: an instance of model.MapperSpec.
2625 Returns:
2626 A list of InputReaders. An empty list when no input data can be found.
2628 reader_spec = cls.get_params(mapper_spec, allow_old=False)
2629 bucket = reader_spec[cls.BUCKET_NAME_PARAM]
2630 filenames = reader_spec[cls.OBJECT_NAMES_PARAM]
2631 delimiter = reader_spec.get(cls.DELIMITER_PARAM)
2632 account_id = reader_spec.get(cls._ACCOUNT_ID_PARAM)
2633 buffer_size = reader_spec.get(cls.BUFFER_SIZE_PARAM)
2636 all_filenames = []
2637 for filename in filenames:
2638 if filename.endswith("*"):
2639 all_filenames.extend(
2640 [file_stat.filename for file_stat in cloudstorage.listbucket(
2641 "/" + bucket + "/" + filename[:-1], delimiter=delimiter,
2642 _account_id=account_id)])
2643 else:
2644 all_filenames.append("/%s/%s" % (bucket, filename))
2647 readers = []
2648 for shard in range(0, mapper_spec.shard_count):
2649 shard_filenames = all_filenames[shard::mapper_spec.shard_count]
2650 if shard_filenames:
2651 readers.append(cls(
2652 shard_filenames, buffer_size=buffer_size, _account_id=account_id,
2653 delimiter=delimiter))
2654 return readers
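# Illustrative sketch (not part of the original module): with
# all_filenames == ["/b/f0", "/b/f1", "/b/f2", "/b/f3", "/b/f4"] and
# shard_count == 3, the stride slice above assigns ["/b/f0", "/b/f3"],
# ["/b/f1", "/b/f4"] and ["/b/f2"] to the three readers; a shard whose slice
# is empty gets no reader at all.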
2656 @classmethod
2657 def from_json(cls, state):
2658 obj = pickle.loads(state[cls._JSON_PICKLE])
2659 if obj._bucket:
2660 obj._bucket_iter = iter(obj._bucket)
2661 return obj
2663 def to_json(self):
2664 before_iter = self._bucket_iter
2665 self._bucket_iter = None
2666 try:
2667 return {self._JSON_PICKLE: pickle.dumps(self)}
2668 finally:
2669 self._bucket_iter = before_iter
2671 def next(self):
2672 """Returns the next input from this input reader, a block of bytes.
2674 Non-existent files will be logged and skipped. The file might have been
2675 removed after input splitting.
2677 Returns:
2678 The next input from this input reader in the form of a cloudstorage
2679 ReadBuffer that supports a File-like interface (read, readline, seek,
2680 tell, and close). An error may be raised if the file cannot be opened.
2682 Raises:
2683 StopIteration: The list of files has been exhausted.
2685 options = {}
2686 if self._buffer_size:
2687 options["read_buffer_size"] = self._buffer_size
2688 if self._account_id:
2689 options["_account_id"] = self._account_id
2690 while True:
2691 filename = self._next_file()
2692 if filename is None:
2693 raise StopIteration()
2694 try:
2695 start_time = time.time()
2696 handle = cloudstorage.open(filename, **options)
2698 ctx = context.get()
2699 if ctx:
2700 operation.counters.Increment(
2701 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
2703 return handle
2704 except cloudstorage.NotFoundError:
2705 logging.warning("File %s may have been removed. Skipping file.",
2706 filename)
2708 def __str__(self):
2710 num_files = len(self._filenames)
2711 if num_files > self._STRING_MAX_FILES_LISTED:
2712 names = "%s...%s + %d not shown" % (
2713 ",".join(self._filenames[0:self._STRING_MAX_FILES_LISTED-1]),
2714 self._filenames[-1],
2715 num_files - self._STRING_MAX_FILES_LISTED)
2716 else:
2717 names = ",".join(self._filenames)
2719 if self._index >= num_files:
2720 status = "EOF"
2721 else:
2722 status = "Next %s (%d of %d)" % (
2723 self._filenames[self._index],
2724 self._index + 1,
2725 num_files)
2726 return "CloudStorage [%s, %s]" % (status, names)
2729 class _GoogleCloudStorageRecordInputReader(_GoogleCloudStorageInputReader):
2730 """Read data from a Google Cloud Storage file using LevelDB format.
2732 See the _GoogleCloudStorageOutputWriter for additional configuration options.
2735 def __getstate__(self):
2736 result = self.__dict__.copy()
2738 if "_record_reader" in result:
2741 result.pop("_record_reader")
2742 return result
2744 def next(self):
2745 """Returns the next input from this input reader, a record.
2747 Returns:
2748 The next input from this input reader in the form of a record read from
2749 an LevelDB file.
2751 Raises:
2752 StopIteration: The ordered set records has been exhausted.
2754 while True:
2755 if not hasattr(self, "_cur_handle") or self._cur_handle is None:
2757 self._cur_handle = super(_GoogleCloudStorageRecordInputReader,
2758 self).next()
2759 if not hasattr(self, "_record_reader") or self._record_reader is None:
2760 self._record_reader = records.RecordsReader(self._cur_handle)
2762 try:
2763 start_time = time.time()
2764 content = self._record_reader.read()
2766 ctx = context.get()
2767 if ctx:
2768 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
2769 operation.counters.Increment(
2770 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
2771 return content
2773 except EOFError:
2774 self._cur_handle = None
2775 self._record_reader = None
2778 class _ReducerReader(_GoogleCloudStorageRecordInputReader):
2779 """Reader to read KeyValues records from GCS."""
2781 expand_parameters = True
2783 def __init__(self, filenames, index=0, buffer_size=None, _account_id=None,
2784 delimiter=None):
2785 super(_ReducerReader, self).__init__(filenames, index, buffer_size,
2786 _account_id, delimiter)
2787 self.current_key = None
2788 self.current_values = None
2790 def __iter__(self):
2791 ctx = context.get()
2792 combiner = None
2794 if ctx:
2795 combiner_spec = ctx.mapreduce_spec.mapper.params.get("combiner_spec")
2796 if combiner_spec:
2797 combiner = util.handler_for_name(combiner_spec)
2799 try:
2800 while True:
2801 binary_record = super(_ReducerReader, self).next()
2802 proto = file_service_pb.KeyValues()
2803 proto.ParseFromString(binary_record)
2805 to_yield = None
2806 if self.current_key is not None and self.current_key != proto.key():
2807 to_yield = (self.current_key, self.current_values)
2808 self.current_key = None
2809 self.current_values = None
2811 if self.current_key is None:
2812 self.current_key = proto.key()
2813 self.current_values = []
2815 if combiner:
2816 combiner_result = combiner(
2817 self.current_key, proto.value_list(), self.current_values)
2819 if not util.is_generator(combiner_result):
2820 raise errors.BadCombinerOutputError(
2821 "Combiner %s should yield values instead of returning them "
2822 "(%s)" % (combiner, combiner_result))
2824 self.current_values = []
2825 for value in combiner_result:
2826 if isinstance(value, operation.Operation):
2827 value(ctx)
2828 else:
2830 self.current_values.append(value)
2835 if not to_yield:
2836 yield ALLOW_CHECKPOINT
2837 else:
2839 self.current_values.extend(proto.value_list())
2841 if to_yield:
2842 yield to_yield
2844 yield ALLOW_CHECKPOINT
2845 except StopIteration:
2846 pass
2850 if self.current_key is not None:
2851 to_yield = (self.current_key, self.current_values)
2852 self.current_key = None
2853 self.current_values = None
2854 yield to_yield
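# Illustrative sketch (not part of the original module): a combiner_spec must
# resolve to a generator function invoked as
# combiner(key, new_values, previously_combined_values). A hypothetical
# summing combiner, assuming the values are numeric strings, could be:
#
#   def sum_combiner(key, new_values, previously_combined_values):
#     # Collapse everything seen so far for this key into one running total.
#     yield sum(int(v) for v in new_values) + \
#           sum(int(v) for v in previously_combined_values)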
2856 @staticmethod
2857 def encode_data(data):
2858 """Encodes the given data, which may have include raw bytes.
2860 Works around limitations in JSON encoding, which cannot handle raw bytes.
2862 Args:
2863 data: the data to encode.
2865 Returns:
2866 The data encoded.
2868 return base64.b64encode(pickle.dumps(data))
2870 @staticmethod
2871 def decode_data(data):
2872 """Decodes data encoded with the encode_data function."""
2873 return pickle.loads(base64.b64decode(data))
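# Illustrative sketch (not part of the original module): encode_data and
# decode_data round-trip arbitrary picklable values through an ASCII-safe
# string, which is what allows raw byte keys and values to live inside the
# JSON shard state:
#
#   payload = ("key\x00with\xffbytes", ["v1", "v2"])
#   encoded = _ReducerReader.encode_data(payload)
#   assert _ReducerReader.decode_data(encoded) == payload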
2875 def to_json(self):
2876 """Returns an input shard state for the remaining inputs.
2878 Returns:
2879 A json-izable version of the remaining InputReader.
2881 result = super(_ReducerReader, self).to_json()
2882 result["current_key"] = self.encode_data(self.current_key)
2883 result["current_values"] = self.encode_data(self.current_values)
2884 return result
2886 @classmethod
2887 def from_json(cls, json):
2888 """Creates an instance of the InputReader for the given input shard state.
2890 Args:
2891 json: The InputReader state as a dict-like object.
2893 Returns:
2894 An instance of the InputReader configured using the values of json.
2896 result = super(_ReducerReader, cls).from_json(json)
2897 result.current_key = _ReducerReader.decode_data(json["current_key"])
2898 result.current_values = _ReducerReader.decode_data(json["current_values"])
2899 return result