# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
17 """Sample Input Reader for map job."""
22 from google
.appengine
.ext
.mapreduce
import context
23 from google
.appengine
.ext
.mapreduce
import errors
24 from google
.appengine
.ext
.mapreduce
import operation
25 from google
.appengine
.ext
.mapreduce
.api
import map_job
# Counter name under which the reader records the number of characters it
# generated (it increments this by len() of each produced string).
COUNTER_IO_READ_BYTES = "io-read-bytes"

# Counter name under which the reader records the wall-clock milliseconds spent
# generating each string.
COUNTER_IO_READ_MSEC = "io-read-msec"
class SampleInputReader(map_job.InputReader):
  """A sample InputReader that generates random strings as output.

  Primarily intended as an example InputReader that can be used for testing
  purposes. Each reader yields a fixed number of random lowercase ASCII
  strings, all of the same length.
  """

  # Input reader param: total number of strings to generate (required).
  COUNT = "count"
  # Input reader param: length of each generated string (optional).
  STRING_LENGTH = "string_length"
  # String length used when STRING_LENGTH is absent from the params.
  _DEFAULT_STRING_LENGTH = 10

  def __init__(self, count, string_length):
    """Initialize input reader.

    Args:
      count: number of entries this shard should generate.
      string_length: the length of generated random strings.
    """
    self._count = count
    self._string_length = string_length

  def __iter__(self):
    """Yield the strings this reader still has to generate.

    The remaining count is decremented before each string is yielded, so
    to_json() reflects how many strings are left at any point.

    Yields:
      Strings of self._string_length characters drawn from
      string.ascii_lowercase.
    """
    ctx = context.get()

    # Compare with `> 0` rather than relying on truthiness so that a negative
    # count (e.g. from a hand-built JSON state) ends iteration instead of
    # looping forever.
    while self._count > 0:
      self._count -= 1
      start_time = time.time()
      content = "".join(random.choice(string.ascii_lowercase)
                        for _ in range(self._string_length))
      # NOTE(review): context.get() presumably returns a falsy value outside a
      # running job; counters are only recorded when a context is available.
      if ctx:
        operation.counters.Increment(
            COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
        operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
      yield content

  @classmethod
  def from_json(cls, state):
    """Rebuild a reader from the dict produced by to_json().

    Args:
      state: dict holding COUNT and STRING_LENGTH keys.

    Returns:
      A new SampleInputReader with that remaining count and string length.
    """
    return cls(state[cls.COUNT], state[cls.STRING_LENGTH])

  def to_json(self):
    """Serialize this reader's remaining work.

    Returns:
      A dict mapping COUNT to the number of strings still to be generated and
      STRING_LENGTH to the length of each string.
    """
    return {self.COUNT: self._count, self.STRING_LENGTH: self._string_length}

  @classmethod
  def split_input(cls, job_config):
    """Divide the requested number of strings across shards.

    Every reader gets count // shard_count strings, and the first
    count % shard_count readers get one extra. Readers that would generate
    nothing are not created, so at most job_config.shard_count readers are
    returned and their counts always sum to COUNT.

    Args:
      job_config: job config whose input_reader_params contain COUNT and
        optionally STRING_LENGTH.

    Returns:
      A list of SampleInputReader instances.
    """
    params = job_config.input_reader_params
    count = params[cls.COUNT]
    string_length = params.get(cls.STRING_LENGTH, cls._DEFAULT_STRING_LENGTH)

    shard_count = job_config.shard_count
    count_per_shard, left = divmod(count, shard_count)

    # Spread the remainder over the first `left` readers instead of appending
    # an extra reader: the old approach produced shard_count + 1 readers and,
    # when count < shard_count, shard_count readers that yielded nothing.
    return [cls(count_per_shard + (1 if i < left else 0), string_length)
            for i in range(min(shard_count, count))]

  @classmethod
  def validate(cls, job_config):
    """Check that the input reader params are well formed.

    Args:
      job_config: job config whose input_reader_params are checked.

    Raises:
      errors.BadReaderParamsError: if COUNT is missing, is not an int, or is
        not positive; or if STRING_LENGTH is present but is not a positive
        int.
    """
    super(SampleInputReader, cls).validate(job_config)

    params = job_config.input_reader_params

    # Validate count.
    if cls.COUNT not in params:
      raise errors.BadReaderParamsError("Must specify %s" % cls.COUNT)
    count = params[cls.COUNT]
    if not isinstance(count, int):
      raise errors.BadReaderParamsError("%s should be an int but is %s" %
                                        (cls.COUNT, type(count)))
    if count <= 0:
      # Previously the "%s" placeholder was never filled in.
      raise errors.BadReaderParamsError("%s should be a positive int "
                                        "but is %s" % (cls.COUNT, count))

    # Validate string length (optional).
    if cls.STRING_LENGTH in params and not (
        isinstance(params[cls.STRING_LENGTH], int) and
        params[cls.STRING_LENGTH] > 0):
      raise errors.BadReaderParamsError("%s should be a positive int "
                                        "but is %s" %
                                        (cls.STRING_LENGTH,
                                         params[cls.STRING_LENGTH]))