App Engine Python SDK version 1.9.3
[gae.git] / python / google / appengine / ext / mapreduce / api / map_job / sample_input_reader.py
blobb1ed89d368f358af28ac19184c8aba011f96f78f
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Sample Input Reader for map job."""
18 import random
19 import string
20 import time
22 from google.appengine.ext.mapreduce import context
23 from google.appengine.ext.mapreduce import errors
24 from google.appengine.ext.mapreduce import operation
25 from google.appengine.ext.mapreduce.api import map_job
# Counter name under which readers in this module record the number of bytes
# they produce (incremented by the length of each generated string).
COUNTER_IO_READ_BYTES = "io-read-bytes"

# Counter name under which readers in this module record the time, in
# milliseconds, spent producing each entry.
COUNTER_IO_READ_MSEC = "io-read-msec"
class SampleInputReader(map_job.InputReader):
    """A sample InputReader that generates random strings as output.

    Its primary use is as an example InputReader that can be used for test
    purposes.
    """

    # Key (in the reader params and in the JSON state) holding the number of
    # entries to generate.
    COUNT = "count"
    # Key (in the reader params and in the JSON state) holding the length of
    # each generated string.
    STRING_LENGTH = "string_length"
    # String length used by split_input when the params do not specify one.
    _DEFAULT_STRING_LENGTH = 10

    def __init__(self, count, string_length):
        """Initialize input reader.

        Args:
          count: number of entries this shard should generate.
          string_length: the length of generated random strings.
        """
        self._count = count
        self._string_length = string_length

    def __iter__(self):
        """Yield random lowercase ASCII strings until the count is exhausted.

        The remaining count is decremented *before* each string is yielded,
        so a state captured with to_json() between iterations reflects only
        the entries that still have to be produced.

        When a mapreduce context is available, the time spent generating
        each string and its length are recorded in the
        COUNTER_IO_READ_MSEC and COUNTER_IO_READ_BYTES counters.

        Yields:
          A random string of `string_length` lowercase ASCII letters.
        """
        ctx = context.get()

        # Compare against zero explicitly: a plain truthiness test would loop
        # forever if a negative count were ever restored from a bad state.
        while self._count > 0:
            self._count -= 1
            start_time = time.time()
            content = "".join(random.choice(string.ascii_lowercase)
                              for _ in range(self._string_length))
            if ctx:
                operation.counters.Increment(
                    COUNTER_IO_READ_MSEC,
                    int((time.time() - start_time) * 1000))(ctx)
                operation.counters.Increment(
                    COUNTER_IO_READ_BYTES, len(content))(ctx)
            yield content

    @classmethod
    def from_json(cls, state):
        """Rebuild a reader from a dict produced by to_json().

        Args:
          state: dict with the COUNT and STRING_LENGTH keys.

        Returns:
          A reader instance with the stored remaining count and string length.
        """
        return cls(state[cls.COUNT], state[cls.STRING_LENGTH])

    def to_json(self):
        """Serialize the reader state.

        Returns:
          A dict mapping COUNT to the number of entries still to generate and
          STRING_LENGTH to the length of each generated string.
        """
        return {self.COUNT: self._count,
                self.STRING_LENGTH: self._string_length}

    @classmethod
    def split_input(cls, job_config):
        """Split the requested count into one reader per shard.

        Each of the `shard_count` readers gets `count // shard_count`
        entries. Any remainder is handed to one additional reader, so the
        returned list may hold `shard_count + 1` readers, and readers may
        have a zero count when `count < shard_count`.

        Args:
          job_config: job configuration providing `input_reader_params` and
            `shard_count`.

        Returns:
          A list of reader instances that together generate `count` entries.
        """
        params = job_config.input_reader_params
        count = params[cls.COUNT]
        string_length = params.get(cls.STRING_LENGTH,
                                   cls._DEFAULT_STRING_LENGTH)

        shard_count = job_config.shard_count
        count_per_shard = count // shard_count

        mr_input_readers = [
            cls(count_per_shard, string_length) for _ in range(shard_count)]

        # Entries that did not divide evenly go to an extra reader.
        left = count - count_per_shard * shard_count
        if left > 0:
            mr_input_readers.append(cls(left, string_length))

        return mr_input_readers

    @classmethod
    def validate(cls, job_config):
        """Check the reader params of a job configuration.

        Args:
          job_config: job configuration providing `input_reader_params`.

        Raises:
          errors.BadReaderParamsError: if COUNT is missing, is not an int or
            is not positive, or if STRING_LENGTH is given but is not a
            positive int.
        """
        super(SampleInputReader, cls).validate(job_config)

        params = job_config.input_reader_params

        if cls.COUNT not in params:
            raise errors.BadReaderParamsError("Must specify %s" % cls.COUNT)
        if not isinstance(params[cls.COUNT], int):
            raise errors.BadReaderParamsError(
                "%s should be an int but is %s" %
                (cls.COUNT, type(params[cls.COUNT])))
        if params[cls.COUNT] <= 0:
            # The parameter name must be interpolated; otherwise the user
            # sees a literal "%s" in the error message.
            raise errors.BadReaderParamsError(
                "%s should be a positive int" % cls.COUNT)

        if cls.STRING_LENGTH in params and not (
                isinstance(params[cls.STRING_LENGTH], int) and
                params[cls.STRING_LENGTH] > 0):
            raise errors.BadReaderParamsError(
                "%s should be a positive int "
                "but is %s" %
                (cls.STRING_LENGTH,
                 params[cls.STRING_LENGTH]))