1.9.30 sync.
[gae.git] / python / google / appengine / ext / mapreduce / base_handler.py
blob7e8377a4ffb1bf4ee2aa7481d712150e3c44eba9
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
31 """Base handler class for all mapreduce handlers."""
39 import httplib
40 import logging
42 import google
43 import simplejson
45 try:
46 from google.appengine.ext.mapreduce import pipeline_base
47 except ImportError:
48 pipeline_base = None
49 try:
51 from google.appengine.ext import cloudstorage
52 if hasattr(cloudstorage, "_STUB"):
53 cloudstorage = None
54 except ImportError:
55 cloudstorage = None
57 from google.appengine.ext import webapp
58 from google.appengine.ext.mapreduce import errors
59 from google.appengine.ext.mapreduce import json_util
60 from google.appengine.ext.mapreduce import model
61 from google.appengine.ext.mapreduce import parameters
class Error(Exception):
    """Root of the exception hierarchy raised by this module."""
class BadRequestPathError(Error):
    """Raised when a handler is reached through an invalid request path."""
class TaskQueueHandler(webapp.RequestHandler):
    """Base class for handlers intended to be run only from the task queue.

    Sub-classes should implement
      1. the 'handle' method for all POST requests.
      2. the '_preprocess' method for decoding or validations before handle.
      3. the '_drop_gracefully' method if _preprocess fails and the task has
         to be dropped.

    In Python27 runtime, webapp2 will automatically replace webapp.
    """

    _DEFAULT_USER_AGENT = "App Engine Python MR"

    def __init__(self, *args, **kwargs):
        """Set up handler state and the default cloudstorage retry params.

        Args:
          *args: positional arguments forwarded to webapp.RequestHandler.
          **kwargs: keyword arguments forwarded to webapp.RequestHandler.
        """
        # webapp2 invokes initialize() from within RequestHandler.__init__,
        # and initialize() conditionally changes this flag, so it has to be
        # set before calling the super constructor.
        self._preprocess_success = False
        super(TaskQueueHandler, self).__init__(*args, **kwargs)
        if cloudstorage:
            cloudstorage.set_default_retry_params(
                cloudstorage.RetryParams(
                    min_retries=5,
                    max_retries=10,
                    urlfetch_timeout=parameters._GCS_URLFETCH_TIMEOUT_SEC,
                    save_access_token=True,
                    _user_agent=self._DEFAULT_USER_AGENT))

    def initialize(self, request, response):
        """Initialize.

        1. call webapp init.
        2. check request is indeed from taskqueue.
        3. check the task has not been retried too many times.
        4. run handler specific processing logic.
        5. run error handling logic if processing failed.

        Args:
          request: a webapp.Request instance.
          response: a webapp.Response instance.
        """
        super(TaskQueueHandler, self).initialize(request, response)

        # A request without the queue-name header is treated as not coming
        # from the task queue and is rejected.
        if "X-AppEngine-QueueName" not in self.request.headers:
            logging.error(self.request.headers)
            logging.error("Task queue handler received non-task queue request")
            self.response.set_status(
                403,
                message="Task queue handler received non-task queue request")
            return

        # Looked up with .get() so that a missing header can never raise from
        # inside the drop paths below and skip _drop_gracefully().
        task_name = self.request.headers.get("X-AppEngine-TaskName")

        attempt = self.task_retry_count() + 1
        if attempt > parameters.config.TASK_MAX_ATTEMPTS:
            logging.error(
                "Task %s has been attempted %s times. Dropping it permanently.",
                task_name, attempt)
            self._drop_gracefully()
            return

        try:
            self._preprocess()
            self._preprocess_success = True
        except:  # pylint: disable=bare-except
            # Deliberately broad: whatever _preprocess raises, the task is
            # dropped instead of being retried. logging.exception keeps the
            # traceback so the failure can still be diagnosed.
            self._preprocess_success = False
            logging.exception(
                "Preprocess task %s failed. Dropping it permanently.",
                task_name)
            self._drop_gracefully()

    def post(self):
        """Run handle() only when preprocessing succeeded."""
        if self._preprocess_success:
            self.handle()

    def handle(self):
        """To be implemented by subclasses."""
        raise NotImplementedError()

    def _preprocess(self):
        """Preprocess.

        This method is called after webapp initialization code has been run
        successfully. It can thus access self.request, self.response and so on.
        """
        pass

    def _drop_gracefully(self):
        """Drop task gracefully.

        Called from initialize() before the task is dropped, either because
        _preprocess failed or because the task exceeded the attempt limit.
        """
        pass

    def task_retry_count(self):
        """Number of times this task has been retried.

        Returns:
          The X-AppEngine-TaskExecutionCount request header as an int, or 0
          when the header is absent.
        """
        return int(
            self.request.headers.get("X-AppEngine-TaskExecutionCount", 0))

    def retry_task(self):
        """Ask taskqueue to retry this task.

        Even though raising an exception can cause a task retry, it
        will flood logs with highly visible ERROR logs. Handlers should use
        this method to perform controlled task retries. Only raise exceptions
        for failures that deserve ERROR log entries.
        """
        self.response.set_status(httplib.SERVICE_UNAVAILABLE, "Retry task")
        self.response.clear()
class JsonHandler(webapp.RequestHandler):
    """Base class for JSON handlers for user interface.

    Sub-classes should implement the 'handle' method. They should put their
    response data in the 'self.json_response' dictionary. Any exceptions raised
    by the sub-class implementation will be sent in a JSON response with the
    name of the error_class and the error_message.
    """

    def __init__(self, *args, **kwargs):
        """Initializer.

        Args:
          *args: positional arguments forwarded to webapp.RequestHandler.
          **kwargs: keyword arguments forwarded to webapp.RequestHandler.
        """
        super(JsonHandler, self).__init__(*args, **kwargs)
        self.json_response = {}

    def base_path(self):
        """Base path for all mapreduce-related urls.

        JSON handlers are mapped to /base_path/command/command_name thus they
        require special treatment.

        Raises:
          BadRequestPathError: if the path does not end with "/command".

        Returns:
          The base path.
        """
        path = self.request.path
        # Strip the trailing "/command_name" component.
        base_path = path[:path.rfind("/")]
        if not base_path.endswith("/command"):
            raise BadRequestPathError(
                "Json handlers should have /command path prefix")
        # Strip the "/command" component as well.
        return base_path[:base_path.rfind("/")]

    def _handle_wrapper(self):
        """The helper method for handling JSON Post and Get requests."""
        if self.request.headers.get("X-Requested-With") != "XMLHttpRequest":
            logging.error("Got JSON request with no X-Requested-With header")
            self.response.set_status(
                403,
                message="Got JSON request with no X-Requested-With header")
            return

        self.json_response.clear()
        try:
            self.handle()
        except errors.MissingYamlError:
            logging.debug("Could not find 'mapreduce.yaml' file.")
            self.json_response.clear()
            self.json_response["error_class"] = "Notice"
            self.json_response["error_message"] = (
                "Could not find 'mapreduce.yaml'")
        except Exception as e:  # pylint: disable=broad-except
            # Any other failure is reported to the client as JSON instead of
            # being propagated.
            logging.exception("Error in JsonHandler, returning exception.")
            self.json_response.clear()
            self.json_response["error_class"] = e.__class__.__name__
            self.json_response["error_message"] = str(e)

        self.response.headers["Content-Type"] = "text/javascript"
        try:
            output = simplejson.dumps(
                self.json_response, cls=json_util.JsonEncoder)
        except Exception:  # pylint: disable=broad-except
            logging.exception("Could not serialize to JSON")
            self.response.set_status(
                500, message="Could not serialize to JSON")
            return
        else:
            self.response.out.write(output)

    def handle(self):
        """To be implemented by sub-classes."""
        raise NotImplementedError()
class PostJsonHandler(JsonHandler):
    """JsonHandler variant that serves POST requests."""

    def post(self):
        """Delegate the POST request to the shared JSON wrapper."""
        self._handle_wrapper()
class GetJsonHandler(JsonHandler):
    """JsonHandler variant that serves GET requests."""

    def get(self):
        """Delegate the GET request to the shared JSON wrapper."""
        self._handle_wrapper()
class HugeTaskHandler(TaskQueueHandler):
    """Task queue handler base for requests that carry a HugeTask payload."""

    class _RequestWrapper(object):
        """Pairs a request with the parameters decoded from its payload."""

        def __init__(self, request):
            """Wrap `request` and decode its payload.

            Args:
              request: the request object to wrap; it is passed to
                model.HugeTask.decode_payload to obtain the parameters.
            """
            self._request = request
            decoded = model.HugeTask.decode_payload(request)
            self._params = decoded

        def get(self, name, default=""):
            """Return the decoded parameter `name`, or `default`."""
            value = self._params.get(name, default)
            return value

        def set(self, name, value):
            """Store `value` under `name` in the decoded parameters."""
            self._params[name] = value

        def __getattr__(self, name):
            """Fall back to the wrapped request for any other attribute."""
            wrapped = self._request
            return getattr(wrapped, name)

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to TaskQueueHandler."""
        super(HugeTaskHandler, self).__init__(*args, **kwargs)

    def _preprocess(self):
        """Replace self.request with a _RequestWrapper around it."""
        wrapper = self._RequestWrapper(self.request)
        self.request = wrapper
# Module-level alias for pipeline_base.PipelineBase; None when the
# pipeline_base module is falsy (it is set to None if its import failed).
PipelineBase = pipeline_base.PipelineBase if pipeline_base else None