# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Pipelines for mapreduce library."""

from google.appengine.api import files
from google.appengine.ext.mapreduce import control
from google.appengine.ext.mapreduce import model
from google.appengine.ext.mapreduce import parameters
from google.appengine.ext.mapreduce import pipeline_base


class MapperPipeline(pipeline_base._OutputSlotsMixin,
                     pipeline_base.PipelineBase):
  """Pipeline wrapper for mapper job.

  Args:
    job_name: mapper job name as string.
    handler_spec: mapper handler specification as string.
    input_reader_spec: input reader specification as string.
    output_writer_spec: output writer specification as string.
    params: mapper parameters for input reader and output writer as dict.
    shards: number of shards in the job as int.

  Returns:
    default: the list of filenames produced by the mapper if there was any
      output and the map was completed successfully.
    result_status: one of model.MapreduceState._RESULTS.
    job_id: mapreduce id that can be used to query model.MapreduceState.
      Available immediately after run returns.
  """
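
  # Usage sketch (illustrative only): a parent pipeline typically yields
  # MapperPipeline and passes its output filenames to downstream stages. The
  # handler, reader, and writer names below are hypothetical placeholders,
  # not part of this module.
  #
  #   class WordCountPipeline(pipeline_base.PipelineBase):
  #     def run(self, blob_key):
  #       output = yield MapperPipeline(
  #           "word_count",
  #           handler_spec="main.word_count_map",
  #           input_reader_spec="mapreduce.input_readers.BlobstoreZipInputReader",
  #           output_writer_spec="mapreduce.output_writers.FileOutputWriter",
  #           params={"blob_key": blob_key},
  #           shards=16)
  #       yield _CleanupPipeline(output)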

  # This pipeline is asynchronous: run() only starts the mapreduce job, and
  # completion is signalled later through callback().
  async = True

  output_names = [
      "job_id",
      "counters"] + pipeline_base._OutputSlotsMixin.output_names

  def run(self,
          job_name,
          handler_spec,
          input_reader_spec,
          output_writer_spec=None,
          params=None,
          shards=None):
    """Start a mapreduce job.

    Args:
      job_name: mapreduce name. Only for display purposes.
      handler_spec: fully qualified name of your map function/class.
      input_reader_spec: fully qualified name of the input reader class.
      output_writer_spec: fully qualified name of the output writer class.
      params: a dictionary of parameters for input reader and output writer
        initialization.
      shards: number of shards. This provides a guide to mapreduce. The real
        number of shards is determined by how the input is split.
    """
    if shards is None:
      shards = parameters.config.SHARD_COUNT

    mapreduce_id = control.start_map(
        job_name,
        handler_spec,
        input_reader_spec,
        params or {},
        mapreduce_parameters={
            "done_callback": self.get_callback_url(),
            "done_callback_method": "GET",
            "pipeline_id": self.pipeline_id,
        },
        shard_count=shards,
        output_writer_spec=output_writer_spec,
        )
    self.fill(self.outputs.job_id, mapreduce_id)
    self.set_status(console_url="%s/detail?job_id=%s" % (
        (parameters.config.BASE_PATH, mapreduce_id)))
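
    # Explanatory note: the "done_callback" registered above points at this
    # pipeline's callback URL, so when the mapreduce job finishes the
    # framework issues a GET to it, which invokes callback() below to
    # complete or retry this async pipeline.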

  def try_cancel(self):
    """Always allow mappers to be canceled and retried."""
    return True

  def callback(self):
    """Callback after this async pipeline finishes."""
    mapreduce_id = self.outputs.job_id.value
    mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
    if mapreduce_state.result_status != model.MapreduceState.RESULT_SUCCESS:
      # The job failed or was aborted: retry the whole mapper pipeline.
      self.retry("Job %s had status %s" % (
          mapreduce_id, mapreduce_state.result_status))
      return

    mapper_spec = mapreduce_state.mapreduce_spec.mapper
    outputs = []
    output_writer_class = mapper_spec.output_writer_class()
    if (output_writer_class and
        mapreduce_state.result_status == model.MapreduceState.RESULT_SUCCESS):
      outputs = output_writer_class.get_filenames(mapreduce_state)

    self.fill(self.outputs.result_status, mapreduce_state.result_status)
    self.fill(self.outputs.counters, mapreduce_state.counters_map.to_dict())
    self.complete(outputs)


class _CleanupPipeline(pipeline_base.PipelineBase):
  """A pipeline to do a cleanup for mapreduce jobs.

  Args:
    filename_or_list: list of files or file lists to delete.
  """
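
  # Usage sketch (illustrative): a parent pipeline can yield
  # _CleanupPipeline(mapper_output) once the mapper's files are no longer
  # needed. The argument may be a single filename or arbitrarily nested lists
  # of filenames; delete_file_or_list() below recurses through lists.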

  def delete_file_or_list(self, filename_or_list):
    if isinstance(filename_or_list, list):
      for filename in filename_or_list:
        self.delete_file_or_list(filename)
    else:
      filename = filename_or_list
      # Deletion is best effort: retry a few times and swallow errors so that
      # an already-deleted or missing file does not fail the cleanup.
      for _ in range(10):
        try:
          files.delete(filename)
          break
        except Exception:
          pass

  def run(self, temp_files):
    self.delete_file_or_list(temp_files)