App Engine Python SDK version 1.8.9
[gae.git] / python / google / appengine / ext / mapreduce / mapper_pipeline.py
blob8347471a447f1e68ab25baf5546d3f10f3054b5b
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
33 """Pipelines for mapreduce library."""
36 __all__ = [
37 "MapperPipeline",
40 import google
42 from google.appengine.api import files
43 from google.appengine.ext.mapreduce import control
44 from google.appengine.ext.mapreduce import model
45 from google.appengine.ext.mapreduce import parameters
46 from google.appengine.ext.mapreduce import pipeline_base
52 class MapperPipeline(pipeline_base._OutputSlotsMixin,
53 pipeline_base.PipelineBase):
54 """Pipeline wrapper for mapper job.
56 Args:
57 job_name: mapper job name as string
58 handler_spec: mapper handler specification as string.
59 input_reader_spec: input reader specification as string.
60 output_writer_spec: output writer specification as string.
61 params: mapper parameters for input reader and output writer as dict.
62 shards: number of shards in the job as int.
64 Returns:
65 default: the list of filenames produced by the mapper if there was any
66 output and the map was completed successfully.
67 result_status: one of model.MapreduceState._RESULTS.
68 job_id: mr id that can be used to query model.MapreduceState. Available
69 immediately after run returns.
70 """
71 async = True
75 output_names = [
78 "job_id",
80 "counters"] + pipeline_base._OutputSlotsMixin.output_names
82 def run(self,
83 job_name,
84 handler_spec,
85 input_reader_spec,
86 output_writer_spec=None,
87 params=None,
88 shards=None):
89 """Start a mapreduce job.
91 Args:
92 job_name: mapreduce name. Only for display purpose.
93 handler_spec: fully qualified name to your map function/class.
94 input_reader_spec: fully qualified name to input reader class.
95 output_writer_spec: fully qualified name to output writer class.
96 params: a dictionary of parameters for input reader and output writer
97 initialization.
98 shards: number of shards. This provides a guide to mapreduce. The real
99 number of shards is determined by how input are splited.
101 if shards is None:
102 shards = parameters.config.SHARD_COUNT
104 mapreduce_id = control.start_map(
105 job_name,
106 handler_spec,
107 input_reader_spec,
108 params or {},
109 mapreduce_parameters={
110 "done_callback": self.get_callback_url(),
111 "done_callback_method": "GET",
112 "pipeline_id": self.pipeline_id,
114 shard_count=shards,
115 output_writer_spec=output_writer_spec,
117 self.fill(self.outputs.job_id, mapreduce_id)
118 self.set_status(console_url="%s/detail?job_id=%s" % (
119 (parameters.config.BASE_PATH, mapreduce_id)))
121 def try_cancel(self):
122 """Always allow mappers to be canceled and retried."""
123 return True
125 def callback(self):
126 """Callback after this async pipeline finishes."""
127 mapreduce_id = self.outputs.job_id.value
128 mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
129 if mapreduce_state.result_status != model.MapreduceState.RESULT_SUCCESS:
130 self.retry("Job %s had status %s" % (
131 mapreduce_id, mapreduce_state.result_status))
132 return
134 mapper_spec = mapreduce_state.mapreduce_spec.mapper
135 outputs = []
136 output_writer_class = mapper_spec.output_writer_class()
137 if (output_writer_class and
138 mapreduce_state.result_status == model.MapreduceState.RESULT_SUCCESS):
139 outputs = output_writer_class.get_filenames(mapreduce_state)
141 self.fill(self.outputs.result_status, mapreduce_state.result_status)
142 self.fill(self.outputs.counters, mapreduce_state.counters_map.to_dict())
143 self.complete(outputs)
class _CleanupPipeline(pipeline_base.PipelineBase):
  """A pipeline to do a cleanup for mapreduce jobs.

  Args:
    filename_or_list: list of files or file lists to delete.
  """

  def delete_file_or_list(self, filename_or_list):
    """Recursively delete a file or an arbitrarily nested list of files.

    Deletion is best effort: each file is attempted up to 10 times and any
    Files API failure is ignored, so cleanup never fails the pipeline.

    Args:
      filename_or_list: a filename string, or a (possibly nested) list of
        filenames / file lists.
    """
    if isinstance(filename_or_list, list):
      for filename in filename_or_list:
        self.delete_file_or_list(filename)
    else:
      filename = filename_or_list
      for _ in range(10):
        try:
          files.delete(filename)
          break
        # Narrowed from a bare "except:" so KeyboardInterrupt/SystemExit
        # propagate; ordinary deletion errors are still swallowed to keep
        # the deliberate best-effort behavior.
        except Exception:
          pass

  def run(self, temp_files):
    """Entry point: delete all temporary files produced by the job."""
    self.delete_file_or_list(temp_files)