App Engine Python SDK version 1.8.9
python/google/appengine/ext/mapreduce/mapreduce_pipeline.py
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
33 """Pipelines for mapreduce library."""
35 from __future__ import with_statement

__all__ = [
    "CleanupPipeline",
    "MapPipeline",
    "MapperPipeline",
    "MapreducePipeline",
    "ReducePipeline",
    "ShufflePipeline",
    ]

import google

from appengine_pipeline.src import pipeline
from appengine_pipeline.src.pipeline import common as pipeline_common
from google.appengine.api import files
from google.appengine.ext.mapreduce import input_readers
from google.appengine.ext.mapreduce import mapper_pipeline
from google.appengine.ext.mapreduce import model
from google.appengine.ext.mapreduce import output_writers
from google.appengine.ext.mapreduce import pipeline_base
from google.appengine.ext.mapreduce import shuffler


MapperPipeline = mapper_pipeline.MapperPipeline

ShufflePipeline = shuffler.ShufflePipeline

CleanupPipeline = mapper_pipeline._CleanupPipeline

_ReducerReader = input_readers._ReducerReader


class MapPipeline(pipeline_base._OutputSlotsMixin,
                  pipeline_base.PipelineBase):
  """Runs the map stage of MapReduce.

  Iterates over the input reader and writes the output in key/value format
  for consumption by the shuffler.

  Args:
    job_name: mapreduce job name as string.
    mapper_spec: specification of the map handler function as string.
    input_reader_spec: input reader specification as string.
    params: mapper and input reader parameters as dict.
    shards: number of shards to start as int.

  Returns:
    list of filenames written to by this mapper, one for each shard.
  """

  def run(self,
          job_name,
          mapper_spec,
          input_reader_spec,
          params,
          shards=None):
    yield MapperPipeline(
        job_name + "-map",
        mapper_spec,
        input_reader_spec,
        output_writer_spec=(
            output_writers.__name__ + ".KeyValueBlobstoreOutputWriter"),
        params=params,
        shards=shards)
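
# Illustrative example (an assumption, not part of the original module): a
# map handler used with MapPipeline yields (key, value) tuples, which the
# KeyValueBlobstoreOutputWriter serializes for the shuffler. The function
# name and the shape of `data` depend on the configured input reader and
# are hypothetical here.
#
#   def word_count_map(data):
#     """Yields one (word, "") pair per word in a line of input text."""
#     for word in data.split():
#       yield (word, "")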


class ReducePipeline(pipeline_base._OutputSlotsMixin,
                     pipeline_base.PipelineBase):
  """Runs the reduce stage of MapReduce.

  Merge-reads the input files and runs the reducer function on them.

  Args:
    job_name: mapreduce job name as string.
    reducer_spec: specification of the reduce function as string.
    output_writer_spec: specification of the output writer to use with the
      reduce function.
    params: mapper parameters to use as dict.
    filenames: list of filenames to reduce.
    combiner_spec: Optional. Specification of a combine function. If not
      supplied, no combine step will take place. The combine function takes a
      key, a list of values, and a list of previously combined results. It
      yields combined values that might be processed by another combiner
      call, but will eventually end up in the reducer. The combiner output
      key is assumed to be the same as the input key.
    shards: Optional. Number of output shards. Defaults to the number of
      input files.

  Returns:
    filenames from the output writer.
  """

  def run(self,
          job_name,
          reducer_spec,
          output_writer_spec,
          params,
          filenames,
          combiner_spec=None,
          shards=None):
    new_params = dict(params or {})
    new_params.update({
        "files": filenames,
        })
    if combiner_spec:
      new_params.update({
          "combiner_spec": combiner_spec,
          })
    if shards is None:
      shards = len(filenames)
    yield mapper_pipeline.MapperPipeline(
        job_name + "-reduce",
        reducer_spec,
        __name__ + "._ReducerReader",
        output_writer_spec,
        new_params,
        shards=shards)
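
# Illustrative example (assumed, not part of the original module): a reduce
# function receives a key and the list of values the shuffler collected for
# it, and yields output for the configured writer. The combiner signature
# follows the contract in the docstring above; the value types shown are
# assumptions for this sketch.
#
#   def word_count_reduce(key, values):
#     """Emits one line per distinct word with its occurrence count."""
#     yield "%s: %d\n" % (key, len(values))
#
#   def word_count_combine(key, values, previously_combined):
#     """Pre-aggregates one key's values before they reach the reducer."""
#     yield len(values) + sum(previously_combined)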


class MapreducePipeline(pipeline_base._OutputSlotsMixin,
                        pipeline_base.PipelineBase):
  """Pipeline to execute MapReduce jobs.

  Args:
    job_name: job name as string.
    mapper_spec: specification of the mapper to use.
    reducer_spec: specification of the reducer to use.
    input_reader_spec: specification of the input reader to read data from.
    output_writer_spec: specification of the output writer to save reduce
      output to.
    mapper_params: parameters to use for the mapper phase.
    reducer_params: parameters to use for the reduce phase.
    shards: number of shards to use as int.
    combiner_spec: Optional. Specification of a combine function. If not
      supplied, no combine step will take place. The combine function takes a
      key, a list of values, and a list of previously combined results. It
      yields combined values that might be processed by another combiner
      call, but will eventually end up in the reducer. The combiner output
      key is assumed to be the same as the input key.

  Returns:
    result_status: one of model.MapreduceState._RESULTS. Check this to see
      if the job was successful.
    default: a list of filenames if the mapreduce was successful and
      was outputting files. An empty list otherwise.
  """

  def run(self,
          job_name,
          mapper_spec,
          reducer_spec,
          input_reader_spec,
          output_writer_spec=None,
          mapper_params=None,
          reducer_params=None,
          shards=None,
          combiner_spec=None):
    map_pipeline = yield MapPipeline(job_name,
                                     mapper_spec,
                                     input_reader_spec,
                                     params=mapper_params,
                                     shards=shards)
    shuffler_pipeline = yield ShufflePipeline(
        job_name, map_pipeline)
    reducer_pipeline = yield ReducePipeline(
        job_name,
        reducer_spec,
        output_writer_spec,
        reducer_params,
        shuffler_pipeline,
        combiner_spec=combiner_spec)
    with pipeline.After(reducer_pipeline):
      all_temp_files = yield pipeline_common.Extend(
          map_pipeline, shuffler_pipeline)
      yield CleanupPipeline(all_temp_files)

    yield _ReturnPipeline(map_pipeline.result_status,
                          reducer_pipeline.result_status,
                          reducer_pipeline)
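
# Illustrative usage (an assumed sketch; the handler module path, reader
# parameters, and shard count are hypothetical): wiring map and reduce
# functions like the ones sketched above into a complete job and starting
# it from a request handler.
#
#   job = MapreducePipeline(
#       "word_count",
#       "main.word_count_map",
#       "main.word_count_reduce",
#       input_readers.__name__ + ".BlobstoreLineInputReader",
#       output_writers.__name__ + ".BlobstoreOutputWriter",
#       mapper_params={"blob_keys": blob_key},
#       reducer_params={"mime_type": "text/plain"},
#       shards=16)
#   job.start()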


class _ReturnPipeline(pipeline_base._OutputSlotsMixin,
                      pipeline_base.PipelineBase):
  """Returns the MapReduce result.

  Fills the outputs for MapreducePipeline. See MapreducePipeline.
  """

  output_names = ["result_status"]

  def run(self,
          map_result_status,
          reduce_result_status,
          reduce_outputs):

    if (map_result_status == model.MapreduceState.RESULT_ABORTED or
        reduce_result_status == model.MapreduceState.RESULT_ABORTED):
      result_status = model.MapreduceState.RESULT_ABORTED
    elif (map_result_status == model.MapreduceState.RESULT_FAILED or
          reduce_result_status == model.MapreduceState.RESULT_FAILED):
      result_status = model.MapreduceState.RESULT_FAILED
    else:
      result_status = model.MapreduceState.RESULT_SUCCESS

    self.fill(self.outputs.result_status, result_status)
    if result_status == model.MapreduceState.RESULT_SUCCESS:
      yield pipeline_common.Return(reduce_outputs)
    else:
      yield pipeline_common.Return([])
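
# Illustrative status check (assumed sketch; Pipeline.from_id and
# has_finalized come from the appengine_pipeline library, and the slot
# names match those filled by _ReturnPipeline above):
#
#   stage = MapreducePipeline.from_id(pipeline_id)
#   if stage.has_finalized:
#     status = stage.outputs.result_status.value
#     filenames = stage.outputs.default.value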