3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
33 """Pipelines for mapreduce library."""
35 from __future__
import with_statement
49 from appengine_pipeline
.src
import pipeline
50 from appengine_pipeline
.src
.pipeline
import common
as pipeline_common
51 from google
.appengine
.api
import files
52 from google
.appengine
.ext
.mapreduce
import input_readers
53 from google
.appengine
.ext
.mapreduce
import mapper_pipeline
54 from google
.appengine
.ext
.mapreduce
import model
55 from google
.appengine
.ext
.mapreduce
import output_writers
56 from google
.appengine
.ext
.mapreduce
import pipeline_base
57 from google
.appengine
.ext
.mapreduce
import shuffler
# Convenience aliases so callers can reference the stage pipelines directly
# from this module instead of importing the implementing modules.
MapperPipeline = mapper_pipeline.MapperPipeline

ShufflePipeline = shuffler.ShufflePipeline

# Deletes the temporary files produced by the map and shuffle stages.
CleanupPipeline = mapper_pipeline._CleanupPipeline

# Merge-reading input reader used by the reduce stage.
_ReducerReader = input_readers._ReducerReader
class MapPipeline(pipeline_base._OutputSlotsMixin,
                  pipeline_base.PipelineBase):
  """Runs the map stage of MapReduce.

  Iterates over input reader and outputs data into key/value format
  for shuffler consumption.

  Args:
    job_name: mapreduce job name as string.
    mapper_spec: specification of map handler function as string.
    input_reader_spec: input reader specification as string.
    params: mapper and input reader parameters as dict.
    shards: number of shards to start as int.

  Returns:
    list of filenames written to by this mapper, one for each shard.
  """

  def run(self,
          job_name,
          mapper_spec,
          input_reader_spec,
          params,
          shards=None):
    # Force the key/value blobstore writer so the shuffle stage can consume
    # the mapper output regardless of the job's final output writer.
    yield MapperPipeline(
        job_name + "-map",
        mapper_spec,
        input_reader_spec,
        output_writer_spec=(
            output_writers.__name__ + ".KeyValueBlobstoreOutputWriter"),
        params=params,
        shards=shards)
class ReducePipeline(pipeline_base._OutputSlotsMixin,
                     pipeline_base.PipelineBase):
  """Runs the reduce stage of MapReduce.

  Merge-reads input files and runs reducer function on them.

  Args:
    job_name: mapreduce job name as string.
    reducer_spec: specification of reduce function.
    output_writer_spec: specification of output writer to use with reduce
      function.
    params: mapper parameters to use as dict.
    filenames: list of filenames to reduce.
    combiner_spec: Optional. Specification of a combine function. If not
      supplied, no combine step will take place. The combine function takes a
      key, list of values and list of previously combined results. It yields
      combined values that might be processed by another combiner call, but will
      eventually end up in reducer. The combiner output key is assumed to be the
      same as the input key.
    shards: Optional. Number of output shards. Defaults to the number of
      input files.

  Returns:
    filenames from output writer.
  """

  def run(self,
          job_name,
          reducer_spec,
          output_writer_spec,
          params,
          filenames,
          combiner_spec=None,
          shards=None):
    # Never mutate the caller's params dict; copy before augmenting.
    new_params = dict(params or {})
    new_params.update({
        "files": filenames,
        })
    if combiner_spec:
      new_params.update({
          "combiner_spec": combiner_spec,
          })

    # One reduce shard per input file unless explicitly overridden.
    if shards is None:
      shards = len(filenames)

    # The reduce stage is itself a mapper job driven by the merging
    # _ReducerReader from this module.
    yield mapper_pipeline.MapperPipeline(
        job_name + "-reduce",
        reducer_spec,
        __name__ + "._ReducerReader",
        output_writer_spec,
        new_params,
        shards=shards)
class MapreducePipeline(pipeline_base._OutputSlotsMixin,
                        pipeline_base.PipelineBase):
  """Pipeline to execute MapReduce jobs.

  Args:
    job_name: job name as string.
    mapper_spec: specification of mapper to use.
    reducer_spec: specification of reducer to use.
    input_reader_spec: specification of input reader to read data from.
    output_writer_spec: specification of output writer to save reduce output to.
    mapper_params: parameters to use for mapper phase.
    reducer_params: parameters to use for reduce phase.
    shards: number of shards to use as int.
    combiner_spec: Optional. Specification of a combine function. If not
      supplied, no combine step will take place. The combine function takes a
      key, list of values and list of previously combined results. It yields
      combined values that might be processed by another combiner call, but will
      eventually end up in reducer. The combiner output key is assumed to be the
      same as the input key.

  Returns:
    result_status: one of model.MapreduceState._RESULTS. Check this to see
    if the job is successful.
    default: a list of filenames if the mapreduce was successful and
    was outputting files. An empty list otherwise.
  """

  def run(self,
          job_name,
          mapper_spec,
          reducer_spec,
          input_reader_spec,
          output_writer_spec=None,
          mapper_params=None,
          reducer_params=None,
          shards=None,
          combiner_spec=None):
    # Chain the three stages: map -> shuffle -> reduce.
    map_pipeline = yield MapPipeline(job_name,
                                     mapper_spec,
                                     input_reader_spec,
                                     params=mapper_params,
                                     shards=shards)
    shuffler_pipeline = yield ShufflePipeline(
        job_name, map_pipeline)
    reducer_pipeline = yield ReducePipeline(
        job_name,
        reducer_spec,
        output_writer_spec,
        reducer_params,
        shuffler_pipeline,
        combiner_spec=combiner_spec)
    # Only delete intermediate files once the reducer has consumed them.
    with pipeline.After(reducer_pipeline):
      all_temp_files = yield pipeline_common.Extend(
          map_pipeline, shuffler_pipeline)
      yield CleanupPipeline(all_temp_files)

    yield _ReturnPipeline(map_pipeline.result_status,
                          reducer_pipeline.result_status,
                          reducer_pipeline)
class _ReturnPipeline(pipeline_base._OutputSlotsMixin,
                      pipeline_base.PipelineBase):
  """Returns Mapreduce result.

  Fills outputs for MapreducePipeline. See MapreducePipeline.
  """

  output_names = ["result_status"]

  def run(self,
          map_result_status,
          reduce_result_status,
          reduce_outputs):
    # Aborted takes precedence over failed; success only if neither stage
    # aborted or failed.
    if (map_result_status == model.MapreduceState.RESULT_ABORTED or
        reduce_result_status == model.MapreduceState.RESULT_ABORTED):
      result_status = model.MapreduceState.RESULT_ABORTED
    elif (map_result_status == model.MapreduceState.RESULT_FAILED or
          reduce_result_status == model.MapreduceState.RESULT_FAILED):
      result_status = model.MapreduceState.RESULT_FAILED
    else:
      result_status = model.MapreduceState.RESULT_SUCCESS

    self.fill(self.outputs.result_status, result_status)
    # Only expose the reducer's output filenames on success; otherwise
    # return an empty list so callers never see partial output.
    if result_status == model.MapreduceState.RESULT_SUCCESS:
      yield pipeline_common.Return(reduce_outputs)
    else:
      yield pipeline_common.Return([])