App Engine Python SDK version 1.7.7
[gae.git] / python / google / appengine / ext / mapreduce / input_readers.py
blob 7add4cb70239d2a47f5a91c7211bffc871f4747c
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
33 """Defines input readers for MapReduce."""
36 __all__ = [
37 "AbstractDatastoreInputReader",
38 "ALLOW_CHECKPOINT",
39 "BadReaderParamsError",
40 "BlobstoreLineInputReader",
41 "BlobstoreZipInputReader",
42 "BlobstoreZipLineInputReader",
43 "COUNTER_IO_READ_BYTES",
44 "COUNTER_IO_READ_MSEC",
45 "ConsistentKeyReader",
46 "DatastoreEntityInputReader",
47 "DatastoreInputReader",
48 "DatastoreKeyInputReader",
49 "FileInputReader",
50 "RandomStringInputReader",
51 "Error",
52 "InputReader",
53 "LogInputReader",
54 "NamespaceInputReader",
55 "RecordsReader",
60 import base64
61 import copy
62 import logging
63 import random
64 import string
65 import StringIO
66 import time
67 import zipfile
69 from google.net.proto import ProtocolBuffer
70 try:
71 from google.appengine.ext import ndb
72 except ImportError:
73 ndb = None
74 from google.appengine.api import datastore
75 from google.appengine.api import files
76 from google.appengine.api import logservice
77 from google.appengine.api.files import records
78 from google.appengine.api.logservice import log_service_pb
79 from google.appengine.datastore import datastore_query
80 from google.appengine.datastore import datastore_rpc
81 from google.appengine.ext import blobstore
82 from google.appengine.ext import db
83 from google.appengine.ext import key_range
84 from google.appengine.ext.db import metadata
85 from google.appengine.ext.mapreduce import context
86 from google.appengine.ext.mapreduce import errors
87 from google.appengine.ext.mapreduce import file_format_parser
88 from google.appengine.ext.mapreduce import file_format_root
89 from google.appengine.ext.mapreduce import model
90 from google.appengine.ext.mapreduce import namespace_range
91 from google.appengine.ext.mapreduce import operation
92 from google.appengine.ext.mapreduce import util
96 Error = errors.Error
97 BadReaderParamsError = errors.BadReaderParamsError
101 COUNTER_IO_READ_BYTES = "io-read-bytes"
104 COUNTER_IO_READ_MSEC = "io-read-msec"
109 ALLOW_CHECKPOINT = object()
112 class InputReader(model.JsonMixin):
113 """Abstract base class for input readers.
115 InputReaders have the following properties:
116 * They are created by using the split_input method to generate a set of
117 InputReaders from a MapperSpec.
118 * They generate inputs to the mapper via the iterator interface.
119 * After creation, they can be serialized and resumed using the JsonMixin
120 interface.
121 * They are cast to string for a user-readable description; it may be
122 valuable to implement __str__.
128 expand_parameters = False
131 _APP_PARAM = "_app"
132 NAMESPACE_PARAM = "namespace"
133 NAMESPACES_PARAM = "namespaces"
135 def __iter__(self):
136 return self
138 def next(self):
139 """Returns the next input from this input reader as a key, value pair.
141 Returns:
142 The next input from this input reader.
144 raise NotImplementedError("next() not implemented in %s" % self.__class__)
146 @classmethod
147 def from_json(cls, input_shard_state):
148 """Creates an instance of the InputReader for the given input shard state.
150 Args:
151 input_shard_state: The InputReader state as a dict-like object.
153 Returns:
154 An instance of the InputReader configured using the values of json.
156 raise NotImplementedError("from_json() not implemented in %s" % cls)
158 def to_json(self):
159 """Returns an input shard state for the remaining inputs.
161 Returns:
162 A json-izable version of the remaining InputReader.
164 raise NotImplementedError("to_json() not implemented in %s" %
165 self.__class__)
167 @classmethod
168 def split_input(cls, mapper_spec):
169 """Returns a list of input readers for the input spec.
171 Args:
172 mapper_spec: The MapperSpec for this InputReader.
174 Returns:
175 A list of InputReaders.
177 raise NotImplementedError("split_input() not implemented in %s" % cls)
179 @classmethod
180 def validate(cls, mapper_spec):
181 """Validates mapper spec and all mapper parameters.
183 Input reader parameters are expected to be passed as "input_reader"
184 subdictionary of mapper_spec.params. To be compatible with the previous
185 API, an input reader is advised to check mapper_spec.params and issue
186 a warning if the "input_reader" subdictionary is not present.
187 The _get_params helper method can be used to simplify the implementation.
189 Args:
190 mapper_spec: The MapperSpec for this InputReader.
192 Raises:
193 BadReaderParamsError: required parameters are missing or invalid.
195 raise NotImplementedError("validate() not implemented in %s" % cls)
198 def _get_params(mapper_spec, allowed_keys=None):
199 """Obtain input reader parameters.
201 Utility function for input reader implementations. Fetches parameters
202 from the mapreduce specification, giving appropriate usage warnings.
204 Args:
205 mapper_spec: The MapperSpec for the job
206 allowed_keys: set of all allowed keys in parameters as strings. If it is not
207 None, then parameters are expected to be in a separate "input_reader"
208 subdictionary of mapper_spec parameters.
210 Returns:
211 mapper parameters as dict
213 Raises:
214 BadReaderParamsError: if parameters are invalid/missing or not allowed.
216 if "input_reader" not in mapper_spec.params:
217 message = ("Input reader's parameters should be specified in "
218 "input_reader subdictionary.")
219 if allowed_keys:
220 raise errors.BadReaderParamsError(message)
221 params = mapper_spec.params
222 params = dict((str(n), v) for n, v in params.iteritems())
223 else:
224 if not isinstance(mapper_spec.params.get("input_reader"), dict):
225 raise errors.BadReaderParamsError(
226 "Input reader parameters should be a dictionary")
227 params = mapper_spec.params.get("input_reader")
228 params = dict((str(n), v) for n, v in params.iteritems())
229 if allowed_keys:
230 params_diff = set(params.keys()) - allowed_keys
231 if params_diff:
232 raise errors.BadReaderParamsError(
233 "Invalid input_reader parameters: %s" % ",".join(params_diff))
234 return params
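# Illustrative sketch (not part of the SDK) of the two parameter layouts
# that _get_params accepts; the keys shown are examples only.
#
#   Preferred layout, with an explicit "input_reader" subdictionary:
#     mapper_spec.params = {
#         "input_reader": {"entity_kind": "models.Guestbook",
#                          "batch_size": 50},
#     }
#
#   Legacy layout, with reader parameters mixed into the mapper parameters
#   (only accepted when allowed_keys is None):
#     mapper_spec.params = {"entity_kind": "models.Guestbook",
#                           "batch_size": 50}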
237 class FileInputReader(InputReader):
238 """Reader to read Files API files of user specified format.
240 This class currently only supports Google Storage files. It will be extended
241 to support blobstore files in the future.
243 Reader Parameters:
244 files: a list of filenames or filename patterns.
245 filename must be of format '/gs/bucket/filename'.
246 filename pattern has format '/gs/bucket/prefix*'.
247 filename pattern will be expanded to filenames with the given prefix.
248 Please see parseGlob in api/files/gs.py, which is included in the
249 App Engine SDK, for the supported patterns.
251 Example:
252 ["/gs/bucket1/file1", "/gs/bucket2/*", "/gs/bucket3/p*"]
253 includes "file1", all files under bucket2, and files under bucket3 with
254 a prefix "p" in their names.
256 format: format string determines what your map function gets as its input.
257 format string can be "lines", "bytes", "zip", or a cascade of them plus
258 optional parameters. See file_formats.FORMATS for all supported formats.
259 See file_format_parser._FileFormatParser for format string syntax.
261 Example:
262 "lines": your map function gets files' contents line by line.
263 "bytes": your map function gets files' contents entirely.
264 "zip": InputReader unzips files and feeds your map function each of
265 the archive's member files as a whole.
266 "zip[bytes]: same as above.
267 "zip[lines]": InputReader unzips files and feeds your map function
268 files' contents line by line.
269 "zip[lines(encoding=utf32)]": InputReader unzips files, reads each
270 file with utf32 encoding and feeds your map function line by line.
271 "base64[zip[lines(encoding=utf32)]]: InputReader decodes files with
272 base64 encoding, unzips each file, reads each of them with utf32
273 encoding and feeds your map function line by line.
275 Note that "encoding" only teaches InputReader how to interpret files.
276 The input your map function gets is always a Python str.
280 FILES_PARAM = "files"
281 FORMAT_PARAM = "format"
283 def __init__(self, format_root):
284 """Initialize input reader.
286 Args:
287 format_root: a FileFormatRoot instance.
289 self._file_format_root = format_root
291 def __iter__(self):
292 """Inherit docs."""
293 return self
295 def next(self):
296 """Inherit docs."""
297 ctx = context.get()
298 start_time = time.time()
300 content = self._file_format_root.next().read()
302 if ctx:
303 operation.counters.Increment(
304 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
305 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
307 return content
309 @classmethod
310 def split_input(cls, mapper_spec):
311 """Inherit docs."""
312 params = _get_params(mapper_spec)
315 filenames = []
316 for f in params[cls.FILES_PARAM]:
317 parsedName = files.gs.parseGlob(f)
318 if isinstance(parsedName, tuple):
319 filenames.extend(files.gs.listdir(parsedName[0],
320 {"prefix": parsedName[1]}))
321 else:
322 filenames.append(parsedName)
324 file_format_roots = file_format_root.split(filenames,
325 params[cls.FORMAT_PARAM],
326 mapper_spec.shard_count)
328 return [cls(root) for root in file_format_roots]
330 @classmethod
331 def validate(cls, mapper_spec):
332 """Inherit docs."""
333 if mapper_spec.input_reader_class() != cls:
334 raise BadReaderParamsError("Mapper input reader class mismatch")
337 params = _get_params(mapper_spec)
338 if cls.FILES_PARAM not in params:
339 raise BadReaderParamsError("Must specify %s" % cls.FILES_PARAM)
340 if cls.FORMAT_PARAM not in params:
341 raise BadReaderParamsError("Must specify %s" % cls.FORMAT_PARAM)
343 format_string = params[cls.FORMAT_PARAM]
344 if not isinstance(format_string, basestring):
345 raise BadReaderParamsError("format should be string but is %s" %
346 cls.FORMAT_PARAM)
347 try:
348 file_format_parser.parse(format_string)
349 except ValueError, e:
350 raise BadReaderParamsError(e)
352 paths = params[cls.FILES_PARAM]
353 if not (paths and isinstance(paths, list)):
354 raise BadReaderParamsError("files should be a list of filenames.")
357 try:
358 for path in paths:
359 files.gs.parseGlob(path)
360 except files.InvalidFileNameError:
361 raise BadReaderParamsError("Invalid filename %s." % path)
363 @classmethod
364 def from_json(cls, json):
365 """Inherit docs."""
366 return cls(
367 file_format_root.FileFormatRoot.from_json(json["file_format_root"]))
369 def to_json(self):
370 """Inherit docs."""
371 return {"file_format_root": self._file_format_root.to_json()}
377 class AbstractDatastoreInputReader(InputReader):
378 """Abstract base class for classes that iterate over datastore entities.
380 Concrete subclasses must implement _iter_key_range(self, k_range). See the
381 docstring for that method for details.
385 _BATCH_SIZE = 50
388 _MAX_SHARD_COUNT = 256
391 _OVERSAMPLING_FACTOR = 32
396 MAX_NAMESPACES_FOR_KEY_SHARD = 10
399 ENTITY_KIND_PARAM = "entity_kind"
400 KEYS_ONLY_PARAM = "keys_only"
401 BATCH_SIZE_PARAM = "batch_size"
402 KEY_RANGE_PARAM = "key_range"
403 NAMESPACE_RANGE_PARAM = "namespace_range"
404 CURRENT_KEY_RANGE_PARAM = "current_key_range"
405 FILTERS_PARAM = "filters"
410 def __init__(self,
411 entity_kind,
412 key_ranges=None,
413 ns_range=None,
414 batch_size=_BATCH_SIZE,
415 current_key_range=None,
416 filters=None):
417 """Create new AbstractDatastoreInputReader object.
419 This is an internal constructor. Use split_input in a concrete class instead.
421 Args:
422 entity_kind: entity kind as string.
423 key_ranges: a sequence of key_range.KeyRange instances to process. Only
424 one of key_ranges or ns_range can be non-None.
425 ns_range: a namespace_range.NamespaceRange to process. Only one of
426 key_ranges or ns_range can be non-None.
427 batch_size: size of read batch as int.
428 current_key_range: the current key_range.KeyRange being processed.
429 filters: optional list of filters to apply to the query. Each filter is
430 a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
431 User filters are applied first.
433 assert key_ranges is not None or ns_range is not None, (
434 "must specify one of 'key_ranges' or 'ns_range'")
435 assert key_ranges is None or ns_range is None, (
436 "can't specify both 'key_ranges ' and 'ns_range'")
438 self._entity_kind = entity_kind
441 self._key_ranges = key_ranges and list(reversed(key_ranges))
443 self._ns_range = ns_range
444 self._batch_size = int(batch_size)
445 self._current_key_range = current_key_range
446 self._filters = filters
448 @classmethod
449 def _get_raw_entity_kind(cls, entity_kind):
450 if "." in entity_kind:
451 logging.warning(
452 ". detected in entity kind %s specified for reader %s."
453 "Assuming entity kind contains the dot.",
454 entity_kind, cls.__name__)
455 return entity_kind
457 def __iter__(self):
458 """Iterates over the given KeyRanges or NamespaceRange.
460 This method iterates over the given KeyRanges or NamespaceRange and sets
461 self._current_key_range to the KeyRange currently being processed. It
462 then delegates to the _iter_key_range method to yield the actual
463 results.
465 Yields:
466 Forwards the objects yielded by the subclasses concrete _iter_key_range()
467 method. The caller must consume the result yielded because self.to_json()
468 will not include it.
470 if self._key_ranges is not None:
471 for o in self._iter_key_ranges():
472 yield o
473 elif self._ns_range is not None:
474 for o in self._iter_ns_range():
475 yield o
476 else:
477 assert False, "self._key_ranges and self._ns_range are both None"
479 def _iter_key_ranges(self):
480 """Iterates over self._key_ranges, delegating to self._iter_key_range()."""
481 while True:
482 if self._current_key_range is None:
483 if self._key_ranges:
484 self._current_key_range = self._key_ranges.pop()
487 continue
488 else:
489 break
491 for key, o in self._iter_key_range(
492 copy.deepcopy(self._current_key_range)):
495 self._current_key_range.advance(key)
496 yield o
497 self._current_key_range = None
499 def _iter_ns_range(self):
500 """Iterates over self._ns_range, delegating to self._iter_key_range()."""
501 while True:
502 if self._current_key_range is None:
503 query = self._ns_range.make_datastore_query()
504 namespace_result = query.Get(1)
505 if not namespace_result:
506 break
508 namespace = namespace_result[0].name() or ""
509 self._current_key_range = key_range.KeyRange(
510 namespace=namespace, _app=self._ns_range.app)
511 yield ALLOW_CHECKPOINT
513 for key, o in self._iter_key_range(
514 copy.deepcopy(self._current_key_range)):
517 self._current_key_range.advance(key)
518 yield o
520 if (self._ns_range.is_single_namespace or
521 self._current_key_range.namespace == self._ns_range.namespace_end):
522 break
523 self._ns_range = self._ns_range.with_start_after(
524 self._current_key_range.namespace)
525 self._current_key_range = None
527 def _iter_key_range(self, k_range):
528 """Yields a db.Key and the value that should be yielded by self.__iter__().
530 Args:
531 k_range: The key_range.KeyRange to iterate over.
533 Yields:
534 A 2-tuple containing the last db.Key processed and the value that should
535 be yielded by __iter__. The returned db.Key will be used to determine the
536 InputReader's current position in self._current_key_range.
538 raise NotImplementedError("_iter_key_range() not implemented in %s" %
539 self.__class__)
541 def __str__(self):
542 """Returns the string representation of this InputReader."""
543 if self._ns_range is None:
544 return repr(self._key_ranges)
545 else:
546 return repr(self._ns_range)
548 @classmethod
549 def _choose_split_points(cls, sorted_keys, shard_count):
550 """Returns the best split points given a random set of db.Keys."""
551 assert len(sorted_keys) >= shard_count
552 index_stride = len(sorted_keys) / float(shard_count)
553 return [sorted_keys[int(round(index_stride * i))]
554 for i in range(1, shard_count)]
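# Worked example (illustrative only): with 8 sorted scatter keys and
# shard_count=4, index_stride is 2.0, so the split points are the keys at
# indexes round(2.0 * 1) = 2, round(2.0 * 2) = 4 and round(2.0 * 3) = 6,
# which _split_input_from_namespace below turns into 4 key ranges:
# (-inf, k2), [k2, k4), [k4, k6) and [k6, +inf).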
558 @classmethod
559 def _split_input_from_namespace(cls, app, namespace, entity_kind,
560 shard_count):
561 """Return KeyRange objects. Helper for _split_input_from_params.
563 If there are not enough Entities to make all of the given shards, the
564 returned list of KeyRanges will include Nones. The returned list will
565 contain KeyRanges ordered lexicographically with any Nones appearing at the
566 end.
569 raw_entity_kind = cls._get_raw_entity_kind(entity_kind)
570 if shard_count == 1:
572 return [key_range.KeyRange(namespace=namespace, _app=app)]
574 ds_query = datastore.Query(kind=raw_entity_kind,
575 namespace=namespace,
576 _app=app,
577 keys_only=True)
578 ds_query.Order("__scatter__")
579 random_keys = ds_query.Get(shard_count * cls._OVERSAMPLING_FACTOR)
581 if not random_keys:
584 return ([key_range.KeyRange(namespace=namespace, _app=app)] +
585 [None] * (shard_count - 1))
587 random_keys.sort()
589 if len(random_keys) >= shard_count:
591 random_keys = cls._choose_split_points(random_keys, shard_count)
593 key_ranges = []
595 key_ranges.append(key_range.KeyRange(
596 key_start=None,
597 key_end=random_keys[0],
598 direction=key_range.KeyRange.ASC,
599 include_start=False,
600 include_end=False,
601 namespace=namespace,
602 _app=app))
604 for i in range(0, len(random_keys) - 1):
605 key_ranges.append(key_range.KeyRange(
606 key_start=random_keys[i],
607 key_end=random_keys[i+1],
608 direction=key_range.KeyRange.ASC,
609 include_start=True,
610 include_end=False,
611 namespace=namespace,
612 _app=app))
614 key_ranges.append(key_range.KeyRange(
615 key_start=random_keys[-1],
616 key_end=None,
617 direction=key_range.KeyRange.ASC,
618 include_start=True,
619 include_end=False,
620 namespace=namespace,
621 _app=app))
623 if len(key_ranges) < shard_count:
625 key_ranges = key_ranges + [None] * (shard_count - len(key_ranges))
627 return key_ranges
629 @classmethod
630 def _split_input_from_params(cls, app, namespaces, entity_kind_name,
631 params, shard_count):
632 """Return input reader objects. Helper for split_input."""
633 key_ranges = []
634 for namespace in namespaces:
635 key_ranges.extend(
636 cls._split_input_from_namespace(app,
637 namespace,
638 entity_kind_name,
639 shard_count))
644 shared_ranges = [[] for _ in range(shard_count)]
645 for i, k_range in enumerate(key_ranges):
646 shared_ranges[i % shard_count].append(k_range)
647 batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
649 return [cls(entity_kind_name,
650 key_ranges=key_ranges,
651 ns_range=None,
652 batch_size=batch_size)
653 for key_ranges in shared_ranges if key_ranges]
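# Illustrative example: with 2 namespaces and shard_count=3, the loop above
# collects 6 KeyRanges r0..r5 (possibly padded with Nones). Dealing them out
# round-robin gives shared_ranges[0] = [r0, r3], shared_ranges[1] = [r1, r4]
# and shared_ranges[2] = [r2, r5], i.e. one reader per shard, each holding
# one range per namespace.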
655 @classmethod
656 def validate(cls, mapper_spec):
657 """Validates mapper spec and all mapper parameters.
659 Args:
660 mapper_spec: The MapperSpec for this InputReader.
662 Raises:
663 BadReaderParamsError: required parameters are missing or invalid.
665 if mapper_spec.input_reader_class() != cls:
666 raise BadReaderParamsError("Input reader class mismatch")
667 params = _get_params(mapper_spec)
668 if cls.ENTITY_KIND_PARAM not in params:
669 raise BadReaderParamsError("Missing mapper parameter 'entity_kind'")
670 if cls.BATCH_SIZE_PARAM in params:
671 try:
672 batch_size = int(params[cls.BATCH_SIZE_PARAM])
673 if batch_size < 1:
674 raise BadReaderParamsError("Bad batch size: %s" % batch_size)
675 except ValueError, e:
676 raise BadReaderParamsError("Bad batch size: %s" % e)
677 if cls.NAMESPACE_PARAM in params:
678 if not isinstance(params[cls.NAMESPACE_PARAM],
679 (str, unicode, type(None))):
680 raise BadReaderParamsError(
681 "Expected a single namespace string")
682 if cls.NAMESPACES_PARAM in params:
683 raise BadReaderParamsError("Multiple namespaces are no longer supported")
684 if cls.FILTERS_PARAM in params:
685 filters = params[cls.FILTERS_PARAM]
686 if not isinstance(filters, list):
687 raise BadReaderParamsError("Expected list for filters parameter")
688 for f in filters:
689 if not isinstance(f, (tuple, list)):
690 raise BadReaderParamsError("Filter should be a tuple or list: %s", f)
691 if len(f) != 3:
692 raise BadReaderParamsError("Filter should be a 3-tuple: %s", f)
693 if not isinstance(f[0], basestring):
694 raise BadReaderParamsError("First element should be string: %s", f)
695 if f[1] != "=":
696 raise BadReaderParamsError(
697 "Only equality filters are supported: %s" % (f,))
699 @classmethod
700 def split_input(cls, mapper_spec):
701 """Splits query into shards without fetching query results.
703 Tries as best it can to split the whole query result set into equal
704 shards. Due to the difficulty of making a perfect split, the resulting
705 shards' sizes might differ significantly from each other.
707 Args:
708 mapper_spec: MapperSpec with params containing 'entity_kind'.
709 May have 'namespace' in the params as a string containing a single
710 namespace. If specified then the input reader will only yield values
711 in the given namespace. If 'namespace' is not given then values from
712 all namespaces will be yielded. May also have 'batch_size' in the params
713 to specify the number of entities to process in each batch.
715 Returns:
716 A list of InputReader objects. If the query results are empty then the
717 empty list will be returned. Otherwise, the list will always have a length
718 equal to number_of_shards but may be padded with Nones if there are too
719 few results for effective sharding.
721 params = _get_params(mapper_spec)
722 entity_kind_name = params[cls.ENTITY_KIND_PARAM]
723 batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
724 shard_count = mapper_spec.shard_count
725 namespace = params.get(cls.NAMESPACE_PARAM)
726 app = params.get(cls._APP_PARAM)
727 filters = params.get(cls.FILTERS_PARAM)
729 if namespace is None:
741 namespace_query = datastore.Query("__namespace__",
742 keys_only=True,
743 _app=app)
744 namespace_keys = namespace_query.Get(
745 limit=cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
747 if len(namespace_keys) > cls.MAX_NAMESPACES_FOR_KEY_SHARD:
748 ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
749 contiguous=True,
750 _app=app)
751 return [cls(entity_kind_name,
752 key_ranges=None,
753 ns_range=ns_range,
754 batch_size=batch_size,
755 filters=filters)
756 for ns_range in ns_ranges]
757 elif not namespace_keys:
758 return [cls(entity_kind_name,
759 key_ranges=None,
760 ns_range=namespace_range.NamespaceRange(),
761 batch_size=shard_count,
762 filters=filters)]
763 else:
764 namespaces = [namespace_key.name() or ""
765 for namespace_key in namespace_keys]
766 else:
767 namespaces = [namespace]
769 readers = cls._split_input_from_params(
770 app, namespaces, entity_kind_name, params, shard_count)
771 if filters:
772 for reader in readers:
773 reader._filters = filters
774 return readers
776 def to_json(self):
777 """Serializes all the data in this query range into json form.
779 Returns:
780 all the data in json-compatible map.
782 if self._key_ranges is None:
783 key_ranges_json = None
784 else:
785 key_ranges_json = []
786 for k in self._key_ranges:
787 if k:
788 key_ranges_json.append(k.to_json())
789 else:
790 key_ranges_json.append(None)
792 if self._ns_range is None:
793 namespace_range_json = None
794 else:
795 namespace_range_json = self._ns_range.to_json_object()
797 if self._current_key_range is None:
798 current_key_range_json = None
799 else:
800 current_key_range_json = self._current_key_range.to_json()
802 json_dict = {self.KEY_RANGE_PARAM: key_ranges_json,
803 self.NAMESPACE_RANGE_PARAM: namespace_range_json,
804 self.CURRENT_KEY_RANGE_PARAM: current_key_range_json,
805 self.ENTITY_KIND_PARAM: self._entity_kind,
806 self.BATCH_SIZE_PARAM: self._batch_size,
807 self.FILTERS_PARAM: self._filters}
808 return json_dict
810 @classmethod
811 def from_json(cls, json):
812 """Create new DatastoreInputReader from the json, encoded by to_json.
814 Args:
815 json: json map representation of DatastoreInputReader.
817 Returns:
818 an instance of DatastoreInputReader with all data deserialized from json.
820 if json[cls.KEY_RANGE_PARAM] is None:
821 key_ranges = None
822 else:
823 key_ranges = []
824 for k in json[cls.KEY_RANGE_PARAM]:
825 if k:
826 key_ranges.append(key_range.KeyRange.from_json(k))
827 else:
828 key_ranges.append(None)
830 if json[cls.NAMESPACE_RANGE_PARAM] is None:
831 ns_range = None
832 else:
833 ns_range = namespace_range.NamespaceRange.from_json_object(
834 json[cls.NAMESPACE_RANGE_PARAM])
836 if json[cls.CURRENT_KEY_RANGE_PARAM] is None:
837 current_key_range = None
838 else:
839 current_key_range = key_range.KeyRange.from_json(
840 json[cls.CURRENT_KEY_RANGE_PARAM])
842 return cls(
843 json[cls.ENTITY_KIND_PARAM],
844 key_ranges,
845 ns_range,
846 json[cls.BATCH_SIZE_PARAM],
847 current_key_range,
848 filters=json.get(cls.FILTERS_PARAM))
851 class DatastoreInputReader(AbstractDatastoreInputReader):
852 """Represents a range in query results.
854 DatastoreInputReader yields model instances from the entities in a given key
855 range. Iterating over DatastoreInputReader changes its range past consumed
856 entries.
858 The class shouldn't be instantiated directly. Use the split_input class method
859 instead.
862 def _iter_key_range(self, k_range):
863 cursor = None
864 while True:
865 query = k_range.make_ascending_query(
866 util.for_name(self._entity_kind),
867 filters=self._filters)
868 if isinstance(query, db.Query):
870 if cursor:
871 query.with_cursor(cursor)
873 results = query.fetch(limit=self._batch_size)
874 if not results:
875 break
877 for model_instance in results:
878 key = model_instance.key()
879 yield key, model_instance
880 cursor = query.cursor()
881 else:
883 results, cursor, more = query.fetch_page(self._batch_size,
884 start_cursor=cursor)
885 for model_instance in results:
886 key = model_instance.key
887 yield key, model_instance
888 if not more:
889 break
891 @classmethod
892 def validate(cls, mapper_spec):
893 """Validates mapper spec and all mapper parameters.
895 Args:
896 mapper_spec: The MapperSpec for this InputReader.
898 Raises:
899 BadReaderParamsError: required parameters are missing or invalid.
901 super(DatastoreInputReader, cls).validate(mapper_spec)
902 params = _get_params(mapper_spec)
903 keys_only = util.parse_bool(params.get(cls.KEYS_ONLY_PARAM, False))
904 if keys_only:
905 raise BadReaderParamsError("The keys_only parameter is obsolete. "
906 "Use DatastoreKeyInputReader instead.")
908 entity_kind_name = params[cls.ENTITY_KIND_PARAM]
910 try:
911 util.for_name(entity_kind_name)
912 except ImportError, e:
913 raise BadReaderParamsError("Bad entity kind: %s" % e)
915 @classmethod
916 def _get_raw_entity_kind(cls, entity_kind):
917 """Returns an entity kind to use with datastore calls."""
918 entity_type = util.for_name(entity_kind)
919 if isinstance(entity_type, db.Model):
920 return entity_type.kind()
921 elif ndb and isinstance(entity_type, (ndb.Model, ndb.MetaModel)):
922 return entity_type._get_kind()
923 else:
924 return util.get_short_name(entity_kind)
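# Illustrative input_reader parameters for this reader (the model path,
# namespace and filter values are hypothetical):
#
#   "input_reader": {
#       "entity_kind": "main.GuestbookEntry",
#       "namespace": "demo",
#       "batch_size": 100,
#       "filters": [("author", "=", "alice")],
#   }
#
# split_input() divides the kind's key space into mapper_spec.shard_count
# key ranges using the __scatter__ sampling above, and each shard's
# _iter_key_range() then pages through its range batch_size entities at a
# time, yielding model instances to the map function.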
927 class DatastoreKeyInputReader(AbstractDatastoreInputReader):
928 """An input reader which takes a Kind and yields Keys for that kind."""
930 def _iter_key_range(self, k_range):
931 raw_entity_kind = self._get_raw_entity_kind(self._entity_kind)
932 query = k_range.make_ascending_datastore_query(
933 raw_entity_kind, keys_only=True, filters=self._filters)
934 for key in query.Run(
935 config=datastore_query.QueryOptions(batch_size=self._batch_size)):
936 yield key, key
939 class DatastoreEntityInputReader(AbstractDatastoreInputReader):
940 """An input reader which yields low level datastore entities for a kind."""
942 def _iter_key_range(self, k_range):
943 raw_entity_kind = self._get_raw_entity_kind(self._entity_kind)
944 query = k_range.make_ascending_datastore_query(
945 raw_entity_kind, filters=self._filters)
946 for entity in query.Run(
947 config=datastore_query.QueryOptions(batch_size=self._batch_size)):
948 yield entity.key(), entity
951 class BlobstoreLineInputReader(InputReader):
952 """Input reader for a newline delimited blob in Blobstore."""
955 _BLOB_BUFFER_SIZE = 64000
958 _MAX_SHARD_COUNT = 256
961 _MAX_BLOB_KEYS_COUNT = 246
964 BLOB_KEYS_PARAM = "blob_keys"
967 INITIAL_POSITION_PARAM = "initial_position"
968 END_POSITION_PARAM = "end_position"
969 BLOB_KEY_PARAM = "blob_key"
971 def __init__(self, blob_key, start_position, end_position):
972 """Initializes this instance with the given blob key and character range.
974 This BlobstoreLineInputReader will read from the first record starting
975 strictly after start_position until the first record ending at or after
976 end_position (exclusive). As an exception, if start_position is 0, then
977 this InputReader starts reading at the first record.
979 Args:
980 blob_key: the BlobKey that this input reader is processing.
981 start_position: the position to start reading at.
982 end_position: a position in the last record to read.
984 self._blob_key = blob_key
985 self._blob_reader = blobstore.BlobReader(blob_key,
986 self._BLOB_BUFFER_SIZE,
987 start_position)
988 self._end_position = end_position
989 self._has_iterated = False
990 self._read_before_start = bool(start_position)
992 def next(self):
993 """Returns the next input from as an (offset, line) tuple."""
994 self._has_iterated = True
996 if self._read_before_start:
997 self._blob_reader.readline()
998 self._read_before_start = False
999 start_position = self._blob_reader.tell()
1001 if start_position > self._end_position:
1002 raise StopIteration()
1004 line = self._blob_reader.readline()
1006 if not line:
1007 raise StopIteration()
1009 return start_position, line.rstrip("\n")
1011 def to_json(self):
1012 """Returns an json-compatible input shard spec for remaining inputs."""
1013 new_pos = self._blob_reader.tell()
1014 if self._has_iterated:
1015 new_pos -= 1
1016 return {self.BLOB_KEY_PARAM: self._blob_key,
1017 self.INITIAL_POSITION_PARAM: new_pos,
1018 self.END_POSITION_PARAM: self._end_position}
1020 def __str__(self):
1021 """Returns the string representation of this BlobstoreLineInputReader."""
1022 return "blobstore.BlobKey(%r):[%d, %d]" % (
1023 self._blob_key, self._blob_reader.tell(), self._end_position)
1025 @classmethod
1026 def from_json(cls, json):
1027 """Instantiates an instance of this InputReader for the given shard spec."""
1028 return cls(json[cls.BLOB_KEY_PARAM],
1029 json[cls.INITIAL_POSITION_PARAM],
1030 json[cls.END_POSITION_PARAM])
1032 @classmethod
1033 def validate(cls, mapper_spec):
1034 """Validates mapper spec and all mapper parameters.
1036 Args:
1037 mapper_spec: The MapperSpec for this InputReader.
1039 Raises:
1040 BadReaderParamsError: required parameters are missing or invalid.
1042 if mapper_spec.input_reader_class() != cls:
1043 raise BadReaderParamsError("Mapper input reader class mismatch")
1044 params = _get_params(mapper_spec)
1045 if cls.BLOB_KEYS_PARAM not in params:
1046 raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
1047 blob_keys = params[cls.BLOB_KEYS_PARAM]
1048 if isinstance(blob_keys, basestring):
1051 blob_keys = blob_keys.split(",")
1052 if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
1053 raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
1054 if not blob_keys:
1055 raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
1056 for blob_key in blob_keys:
1057 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1058 if not blob_info:
1059 raise BadReaderParamsError("Could not find blobinfo for key %s" %
1060 blob_key)
1062 @classmethod
1063 def split_input(cls, mapper_spec):
1064 """Returns a list of shard_count input_spec_shards for input_spec.
1066 Args:
1067 mapper_spec: The mapper specification to split from. Must contain
1068 'blob_keys' parameter with one or more blob keys.
1070 Returns:
1071 A list of BlobstoreInputReaders corresponding to the specified shards.
1073 params = _get_params(mapper_spec)
1074 blob_keys = params[cls.BLOB_KEYS_PARAM]
1075 if isinstance(blob_keys, basestring):
1078 blob_keys = blob_keys.split(",")
1080 blob_sizes = {}
1081 for blob_key in blob_keys:
1082 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1083 blob_sizes[blob_key] = blob_info.size
1085 shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
1086 shards_per_blob = shard_count // len(blob_keys)
1087 if shards_per_blob == 0:
1088 shards_per_blob = 1
1090 chunks = []
1091 for blob_key, blob_size in blob_sizes.items():
1092 blob_chunk_size = blob_size // shards_per_blob
1093 for i in xrange(shards_per_blob - 1):
1094 chunks.append(BlobstoreLineInputReader.from_json(
1095 {cls.BLOB_KEY_PARAM: blob_key,
1096 cls.INITIAL_POSITION_PARAM: blob_chunk_size * i,
1097 cls.END_POSITION_PARAM: blob_chunk_size * (i + 1)}))
1098 chunks.append(BlobstoreLineInputReader.from_json(
1099 {cls.BLOB_KEY_PARAM: blob_key,
1100 cls.INITIAL_POSITION_PARAM: blob_chunk_size * (shards_per_blob - 1),
1101 cls.END_POSITION_PARAM: blob_size}))
1102 return chunks
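# Worked example (illustrative): one blob of 1,000 bytes and
# mapper_spec.shard_count=4 give shards_per_blob=4 and blob_chunk_size=250,
# so the resulting readers cover [0, 250), [250, 500), [500, 750) and
# [750, 1000). A reader with a non-zero start position discards the first
# (possibly partial) line it sees and reads through the line that crosses
# its end position, so each line of the blob is processed exactly once.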
1105 class BlobstoreZipInputReader(InputReader):
1106 """Input reader for files from a zip archive stored in the Blobstore.
1108 Each instance of the reader will read the TOC from the end of the zip
1109 file, and then only the contained files for which it is responsible.
1113 _MAX_SHARD_COUNT = 256
1116 BLOB_KEY_PARAM = "blob_key"
1117 START_INDEX_PARAM = "start_index"
1118 END_INDEX_PARAM = "end_index"
1120 def __init__(self, blob_key, start_index, end_index,
1121 _reader=blobstore.BlobReader):
1122 """Initializes this instance with the given blob key and file range.
1124 This BlobstoreZipInputReader will read from the file with index start_index
1125 up to but not including the file with index end_index.
1127 Args:
1128 blob_key: the BlobKey that this input reader is processing.
1129 start_index: the index of the first file to read.
1130 end_index: the index of the first file that will not be read.
1131 _reader: a callable that returns a file-like object for reading blobs.
1132 Used for dependency injection.
1134 self._blob_key = blob_key
1135 self._start_index = start_index
1136 self._end_index = end_index
1137 self._reader = _reader
1138 self._zip = None
1139 self._entries = None
1141 def next(self):
1142 """Returns the next input from this input reader as (ZipInfo, opener) tuple.
1144 Returns:
1145 The next input from this input reader, in the form of a 2-tuple.
1146 The first element of the tuple is a zipfile.ZipInfo object.
1147 The second element of the tuple is a zero-argument function that, when
1148 called, returns the complete body of the file.
1150 if not self._zip:
1151 self._zip = zipfile.ZipFile(self._reader(self._blob_key))
1153 self._entries = self._zip.infolist()[self._start_index:self._end_index]
1154 self._entries.reverse()
1155 if not self._entries:
1156 raise StopIteration()
1157 entry = self._entries.pop()
1158 self._start_index += 1
1159 return (entry, lambda: self._read(entry))
1161 def _read(self, entry):
1162 """Read entry content.
1164 Args:
1165 entry: zip file entry as zipfile.ZipInfo.
1166 Returns:
1167 Entry content as string.
1169 start_time = time.time()
1170 content = self._zip.read(entry.filename)
1172 ctx = context.get()
1173 if ctx:
1174 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
1175 operation.counters.Increment(
1176 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
1178 return content
1180 @classmethod
1181 def from_json(cls, json):
1182 """Creates an instance of the InputReader for the given input shard state.
1184 Args:
1185 json: The InputReader state as a dict-like object.
1187 Returns:
1188 An instance of the InputReader configured using the values of json.
1190 return cls(json[cls.BLOB_KEY_PARAM],
1191 json[cls.START_INDEX_PARAM],
1192 json[cls.END_INDEX_PARAM])
1194 def to_json(self):
1195 """Returns an input shard state for the remaining inputs.
1197 Returns:
1198 A json-izable version of the remaining InputReader.
1200 return {self.BLOB_KEY_PARAM: self._blob_key,
1201 self.START_INDEX_PARAM: self._start_index,
1202 self.END_INDEX_PARAM: self._end_index}
1204 def __str__(self):
1205 """Returns the string representation of this BlobstoreZipInputReader."""
1206 return "blobstore.BlobKey(%r):[%d, %d]" % (
1207 self._blob_key, self._start_index, self._end_index)
1209 @classmethod
1210 def validate(cls, mapper_spec):
1211 """Validates mapper spec and all mapper parameters.
1213 Args:
1214 mapper_spec: The MapperSpec for this InputReader.
1216 Raises:
1217 BadReaderParamsError: required parameters are missing or invalid.
1219 if mapper_spec.input_reader_class() != cls:
1220 raise BadReaderParamsError("Mapper input reader class mismatch")
1221 params = _get_params(mapper_spec)
1222 if cls.BLOB_KEY_PARAM not in params:
1223 raise BadReaderParamsError("Must specify 'blob_key' for mapper input")
1224 blob_key = params[cls.BLOB_KEY_PARAM]
1225 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1226 if not blob_info:
1227 raise BadReaderParamsError("Could not find blobinfo for key %s" %
1228 blob_key)
1231 @classmethod
1232 def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
1233 """Returns a list of input shard states for the input spec.
1235 Args:
1236 mapper_spec: The MapperSpec for this InputReader. Must contain
1237 'blob_key' parameter with one blob key.
1238 _reader: a callable that returns a file-like object for reading blobs.
1239 Used for dependency injection.
1241 Returns:
1242 A list of InputReaders spanning files within the zip.
1244 params = _get_params(mapper_spec)
1245 blob_key = params[cls.BLOB_KEY_PARAM]
1246 zip_input = zipfile.ZipFile(_reader(blob_key))
1247 files = zip_input.infolist()
1248 total_size = sum(x.file_size for x in files)
1249 num_shards = min(mapper_spec.shard_count, cls._MAX_SHARD_COUNT)
1250 size_per_shard = total_size // num_shards
1254 shard_start_indexes = [0]
1255 current_shard_size = 0
1256 for i, fileinfo in enumerate(files):
1257 current_shard_size += fileinfo.file_size
1258 if current_shard_size >= size_per_shard:
1259 shard_start_indexes.append(i + 1)
1260 current_shard_size = 0
1262 if shard_start_indexes[-1] != len(files):
1263 shard_start_indexes.append(len(files))
1265 return [cls(blob_key, start_index, end_index, _reader)
1266 for start_index, end_index
1267 in zip(shard_start_indexes, shard_start_indexes[1:])]
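# Worked example (illustrative): an archive whose members have sizes
# [10, 40, 30, 20] with num_shards=2 gives size_per_shard=50. The running
# total reaches 50 at index 1 and again at index 3, so shard_start_indexes
# becomes [0, 2, 4] and the resulting readers cover members [0, 2) and
# [2, 4).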
1270 class BlobstoreZipLineInputReader(InputReader):
1271 """Input reader for newline delimited files in zip archives from Blobstore.
1273 This has the same external interface as the BlobstoreLineInputReader, in that
1274 it takes a list of blobs as its input and yields lines to the mapper.
1275 However, the blobs themselves are expected to be zip archives of line-delimited
1276 files instead of the files themselves.
1278 This is useful because many line-delimited files benefit greatly from compression.
1282 _MAX_SHARD_COUNT = 256
1285 _MAX_BLOB_KEYS_COUNT = 246
1288 BLOB_KEYS_PARAM = "blob_keys"
1291 BLOB_KEY_PARAM = "blob_key"
1292 START_FILE_INDEX_PARAM = "start_file_index"
1293 END_FILE_INDEX_PARAM = "end_file_index"
1294 OFFSET_PARAM = "offset"
1296 def __init__(self, blob_key, start_file_index, end_file_index, offset,
1297 _reader=blobstore.BlobReader):
1298 """Initializes this instance with the given blob key and file range.
1300 This BlobstoreZipLineInputReader will read from the file with index
1301 start_file_index up to but not including the file with index end_file_index.
1302 It will return lines starting at offset within file[start_file_index]
1304 Args:
1305 blob_key: the BlobKey that this input reader is processing.
1306 start_file_index: the index of the first file to read within the zip.
1307 end_file_index: the index of the first file that will not be read.
1308 offset: the byte offset within blob_key.zip[start_file_index] to start
1309 reading. The reader will continue to the end of the file.
1310 _reader: a callable that returns a file-like object for reading blobs.
1311 Used for dependency injection.
1313 self._blob_key = blob_key
1314 self._start_file_index = start_file_index
1315 self._end_file_index = end_file_index
1316 self._initial_offset = offset
1317 self._reader = _reader
1318 self._zip = None
1319 self._entries = None
1320 self._filestream = None
1322 @classmethod
1323 def validate(cls, mapper_spec):
1324 """Validates mapper spec and all mapper parameters.
1326 Args:
1327 mapper_spec: The MapperSpec for this InputReader.
1329 Raises:
1330 BadReaderParamsError: required parameters are missing or invalid.
1332 if mapper_spec.input_reader_class() != cls:
1333 raise BadReaderParamsError("Mapper input reader class mismatch")
1334 params = _get_params(mapper_spec)
1335 if cls.BLOB_KEYS_PARAM not in params:
1336 raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
1338 blob_keys = params[cls.BLOB_KEYS_PARAM]
1339 if isinstance(blob_keys, basestring):
1342 blob_keys = blob_keys.split(",")
1343 if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
1344 raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
1345 if not blob_keys:
1346 raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
1347 for blob_key in blob_keys:
1348 blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
1349 if not blob_info:
1350 raise BadReaderParamsError("Could not find blobinfo for key %s" %
1351 blob_key)
1353 @classmethod
1354 def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
1355 """Returns a list of input readers for the input spec.
1357 Args:
1358 mapper_spec: The MapperSpec for this InputReader. Must contain
1359 'blob_keys' parameter with one or more blob keys.
1360 _reader: a callable that returns a file-like object for reading blobs.
1361 Used for dependency injection.
1363 Returns:
1364 A list of InputReaders spanning the subfiles within the blobs.
1365 There will be at least one reader per blob, but it will otherwise
1366 attempt to keep the expanded size even.
1368 params = _get_params(mapper_spec)
1369 blob_keys = params[cls.BLOB_KEYS_PARAM]
1370 if isinstance(blob_keys, basestring):
1373 blob_keys = blob_keys.split(",")
1375 blob_files = {}
1376 total_size = 0
1377 for blob_key in blob_keys:
1378 zip_input = zipfile.ZipFile(_reader(blob_key))
1379 blob_files[blob_key] = zip_input.infolist()
1380 total_size += sum(x.file_size for x in blob_files[blob_key])
1382 shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
1388 size_per_shard = total_size // shard_count
1390 readers = []
1391 for blob_key in blob_keys:
1392 files = blob_files[blob_key]
1393 current_shard_size = 0
1394 start_file_index = 0
1395 next_file_index = 0
1396 for fileinfo in files:
1397 next_file_index += 1
1398 current_shard_size += fileinfo.file_size
1399 if current_shard_size >= size_per_shard:
1400 readers.append(cls(blob_key, start_file_index, next_file_index, 0,
1401 _reader))
1402 current_shard_size = 0
1403 start_file_index = next_file_index
1404 if current_shard_size != 0:
1405 readers.append(cls(blob_key, start_file_index, next_file_index, 0,
1406 _reader))
1408 return readers
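# Illustrative example: two zip blobs whose members have sizes [60, 40] and
# [100], with mapper_spec.shard_count=2, give size_per_shard=100. The first
# blob yields one reader over members [0, 2) (the running total reaches 100
# at its second member) and the second blob yields one reader over member
# [0, 1). Readers never span blobs, so there is always at least one reader
# per blob key.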
1410 def next(self):
1411 """Returns the next line from this input reader as (lineinfo, line) tuple.
1413 Returns:
1414 The next input from this input reader, in the form of a 2-tuple.
1415 The first element of the tuple describes the source, it is itself
1416 a tuple (blobkey, filenumber, byteoffset).
1417 The second element of the tuple is the line found at that offset.
1419 if not self._filestream:
1420 if not self._zip:
1421 self._zip = zipfile.ZipFile(self._reader(self._blob_key))
1423 self._entries = self._zip.infolist()[self._start_file_index:
1424 self._end_file_index]
1425 self._entries.reverse()
1426 if not self._entries:
1427 raise StopIteration()
1428 entry = self._entries.pop()
1429 value = self._zip.read(entry.filename)
1430 self._filestream = StringIO.StringIO(value)
1431 if self._initial_offset:
1432 self._filestream.seek(self._initial_offset)
1433 self._filestream.readline()
1435 start_position = self._filestream.tell()
1436 line = self._filestream.readline()
1438 if not line:
1440 self._filestream.close()
1441 self._filestream = None
1442 self._start_file_index += 1
1443 self._initial_offset = 0
1444 return self.next()
1446 return ((self._blob_key, self._start_file_index, start_position),
1447 line.rstrip("\n"))
1449 def _next_offset(self):
1450 """Return the offset of the next line to read."""
1451 if self._filestream:
1452 offset = self._filestream.tell()
1453 if offset:
1454 offset -= 1
1455 else:
1456 offset = self._initial_offset
1458 return offset
1460 def to_json(self):
1461 """Returns an input shard state for the remaining inputs.
1463 Returns:
1464 A json-izable version of the remaining InputReader.
1467 return {self.BLOB_KEY_PARAM: self._blob_key,
1468 self.START_FILE_INDEX_PARAM: self._start_file_index,
1469 self.END_FILE_INDEX_PARAM: self._end_file_index,
1470 self.OFFSET_PARAM: self._next_offset()}
1472 @classmethod
1473 def from_json(cls, json, _reader=blobstore.BlobReader):
1474 """Creates an instance of the InputReader for the given input shard state.
1476 Args:
1477 json: The InputReader state as a dict-like object.
1478 _reader: For dependency injection.
1480 Returns:
1481 An instance of the InputReader configured using the values of json.
1483 return cls(json[cls.BLOB_KEY_PARAM],
1484 json[cls.START_FILE_INDEX_PARAM],
1485 json[cls.END_FILE_INDEX_PARAM],
1486 json[cls.OFFSET_PARAM],
1487 _reader)
1489 def __str__(self):
1490 """Returns the string representation of this reader.
1492 Returns:
1493 string blobkey:[start file num, end file num]:current offset.
1495 return "blobstore.BlobKey(%r):[%d, %d]:%d" % (
1496 self._blob_key, self._start_file_index, self._end_file_index,
1497 self._next_offset())
1500 class RandomStringInputReader(InputReader):
1501 """RandomStringInputReader generates random strings as output.
1503 Its primary use is to populate the output with test entries.
1507 COUNT = "count"
1509 STRING_LENGTH = "string_length"
1511 DEFAULT_STRING_LENGTH = 10
1513 def __init__(self, count, string_length):
1514 """Initialize input reader.
1516 Args:
1517 count: number of entries this shard should generate.
1518 string_length: the length of generated random strings.
1520 self._count = count
1521 self._string_length = string_length
1523 def __iter__(self):
1524 ctx = context.get()
1526 while self._count:
1527 self._count -= 1
1528 start_time = time.time()
1529 content = "".join(random.choice(string.ascii_lowercase)
1530 for _ in range(self._string_length))
1531 if ctx:
1532 operation.counters.Increment(
1533 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
1534 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
1535 yield content
1537 @classmethod
1538 def split_input(cls, mapper_spec):
1539 params = _get_params(mapper_spec)
1540 count = params[cls.COUNT]
1541 string_length = cls.DEFAULT_STRING_LENGTH
1542 if cls.STRING_LENGTH in params:
1543 string_length = params[cls.STRING_LENGTH]
1545 shard_count = mapper_spec.shard_count
1546 count_per_shard = count // shard_count
1548 mr_input_readers = [
1549 cls(count_per_shard, string_length) for _ in range(shard_count)]
1551 left = count - count_per_shard*shard_count
1552 if left > 0:
1553 mr_input_readers.append(cls(left, string_length))
1555 return mr_input_readers
1557 @classmethod
1558 def validate(cls, mapper_spec):
1559 if mapper_spec.input_reader_class() != cls:
1560 raise BadReaderParamsError("Mapper input reader class mismatch")
1562 params = _get_params(mapper_spec)
1563 if cls.COUNT not in params:
1564 raise BadReaderParamsError("Must specify %s" % cls.COUNT)
1565 if not isinstance(params[cls.COUNT], int):
1566 raise BadReaderParamsError("%s should be an int but is %s" %
1567 (cls.COUNT, type(params[cls.COUNT])))
1568 if params[cls.COUNT] <= 0:
1569 raise BadReaderParamsError("%s should be a positive int")
1570 if cls.STRING_LENGTH in params and not (
1571 isinstance(params[cls.STRING_LENGTH], int) and
1572 params[cls.STRING_LENGTH] > 0):
1573 raise BadReaderParamsError("%s should be a positive int but is %s" %
1574 (cls.STRING_LENGTH, params[cls.STRING_LENGTH]))
1575 if (not isinstance(mapper_spec.shard_count, int) or
1576 mapper_spec.shard_count <= 0):
1577 raise BadReaderParamsError(
1578 "shard_count should be a positive int but is %s" %
1579 mapper_spec.shard_count)
1581 @classmethod
1582 def from_json(cls, json):
1583 return cls(json[cls.COUNT], json[cls.STRING_LENGTH])
1585 def to_json(self):
1586 return {self.COUNT: self._count, self.STRING_LENGTH: self._string_length}
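# Illustrative parameters for this reader (the values are arbitrary):
#
#   "input_reader": {"count": 1000, "string_length": 12}
#
# With shard_count=3, split_input() above creates three readers producing
# 333 strings each, plus a fourth reader for the single leftover string.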
1589 class ConsistentKeyReader(DatastoreKeyInputReader):
1590 """A key reader which reads consistent data from datastore.
1592 Datastore might have entities which were written but are not yet visible
1593 through queries for some time. Typically these entities can only be read
1594 inside a transaction until they are 'applied'.
1596 This reader reads all keys even if they are not visible. It might take
1597 significant time to start yielding some data because it has to apply all
1598 modifications created before its start.
1600 START_TIME_US_PARAM = "start_time_us"
1601 UNAPPLIED_LOG_FILTER = "__unapplied_log_timestamp_us__ <"
1602 DUMMY_KIND = "DUMMY_KIND"
1603 DUMMY_ID = 106275677020293L
1604 UNAPPLIED_QUERY_DEADLINE = 270
1606 def _get_unapplied_jobs_accross_namespaces(self,
1607 namespace_start,
1608 namespace_end,
1609 app):
1610 filters = {"__key__ >=": db.Key.from_path("__namespace__",
1611 namespace_start or 1,
1612 _app=app),
1613 "__key__ <=": db.Key.from_path("__namespace__",
1614 namespace_end or 1,
1615 _app=app),
1616 self.UNAPPLIED_LOG_FILTER: self.start_time_us}
1617 unapplied_query = datastore.Query(filters=filters, keys_only=True, _app=app)
1618 return unapplied_query.Get(
1619 limit=self._batch_size,
1620 config=datastore_rpc.Configuration(
1621 deadline=self.UNAPPLIED_QUERY_DEADLINE))
1623 def _iter_ns_range(self):
1624 while True:
1625 unapplied_jobs = self._get_unapplied_jobs_accross_namespaces(
1626 self._ns_range.namespace_start,
1627 self._ns_range.namespace_end,
1628 self._ns_range.app)
1630 if not unapplied_jobs:
1631 break
1633 self._apply_jobs(unapplied_jobs)
1635 for o in super(ConsistentKeyReader, self)._iter_ns_range():
1636 yield o
1638 def _iter_key_range(self, k_range):
1639 assert hasattr(self, "start_time_us"), "start_time_us property was not set"
1640 if self._ns_range is None:
1643 self._apply_key_range(k_range)
1645 for o in super(ConsistentKeyReader, self)._iter_key_range(k_range):
1646 yield o
1648 def _apply_key_range(self, k_range):
1649 """Apply all jobs in the given KeyRange."""
1655 apply_range = copy.deepcopy(k_range)
1656 while True:
1660 unapplied_query = self._make_unapplied_query(apply_range)
1661 unapplied_jobs = unapplied_query.Get(
1662 limit=self._batch_size,
1663 config=datastore_rpc.Configuration(
1664 deadline=self.UNAPPLIED_QUERY_DEADLINE))
1665 if not unapplied_jobs:
1666 break
1667 self._apply_jobs(unapplied_jobs)
1670 apply_range.advance(unapplied_jobs[-1])
1672 def _make_unapplied_query(self, k_range):
1673 """Returns a datastore.Query that finds the unapplied keys in k_range."""
1674 unapplied_query = k_range.make_ascending_datastore_query(
1675 kind=None, keys_only=True)
1676 unapplied_query[
1677 ConsistentKeyReader.UNAPPLIED_LOG_FILTER] = self.start_time_us
1678 return unapplied_query
1680 def _apply_jobs(self, unapplied_jobs):
1681 """Apply all jobs implied by the given keys."""
1683 keys_to_apply = []
1684 for key in unapplied_jobs:
1687 path = key.to_path() + [ConsistentKeyReader.DUMMY_KIND,
1688 ConsistentKeyReader.DUMMY_ID]
1689 keys_to_apply.append(
1690 db.Key.from_path(_app=key.app(), namespace=key.namespace(), *path))
1691 db.get(keys_to_apply, config=datastore_rpc.Configuration(
1692 deadline=self.UNAPPLIED_QUERY_DEADLINE,
1693 read_policy=datastore_rpc.Configuration.APPLY_ALL_JOBS_CONSISTENCY))
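# How the code above forces consistency: each unapplied-job key is extended
# with a DUMMY_KIND/DUMMY_ID child path, which places the dummy key in the
# same entity group as the original key, and db.get() is issued with the
# APPLY_ALL_JOBS_CONSISTENCY read policy. The dummy entities are never
# expected to exist; the read's side effect is to make the datastore roll
# forward any outstanding writes for those entity groups, so the queries in
# _iter_key_range() subsequently see consistent data.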
1695 @classmethod
1696 def _split_input_from_namespace(cls,
1697 app,
1698 namespace,
1699 entity_kind_name,
1700 shard_count):
1701 key_ranges = super(ConsistentKeyReader, cls)._split_input_from_namespace(
1702 app, namespace, entity_kind_name, shard_count)
1703 assert len(key_ranges) == shard_count
1708 try:
1709 last_key_range_index = key_ranges.index(None) - 1
1710 except ValueError:
1711 last_key_range_index = shard_count - 1
1713 if last_key_range_index != -1:
1714 key_ranges[0].key_start = None
1715 key_ranges[0].include_start = False
1716 key_ranges[last_key_range_index].key_end = None
1717 key_ranges[last_key_range_index].include_end = False
1718 return key_ranges
1720 @classmethod
1721 def _split_input_from_params(cls, app, namespaces, entity_kind_name,
1722 params, shard_count):
1723 readers = super(ConsistentKeyReader, cls)._split_input_from_params(
1724 app,
1725 namespaces,
1726 entity_kind_name,
1727 params,
1728 shard_count)
1732 if not readers:
1733 readers = [cls(entity_kind_name,
1734 key_ranges=None,
1735 ns_range=namespace_range.NamespaceRange(),
1736 batch_size=shard_count)]
1738 return readers
1740 @classmethod
1741 def split_input(cls, mapper_spec):
1742 """Splits input into key ranges."""
1743 readers = super(ConsistentKeyReader, cls).split_input(mapper_spec)
1744 start_time_us = _get_params(mapper_spec).get(
1745 cls.START_TIME_US_PARAM, long(time.time() * 1e6))
1746 for reader in readers:
1747 reader.start_time_us = start_time_us
1748 return readers
1750 def to_json(self):
1751 """Serializes all the data in this reader into json form.
1753 Returns:
1754 all the data in json-compatible map.
1756 json_dict = super(DatastoreKeyInputReader, self).to_json()
1757 json_dict[self.START_TIME_US_PARAM] = self.start_time_us
1758 return json_dict
1760 @classmethod
1761 def from_json(cls, json):
1762 """Create new ConsistentKeyReader from the json, encoded by to_json.
1764 Args:
1765 json: json map representation of ConsistentKeyReader.
1767 Returns:
1768 an instance of ConsistentKeyReader with all data deserialized from json.
1770 reader = super(ConsistentKeyReader, cls).from_json(json)
1771 reader.start_time_us = json[cls.START_TIME_US_PARAM]
1772 return reader
1781 class NamespaceInputReader(InputReader):
1782 """An input reader to iterate over namespaces.
1784 This reader yields namespace names as strings.
1785 Input is split into contiguous namespace ranges, one per shard.
1788 NAMESPACE_RANGE_PARAM = "namespace_range"
1789 BATCH_SIZE_PARAM = "batch_size"
1790 _BATCH_SIZE = 10
1792 def __init__(self, ns_range, batch_size=_BATCH_SIZE):
1793 self.ns_range = ns_range
1794 self._batch_size = batch_size
1796 def to_json(self):
1797 """Serializes all the data in this query range into json form.
1799 Returns:
1800 all the data in json-compatible map.
1802 return {self.NAMESPACE_RANGE_PARAM: self.ns_range.to_json_object(),
1803 self.BATCH_SIZE_PARAM: self._batch_size}
1805 @classmethod
1806 def from_json(cls, json):
1807 """Create new DatastoreInputReader from the json, encoded by to_json.
1809 Args:
1810 json: json map representation of DatastoreInputReader.
1812 Returns:
1813 an instance of DatastoreInputReader with all data deserialized from json.
1815 return cls(
1816 namespace_range.NamespaceRange.from_json_object(
1817 json[cls.NAMESPACE_RANGE_PARAM]),
1818 json[cls.BATCH_SIZE_PARAM])
1820 @classmethod
1821 def validate(cls, mapper_spec):
1822 """Validates mapper spec.
1824 Args:
1825 mapper_spec: The MapperSpec for this InputReader.
1827 Raises:
1828 BadReaderParamsError: required parameters are missing or invalid.
1830 if mapper_spec.input_reader_class() != cls:
1831 raise BadReaderParamsError("Input reader class mismatch")
1832 params = _get_params(mapper_spec)
1833 if cls.BATCH_SIZE_PARAM in params:
1834 try:
1835 batch_size = int(params[cls.BATCH_SIZE_PARAM])
1836 if batch_size < 1:
1837 raise BadReaderParamsError("Bad batch size: %s" % batch_size)
1838 except ValueError, e:
1839 raise BadReaderParamsError("Bad batch size: %s" % e)
1841 @classmethod
1842 def split_input(cls, mapper_spec):
1843 """Returns a list of input readers for the input spec.
1845 Args:
1846 mapper_spec: The MapperSpec for this InputReader.
1848 Returns:
1849 A list of InputReaders.
1851 batch_size = int(_get_params(mapper_spec).get(
1852 cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
1853 shard_count = mapper_spec.shard_count
1854 namespace_ranges = namespace_range.NamespaceRange.split(shard_count,
1855 contiguous=True)
1856 return [NamespaceInputReader(ns_range, batch_size)
1857 for ns_range in namespace_ranges]
1859 def __iter__(self):
1860 while True:
1861 keys = self.ns_range.make_datastore_query().Get(limit=self._batch_size)
1862 if not keys:
1863 break
1865 for key in keys:
1866 namespace = metadata.Namespace.key_to_namespace(key)
1867 self.ns_range = self.ns_range.with_start_after(namespace)
1868 yield namespace
1870 def __str__(self):
1871 return repr(self.ns_range)
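# A minimal serialization sketch for NamespaceInputReader, assuming the App
# Engine SDK is importable; the to_json/from_json round trip itself needs no
# datastore access.
#
#   reader = NamespaceInputReader(namespace_range.NamespaceRange(),
#                                 batch_size=5)
#   state = reader.to_json()
#   restored = NamespaceInputReader.from_json(state)
#   assert restored._batch_size == 5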
1874 class RecordsReader(InputReader):
1875 """Reader to read a list of Files API files in records format.
1877 The number of input shards can be specified by the SHARDS_PARAM
1878 mapper parameter. Input files cannot be split, so there will be at most
1879 one shard per file. Also, the number of shards will not be reduced based on
1880 the number of input files, so shards in always equals shards out.
1881 """
1883 FILE_PARAM = "file"
1884 FILES_PARAM = "files"
1886 def __init__(self, filenames, position):
1887 """Constructor.
1889 Args:
1890 filenames: list of filenames.
1891 position: file position to start reading from as int.
1892 """
1893 self._filenames = filenames
1894 if self._filenames:
1895 self._reader = records.RecordsReader(
1896 files.BufferedFile(self._filenames[0]))
1897 self._reader.seek(position)
1898 else:
1899 self._reader = None
1901 def __iter__(self):
1902 """Iterate over records in file.
1904 Yields records as strings.
1905 """
1906 ctx = context.get()
1908 while self._reader:
1909 try:
1910 start_time = time.time()
1911 record = self._reader.read()
1912 if ctx:
1913 operation.counters.Increment(
1914 COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
1915 operation.counters.Increment(COUNTER_IO_READ_BYTES, len(record))(ctx)
1916 yield record
1917 except (files.ExistenceError), e:
1918 raise errors.FailJobError("ExistenceError: %s" % e)
1919 except (files.UnknownError), e:
1920 raise errors.RetrySliceError("UnknownError: %s" % e)
1921 except EOFError:
1922 self._filenames.pop(0)
1923 if not self._filenames:
1924 self._reader = None
1925 else:
1926 self._reader = records.RecordsReader(
1927 files.BufferedFile(self._filenames[0]))
1929 @classmethod
1930 def from_json(cls, json):
1931 """Creates an instance of the InputReader for the given input shard state.
1933 Args:
1934 json: The InputReader state as a dict-like object.
1936 Returns:
1937 An instance of the InputReader configured using the values of json.
1938 """
1939 return cls(json["filenames"], json["position"])
1941 def to_json(self):
1942 """Returns an input shard state for the remaining inputs.
1944 Returns:
1945 A json-izable version of the remaining InputReader.
1946 """
1947 result = {
1948 "filenames": self._filenames,
1949 "position": 0,
1950 }
1951 if self._reader:
1952 result["position"] = self._reader.tell()
1953 return result
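# A small round-trip sketch for the RecordsReader state above; with an empty
# filename list the constructor opens no file, so this needs no Files API
# access at all.
#
#   state = RecordsReader([], 0).to_json()
#   assert state == {"filenames": [], "position": 0}
#   assert RecordsReader.from_json(state)._filenames == []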
1955 @classmethod
1956 def split_input(cls, mapper_spec):
1957 """Returns a list of input readers for the input spec.
1959 Args:
1960 mapper_spec: The MapperSpec for this InputReader.
1962 Returns:
1963 A list of InputReaders.
1964 """
1965 params = _get_params(mapper_spec)
1966 shard_count = mapper_spec.shard_count
1968 if cls.FILES_PARAM in params:
1969 filenames = params[cls.FILES_PARAM]
1970 if isinstance(filenames, basestring):
1971 filenames = filenames.split(",")
1972 else:
1973 filenames = [params[cls.FILE_PARAM]]
1975 batch_list = [[] for _ in xrange(shard_count)]
1976 for index, filename in enumerate(filenames):
1978 batch_list[index % shard_count].append(filename)
1981 batch_list.sort(reverse=True, key=lambda x: len(x))
1982 return [cls(batch, 0) for batch in batch_list]
1984 @classmethod
1985 def validate(cls, mapper_spec):
1986 """Validates mapper spec and all mapper parameters.
1988 Args:
1989 mapper_spec: The MapperSpec for this InputReader.
1991 Raises:
1992 BadReaderParamsError: required parameters are missing or invalid.
1993 """
1994 if mapper_spec.input_reader_class() != cls:
1995 raise errors.BadReaderParamsError("Input reader class mismatch")
1996 params = _get_params(mapper_spec)
1997 if (cls.FILES_PARAM not in params and
1998 cls.FILE_PARAM not in params):
1999 raise BadReaderParamsError(
2000 "Must specify '%s' or '%s' parameter for mapper input" %
2001 (cls.FILES_PARAM, cls.FILE_PARAM))
2003 def __str__(self):
2004 position = 0
2005 if self._reader:
2006 position = self._reader.tell()
2007 return "%s:%s" % (self._filenames, position)
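# Illustration of the file batching done by RecordsReader.split_input above
# (pure Python; the file names are hypothetical): files are dealt out
# round-robin across shards, then the batches are sorted so the fullest
# shards come first.
#
#   filenames = ["/files/records-%d" % i for i in xrange(7)]
#   shard_count = 3
#   batch_list = [[] for _ in xrange(shard_count)]
#   for index, filename in enumerate(filenames):
#     batch_list[index % shard_count].append(filename)
#   batch_list.sort(reverse=True, key=lambda x: len(x))
#   # batch_list now holds batches of sizes [3, 2, 2].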
2010 class LogInputReader(InputReader):
2011 """Input reader for a time range of logs via the Logs Reader API.
2013 The number of input shards may be specified by the SHARDS_PARAM mapper
2014 parameter. A starting and ending time (in seconds since the Unix epoch) are
2015 required to generate time ranges over which to shard the input.
2016 """
2018 START_TIME_PARAM = "start_time"
2019 END_TIME_PARAM = "end_time"
2020 MINIMUM_LOG_LEVEL_PARAM = "minimum_log_level"
2021 INCLUDE_INCOMPLETE_PARAM = "include_incomplete"
2022 INCLUDE_APP_LOGS_PARAM = "include_app_logs"
2023 VERSION_IDS_PARAM = "version_ids"
2026 _OFFSET_PARAM = "offset"
2027 _PROTOTYPE_REQUEST_PARAM = "prototype_request"
2029 _PARAMS = frozenset([START_TIME_PARAM, END_TIME_PARAM, _OFFSET_PARAM,
2030 MINIMUM_LOG_LEVEL_PARAM, INCLUDE_INCOMPLETE_PARAM,
2031 INCLUDE_APP_LOGS_PARAM, VERSION_IDS_PARAM,
2032 _PROTOTYPE_REQUEST_PARAM])
2033 _KWARGS = frozenset([_OFFSET_PARAM, _PROTOTYPE_REQUEST_PARAM])
2035 def __init__(self,
2036 start_time=None,
2037 end_time=None,
2038 minimum_log_level=None,
2039 include_incomplete=False,
2040 include_app_logs=False,
2041 version_ids=None,
2042 **kwargs):
2043 """Constructor.
2045 Args:
2046 start_time: The earliest request completion or last-update time of logs
2047 that should be mapped over, in seconds since the Unix epoch.
2048 end_time: The latest request completion or last-update time that logs
2049 should be mapped over, in seconds since the Unix epoch.
2050 minimum_log_level: An application log level which serves as a filter on
2051 the requests mapped over--requests with no application log at or above
2052 the specified level will be omitted, even if include_app_logs is False.
2053 include_incomplete: Whether or not to include requests that have started
2054 but not yet finished, as a boolean. Defaults to False.
2055 include_app_logs: Whether or not to include application level logs in the
2056 mapped logs, as a boolean. Defaults to False.
2057 version_ids: A list of version ids whose logs should be mapped against.
2058 """
2059 InputReader.__init__(self)
2063 self.__params = dict(kwargs)
2065 if start_time is not None:
2066 self.__params[self.START_TIME_PARAM] = start_time
2067 if end_time is not None:
2068 self.__params[self.END_TIME_PARAM] = end_time
2069 if minimum_log_level is not None:
2070 self.__params[self.MINIMUM_LOG_LEVEL_PARAM] = minimum_log_level
2071 if include_incomplete is not None:
2072 self.__params[self.INCLUDE_INCOMPLETE_PARAM] = include_incomplete
2073 if include_app_logs is not None:
2074 self.__params[self.INCLUDE_APP_LOGS_PARAM] = include_app_logs
2075 if version_ids:
2076 self.__params[self.VERSION_IDS_PARAM] = version_ids
2079 if self._PROTOTYPE_REQUEST_PARAM in self.__params:
2080 prototype_request = log_service_pb.LogReadRequest(
2081 self.__params[self._PROTOTYPE_REQUEST_PARAM])
2082 self.__params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request
2084 def __iter__(self):
2085 """Iterates over logs in a given range of time.
2087 Yields:
2088 A RequestLog containing all the information for a single request.
2089 """
2090 for log in logservice.fetch(**self.__params):
2091 self.__params[self._OFFSET_PARAM] = log.offset
2092 yield log
2094 @classmethod
2095 def from_json(cls, json):
2096 """Creates an instance of the InputReader for the given input shard's state.
2098 Args:
2099 json: The InputReader state as a dict-like object.
2101 Returns:
2102 An instance of the InputReader configured using the given JSON parameters.
2103 """
2105 params = dict((str(k), v) for k, v in json.iteritems()
2106 if k in cls._PARAMS)
2111 if cls._OFFSET_PARAM in params:
2112 params[cls._OFFSET_PARAM] = base64.b64decode(params[cls._OFFSET_PARAM])
2113 return cls(**params)
2115 def to_json(self):
2116 """Returns an input shard state for the remaining inputs.
2118 Returns:
2119 A JSON serializable version of the remaining input to read.
2120 """
2122 params = dict(self.__params)
2123 if self._PROTOTYPE_REQUEST_PARAM in params:
2124 prototype_request = params[self._PROTOTYPE_REQUEST_PARAM]
2125 params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request.Encode()
2126 if self._OFFSET_PARAM in params:
2127 params[self._OFFSET_PARAM] = base64.b64encode(params[self._OFFSET_PARAM])
2128 return params
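# Sketch of the offset handling in to_json()/from_json() above, with a
# hypothetical offset value: the opaque offset bytes returned by the log
# service are base64-encoded so the serialized state stays JSON-safe, and
# decoded again on restore.
#
#   raw_offset = "\x00\x01\x02"                 # hypothetical opaque bytes
#   encoded = base64.b64encode(raw_offset)      # "AAEC"
#   assert base64.b64decode(encoded) == raw_offset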
2130 @classmethod
2131 def split_input(cls, mapper_spec):
2132 """Returns a list of input readers for the given input specification.
2134 Args:
2135 mapper_spec: The MapperSpec for this InputReader.
2137 Returns:
2138 A list of InputReaders.
2139 """
2140 params = _get_params(mapper_spec)
2141 shard_count = mapper_spec.shard_count
2144 start_time = params[cls.START_TIME_PARAM]
2145 end_time = params[cls.END_TIME_PARAM]
2146 seconds_per_shard = (end_time - start_time) / shard_count
2149 shards = []
2150 for _ in xrange(shard_count - 1):
2151 params[cls.END_TIME_PARAM] = (params[cls.START_TIME_PARAM] +
2152 seconds_per_shard)
2153 shards.append(LogInputReader(**params))
2154 params[cls.START_TIME_PARAM] = params[cls.END_TIME_PARAM]
2157 params[cls.END_TIME_PARAM] = end_time
2158 return shards + [LogInputReader(**params)]
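# A worked illustration of the time split above (hypothetical values): each
# of the first shard_count - 1 readers covers an equal slice of
# [start_time, end_time), and the last reader is handed the original end_time
# so the full window is covered even when the division leaves a remainder.
#
#   start_time, end_time, shard_count = 1262304000.0, 1262304300.0, 3
#   seconds_per_shard = (end_time - start_time) / shard_count   # 100.0
#   # Resulting shard windows:
#   #   [1262304000.0, 1262304100.0)
#   #   [1262304100.0, 1262304200.0)
#   #   [1262304200.0, 1262304300.0)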
2160 @classmethod
2161 def validate(cls, mapper_spec):
2162 """Validates the mapper's specification and all necessary parameters.
2164 Args:
2165 mapper_spec: The MapperSpec to be used with this InputReader.
2167 Raises:
2168 BadReaderParamsError: If the user fails to specify both a starting time
2169 and an ending time, or if the starting time is later than the ending
2170 time.
2171 """
2172 if mapper_spec.input_reader_class() != cls:
2173 raise errors.BadReaderParamsError("Input reader class mismatch")
2175 params = _get_params(mapper_spec, allowed_keys=cls._PARAMS)
2176 if cls.VERSION_IDS_PARAM not in params:
2177 raise errors.BadReaderParamsError("Must specify a list of version ids "
2178 "for mapper input")
2179 if (cls.START_TIME_PARAM not in params or
2180 params[cls.START_TIME_PARAM] is None):
2181 raise errors.BadReaderParamsError("Must specify a starting time for "
2182 "mapper input")
2183 if cls.END_TIME_PARAM not in params or params[cls.END_TIME_PARAM] is None:
2184 params[cls.END_TIME_PARAM] = time.time()
2186 if params[cls.START_TIME_PARAM] >= params[cls.END_TIME_PARAM]:
2187 raise errors.BadReaderParamsError("The starting time cannot be later "
2188 "than or the same as the ending time.")
2190 if cls._PROTOTYPE_REQUEST_PARAM in params:
2191 try:
2192 params[cls._PROTOTYPE_REQUEST_PARAM] = log_service_pb.LogReadRequest(
2193 params[cls._PROTOTYPE_REQUEST_PARAM])
2194 except (TypeError, ProtocolBuffer.ProtocolBufferDecodeError):
2195 raise errors.BadReaderParamsError("The prototype request must be "
2196 "parseable as a LogReadRequest.")
2201 try:
2202 logservice.fetch(**params)
2203 except logservice.InvalidArgumentError, e:
2204 raise errors.BadReaderParamsError("One or more parameters are not valid "
2205 "inputs to logservice.fetch(): %s" % e)
2207 def __str__(self):
2208 """Returns the string representation of this LogInputReader."""
2209 params = []
2210 for key in sorted(self.__params.keys()):
2211 value = self.__params[key]
2212 if key in (self._PROTOTYPE_REQUEST_PARAM, self._OFFSET_PARAM):
2213 params.append("%s='%s'" % (key, value))
2216 else:
2217 params.append("%s=%s" % (key, value))
2219 return "LogInputReader(%s)" % ", ".join(params)
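# A hedged configuration sketch for LogInputReader; the version id and times
# below are hypothetical, and validate() above will additionally probe
# logservice.fetch() with whatever parameters are supplied.
#
#   log_reader_params = {
#       "start_time": 1262304000,    # seconds since the Unix epoch (required)
#       "end_time": 1262390400,      # optional; validate() defaults it to now
#       "version_ids": ["1"],        # required
#       "include_app_logs": True,
#   }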