App Engine Python SDK version 1.8.9
[gae.git] / python / google / appengine / ext / mapreduce / status.py
blob549205551e8d5157c35e85b21997a9cb474df985
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
33 """Status page handler for mapreduce framework."""
36 import os
37 import pkgutil
38 import time
40 from google.appengine.api import validation
41 from google.appengine.api import yaml_builder
42 from google.appengine.api import yaml_errors
43 from google.appengine.api import yaml_listener
44 from google.appengine.api import yaml_object
45 from google.appengine.ext import db
46 from google.appengine.ext import webapp
47 from google.appengine.ext.mapreduce import base_handler
48 from google.appengine.ext.mapreduce import errors
49 from google.appengine.ext.mapreduce import model
# File names under which the mapreduce configuration may be stored. The
# directory search in this module probes them in the order listed here.
MR_YAML_NAMES = [
    "mapreduce.yaml",
    "mapreduce.yml",
]
class BadStatusParameterError(Exception):
    """Raised when a status handler is given an invalid request parameter."""
class UserParam(validation.Validated):
    """A single user-supplied parameter of a mapreduce job.

    Attributes:
      name: Required. Letters, digits, underscores and dots only.
      default: Optional default value for the parameter.
      value: Optional value for the parameter.
    """

    # Attribute name -> validator (a pattern, optionally wrapped in Optional).
    ATTRIBUTES = {
        "name": r"[a-zA-Z0-9_\.]+",
        "default": validation.Optional(r".*"),
        "value": validation.Optional(r".*"),
    }
class MapperInfo(validation.Validated):
    """Settings describing the mapper stage of a job.

    Attributes:
      handler: Required, non-empty handler spec.
      input_reader: Required, non-empty input reader spec.
      output_writer: Optional output writer spec.
      params: Optional list of UserParam entries.
      params_validator: Optional spec of a validator for the params.
    """

    # Attribute name -> validator; Optional marks attributes that may be
    # left out of the YAML document.
    ATTRIBUTES = {
        "handler": r".+",
        "input_reader": r".+",
        "output_writer": validation.Optional(r".+"),
        "params": validation.Optional(validation.Repeated(UserParam)),
        "params_validator": validation.Optional(r".+"),
    }
class MapreduceInfo(validation.Validated):
    """One mapreduce job description from mapreduce.yaml.

    Attributes:
      name: Required, non-empty job name.
      mapper: Required MapperInfo describing the mapper stage.
      params: Optional list of job-level UserParam entries.
      params_validator: Optional spec of a validator for the params.
    """

    # Attribute name -> validator; Optional marks attributes that may be
    # left out of the YAML document.
    ATTRIBUTES = {
        "name": r".+",
        "mapper": MapperInfo,
        "params": validation.Optional(validation.Repeated(UserParam)),
        "params_validator": validation.Optional(r".+"),
    }
class MapReduceYaml(validation.Validated):
    """Root class for mapreduce.yaml.

    File format:

    mapreduce:
    - name: <mapreduce_name>
      mapper:
        input_reader: google.appengine.ext.mapreduce.DatastoreInputReader
        handler: path_to_my.MapperFunction
        params:
        - name: foo
          default: bar
        - name: blah
          default: stuff
        params_validator: path_to_my.ValidatorFunction

    Where
      name: The name of the mapreduce. Used for UI purposes.
      handler: Full <module_name>.<function_name/class_name> of the mapper
        handler. See MapreduceSpec class documentation for full handler
        specification.
      input_reader: Full <module_name>.<function_name/class_name> of the
        InputReader sub-class to use for the mapper job.
      params: A list of optional parameter names and optional default values
        that may be supplied or overridden by the user running the job.
      params_validator: Full <module_name>.<function_name/class_name> of
        a callable to validate the mapper params after they are input by the
        user running the job.
    """

    ATTRIBUTES = {
        "mapreduce": validation.Optional(validation.Repeated(MapreduceInfo))
    }

    @staticmethod
    def _param_defaults(params):
        """Maps each parameter name to its default, else its value.

        Args:
          params: Iterable of objects exposing name, default and value.

        Returns:
          A dict of parameter name -> (default or value).
        """
        return dict((param.name, param.default or param.value)
                    for param in params)

    @staticmethod
    def to_dict(mapreduce_yaml):
        """Converts a parsed mapreduce.yaml into JSON-encodable dictionaries.

        For use in user-visible UI and internal methods for interfacing with
        user code (like param validation).

        Args:
          mapreduce_yaml: The Python representation of the mapreduce.yaml
            document.

        Returns:
          A list of configuration dictionaries, one per mapreduce entry. The
          list is empty when the document declares no mapreduce entries.
        """
        all_configs = []
        # "mapreduce" is declared Optional above, so it is presumably None
        # when the document omits it; treat that the same as an empty list
        # instead of failing while iterating over None.
        for config in mapreduce_yaml.mapreduce or []:
            mapper = config.mapper
            out = {
                "name": config.name,
                "mapper_input_reader": mapper.input_reader,
                "mapper_handler": mapper.handler,
            }
            # Optional settings are only emitted when they are present.
            if mapper.params_validator:
                out["mapper_params_validator"] = mapper.params_validator
            if mapper.params:
                out["mapper_params"] = MapReduceYaml._param_defaults(
                    mapper.params)
            if config.params:
                out["params"] = MapReduceYaml._param_defaults(config.params)
            if mapper.output_writer:
                out["mapper_output_writer"] = mapper.output_writer
            all_configs.append(out)

        return all_configs
def find_mapreduce_yaml(status_file=__file__):
    """Locates the mapreduce.yaml file by walking up directory trees.

    The search starts in the directory containing status.py; only if that
    yields nothing is the current working directory searched as well.

    Args:
      status_file: location of status.py, overridable for testing purposes.

    Returns:
      the path of mapreduce.yaml file or None if not found.
    """
    # One set is shared by both searches so that directories examined by the
    # first walk are not examined again by the second.
    visited = set()
    found = _find_mapreduce_yaml(os.path.dirname(status_file), visited)
    if found:
        return found
    return _find_mapreduce_yaml(os.getcwd(), visited)
193 def _find_mapreduce_yaml(start, checked):
194 """Traverse the directory tree identified by start until a directory already
195 in checked is encountered or the path of mapreduce.yaml is found.
197 Checked is present both to make loop termination easy to reason about and so
198 that the same directories do not get rechecked.
200 Args:
201 start: the path to start in and work upward from
202 checked: the set of already examined directories
204 Returns:
205 the path of mapreduce.yaml file or None if not found.
207 dir = start
208 while dir not in checked:
209 checked.add(dir)
210 for mr_yaml_name in MR_YAML_NAMES:
211 yaml_path = os.path.join(dir, mr_yaml_name)
212 if os.path.exists(yaml_path):
213 return yaml_path
214 dir = os.path.dirname(dir)
215 return None
def parse_mapreduce_yaml(contents):
    """Parses mapreduce.yaml file contents.

    A document that declares no mapreduce entries is accepted and returned
    unchanged.

    Args:
      contents: mapreduce.yaml file contents.

    Returns:
      MapReduceYaml object with all the data from original file.

    Raises:
      errors.BadYamlError: when contents is not a valid mapreduce.yaml file,
        contains no YAML document, or declares two jobs with the same name.
      errors.MultipleDocumentsInMrYaml: when contents holds more than one
        YAML document.
    """
    try:
        builder = yaml_object.ObjectBuilder(MapReduceYaml)
        handler = yaml_builder.BuilderHandler(builder)
        listener = yaml_listener.EventListener(handler)
        listener.Parse(contents)

        mr_info = handler.GetResults()
    except (ValueError, yaml_errors.EventError) as e:
        # Surface parser and validation failures as the module's own error.
        raise errors.BadYamlError(e)

    if len(mr_info) < 1:
        raise errors.BadYamlError("No configs found in mapreduce.yaml")
    if len(mr_info) > 1:
        raise errors.MultipleDocumentsInMrYaml(
            "Found %d YAML documents" % len(mr_info))

    jobs = mr_info[0]
    # "mapreduce" is an Optional attribute of MapReduceYaml, so it is
    # presumably None when the document omits it; treat that as "no jobs"
    # rather than failing while iterating over None.
    job_list = jobs.mapreduce or []
    job_names = set(job.name for job in job_list)
    if len(job_list) != len(job_names):
        raise errors.BadYamlError(
            "Overlapping mapreduce names; names must be unique")

    return jobs
def get_mapreduce_yaml(parse=parse_mapreduce_yaml):
    """Finds mapreduce.yaml, reads it and returns the parsed result.

    Args:
      parse: Callable applied to the file contents. Used for testing.

    Returns:
      MapReduceYaml object (whatever parse returns).

    Raises:
      errors.MissingYamlError: when no mapreduce.yaml file can be located.
      errors.BadYamlError: raised by the default parser when contents is not
        a valid mapreduce.yaml file.
    """
    yaml_path = find_mapreduce_yaml()
    if not yaml_path:
        raise errors.MissingYamlError()

    yaml_file = open(yaml_path)
    try:
        contents = yaml_file.read()
    finally:
        # Always release the handle, even if reading fails.
        yaml_file.close()
    return parse(contents)
class ResourceHandler(webapp.RequestHandler):
    """Handler for static resources."""

    # Public resource name -> (file name under "static/", content type).
    _RESOURCE_MAP = {
        "status": ("overview.html", "text/html"),
        "detail": ("detail.html", "text/html"),
        "base.css": ("base.css", "text/css"),
        "jquery.js": ("jquery-1.6.1.min.js", "text/javascript"),
        "jquery-json.js": ("jquery.json-2.2.min.js", "text/javascript"),
        "status.js": ("status.js", "text/javascript"),
    }

    def get(self, relative):
        """Serves the static resource registered under the given name.

        Responds with status 404 and a short message when the name is not
        in _RESOURCE_MAP.

        Args:
          relative: Public name of the requested resource.
        """
        if relative not in self._RESOURCE_MAP:
            self.response.set_status(404)
            self.response.out.write("Resource not found.")
            return

        real_path, content_type = self._RESOURCE_MAP[relative]
        # Cache directives are comma-separated in HTTP; a semicolon here
        # would not be read as two directives.
        self.response.headers["Cache-Control"] = "public, max-age=300"
        self.response.headers["Content-Type"] = content_type
        try:
            data = pkgutil.get_data(__name__, "static/" + real_path)
        except AttributeError:
            # NOTE(review): presumably guards against Python versions whose
            # pkgutil has no get_data -- confirm before removing.
            data = None
        if not data:
            # Fall back to reading the file that sits next to this module,
            # closing the handle explicitly so it is not leaked.
            path = os.path.join(os.path.dirname(__file__), "static", real_path)
            resource = open(path)
            try:
                data = resource.read()
            finally:
                resource.close()
        self.response.out.write(data)
class ListConfigsHandler(base_handler.GetJsonHandler):
    """Returns the configured mapreduce jobs as JSON so users can start them."""

    def handle(self):
        """Stores the mapreduce.yaml configs under the "configs" key."""
        parsed_yaml = get_mapreduce_yaml()
        configs = MapReduceYaml.to_dict(parsed_yaml)
        self.json_response["configs"] = configs
class ListJobsHandler(base_handler.GetJsonHandler):
    """Lists running and completed mapreduce jobs for an overview as JSON."""

    def handle(self):
        """Writes one page of jobs, plus a cursor when more jobs remain.

        Request parameters:
          cursor: Optional string form of the key to start the page from.
          count: Optional page size; defaults to 50.
        """
        # Imported here because calendar is not among the module imports.
        import calendar

        def epoch_ms(when):
            # utctimetuple() yields a UTC time tuple, so it has to be
            # converted with timegm(); time.mktime() would read it as local
            # time and be off on hosts whose time zone is not UTC.
            # NOTE(review): assumes the stored datetimes are UTC -- confirm.
            return calendar.timegm(when.utctimetuple()) * 1000

        cursor = self.request.get("cursor")
        count = int(self.request.get("count", "50"))

        query = model.MapreduceState.all()
        if cursor:
            query.filter("__key__ >=", db.Key(cursor))
        query.order("__key__")

        # Fetch one extra entity: its presence means another page exists,
        # and its key becomes the cursor for that page.
        jobs_list = query.fetch(count + 1)
        if len(jobs_list) == (count + 1):
            self.json_response["cursor"] = str(jobs_list[-1].key())
            jobs_list = jobs_list[:-1]

        all_jobs = []
        for job in jobs_list:
            out = {
                "name": job.mapreduce_spec.name,
                "mapreduce_id": job.mapreduce_spec.mapreduce_id,
                "active": job.active,
                "start_timestamp_ms": epoch_ms(job.start_time),
                "updated_timestamp_ms": epoch_ms(job.last_poll_time),
                "chart_url": job.sparkline_url,
                "chart_width": job.chart_width,
                "active_shards": job.active_shards,
                "shards": job.mapreduce_spec.mapper.shard_count,
            }
            # The result status is only reported once the job has one.
            if job.result_status:
                out["result_status"] = job.result_status
            all_jobs.append(out)

        self.json_response["jobs"] = all_jobs
class GetJobDetailHandler(base_handler.GetJsonHandler):
    """Retrieves the details of a mapreduce job as JSON."""

    def handle(self):
        """Writes the job's spec, counters, status and per-shard details.

        Request parameters:
          mapreduce_id: Required ID of the job to describe.

        Raises:
          BadStatusParameterError: when mapreduce_id is missing or empty.
          KeyError: when no job with the given ID exists.
        """
        # Imported here because calendar is not among the module imports.
        import calendar

        def epoch_ms(when):
            # utctimetuple() yields a UTC time tuple, so it has to be
            # converted with timegm(); time.mktime() would read it as local
            # time and be off on hosts whose time zone is not UTC.
            # NOTE(review): assumes the stored datetimes are UTC -- confirm.
            return calendar.timegm(when.utctimetuple()) * 1000

        mapreduce_id = self.request.get("mapreduce_id")
        if not mapreduce_id:
            raise BadStatusParameterError("'mapreduce_id' was invalid")
        job = model.MapreduceState.get_by_key_name(mapreduce_id)
        if job is None:
            raise KeyError("Could not find job with ID %r" % mapreduce_id)

        self.json_response.update(job.mapreduce_spec.to_json())
        self.json_response.update(job.counters_map.to_json())
        self.json_response.update({
            "active": job.active,
            "start_timestamp_ms": epoch_ms(job.start_time),
            "updated_timestamp_ms": epoch_ms(job.last_poll_time),
            "chart_url": job.chart_url,
            "chart_width": job.chart_width,
        })
        # Unlike the overview listing, the status is always included here,
        # whatever its value.
        self.json_response["result_status"] = job.result_status

        all_shards = []
        for shard in model.ShardState.find_all_by_mapreduce_state(job):
            out = {
                "active": shard.active,
                "result_status": shard.result_status,
                "shard_number": shard.shard_number,
                "shard_id": shard.shard_id,
                "updated_timestamp_ms": epoch_ms(shard.update_time),
                "shard_description": shard.shard_description,
                "last_work_item": shard.last_work_item,
            }
            out.update(shard.counters_map.to_json())
            all_shards.append(out)
        # Present shards in shard-number order.
        all_shards.sort(key=lambda x: x["shard_number"])
        self.json_response["shards"] = all_shards