App Engine Python SDK version 1.8.9
[gae.git] / python / google / appengine / ext / mapreduce / base_handler.py
blobe600df5697e033078a279391f483705a79aeab6f
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
33 """Base handler class for all mapreduce handlers."""
40 import httplib
41 import logging
43 import google
44 import simplejson
46 try:
47 from google.appengine.ext.mapreduce import pipeline_base
48 except ImportError:
49 pipeline_base = None
50 try:
52 from google.appengine.ext import cloudstorage
53 if hasattr(cloudstorage, "_STUB"):
54 cloudstorage = None
55 except ImportError:
56 cloudstorage = None
58 from google.appengine.ext import webapp
59 from google.appengine.ext.mapreduce import errors
60 from google.appengine.ext.mapreduce import json_util
61 from google.appengine.ext.mapreduce import model
62 from google.appengine.ext.mapreduce import parameters
class Error(Exception):
    """Root exception type for errors raised by this module."""
class BadRequestPathError(Error):
    """Raised when a handler's request path does not have the expected form."""
class TaskQueueHandler(webapp.RequestHandler):
    """Base class for handlers intended to be run only from the task queue.

    Sub-classes should implement
    1. the 'handle' method for all POST requests.
    2. the '_preprocess' method for decoding or validation before handle.
    3. the '_drop_gracefully' method, which is called when the task has to be
       dropped: either _preprocess failed or the task has exceeded
       parameters.config.TASK_MAX_ATTEMPTS attempts.

    In Python27 runtime, webapp2 will automatically replace webapp.
    """

    def __init__(self, *args, **kwargs):
        # initialize() conditionally sets this flag. webapp2's
        # RequestHandler.__init__ calls initialize() itself, so the default
        # must be assigned *before* delegating to the parent constructor;
        # otherwise it would overwrite the result of preprocessing.
        self._preprocess_success = False
        super(TaskQueueHandler, self).__init__(*args, **kwargs)
        if cloudstorage:
            cloudstorage.set_default_retry_params(
                cloudstorage.RetryParams(save_access_token=True))

    def initialize(self, request, response):
        """Initialize.

        1. call webapp init.
        2. check request is indeed from taskqueue.
        3. check the task has not been retried too many times.
        4. run handler specific processing logic.
        5. run error handling logic if processing failed.

        Args:
          request: a webapp.Request instance.
          response: a webapp.Response instance.
        """
        super(TaskQueueHandler, self).initialize(request, response)

        # Reject requests that did not arrive through the task queue.
        if "X-AppEngine-QueueName" not in self.request.headers:
            logging.error(self.request.headers)
            logging.error("Task queue handler received non-task queue request")
            self.response.set_status(
                403, message="Task queue handler received non-task queue request")
            return

        # Permanently drop tasks that have used up their attempt budget.
        attempts = self.task_retry_count() + 1
        if attempts > parameters.config.TASK_MAX_ATTEMPTS:
            logging.error(
                "Task %s has been attempted %s times. Dropping it permanently.",
                self.request.headers["X-AppEngine-TaskName"],
                attempts)
            self._drop_gracefully()
            return

        try:
            self._preprocess()
            self._preprocess_success = True
        except Exception:
            # Only ordinary errors drop the task. BaseException-derived
            # interruptions (App Engine's request-deadline
            # DeadlineExceededError, for instance, derives from BaseException)
            # propagate so the task queue can retry instead of losing the task.
            self._preprocess_success = False
            # logging.exception records the traceback of the failure.
            logging.exception(
                "Preprocess task %s failed. Dropping it permanently.",
                self.request.headers["X-AppEngine-TaskName"])
            self._drop_gracefully()

    def post(self):
        """Run handle() only if initialize() preprocessed the request successfully."""
        if self._preprocess_success:
            self.handle()

    def handle(self):
        """To be implemented by subclasses."""
        raise NotImplementedError()

    def _preprocess(self):
        """Preprocess.

        This method is called after webapp initialization code has been run
        successfully. It can thus access self.request, self.response and so on.
        """
        pass

    def _drop_gracefully(self):
        """Drop task gracefully.

        Called before the task is dropped, either because preprocessing failed
        or because the task exceeded its maximum number of attempts.
        """
        pass

    def task_retry_count(self):
        """Number of times this task has been retried.

        Read from the X-AppEngine-TaskExecutionCount header; 0 if absent.
        """
        return int(self.request.headers.get("X-AppEngine-TaskExecutionCount", 0))

    def retry_task(self):
        """Ask taskqueue to retry this task.

        Even though raising an exception can cause a task retry, it
        will flood logs with highly visible ERROR logs. Handlers should use
        this method to perform controlled task retries. Only raise exceptions
        for failures that deserve ERROR log entries.
        """
        # 503 tells the task queue the attempt failed; clear() drops any
        # body output written so far.
        self.response.set_status(httplib.SERVICE_UNAVAILABLE, "Retry task")
        self.response.clear()
class JsonHandler(webapp.RequestHandler):
    """Base class for JSON handlers for user interface.

    Sub-classes should implement the 'handle' method. They should put their
    response data in the 'self.json_response' dictionary. Any exceptions raised
    by the sub-class implementation will be sent in a JSON response with the
    name of the error_class and the error_message.
    """

    def __init__(self, *args, **kwargs):
        """Initializer.

        Positional and keyword arguments are forwarded unchanged to the parent
        RequestHandler constructor.
        """
        super(JsonHandler, self).__init__(*args, **kwargs)
        # Populated by handle(); serialized by _handle_wrapper().
        self.json_response = {}

    def base_path(self):
        """Base path for all mapreduce-related urls.

        JSON handlers are mapped to /base_path/command/command_name thus they
        require special treatment.

        Returns:
          The request path with its trailing "/command/<command_name>" removed,
          e.g. "/mapreduce/command/list_configs" -> "/mapreduce".

        Raises:
          BadRequestPathError: if the segment before the last one is not
            "command".
        """
        path = self.request.path
        base_path = path[:path.rfind("/")]
        if not base_path.endswith("/command"):
            raise BadRequestPathError(
                "Json handlers should have /command path prefix")
        return base_path[:base_path.rfind("/")]

    def _handle_wrapper(self):
        """Run handle() and write self.json_response as the response body.

        Requests without the "X-Requested-With: XMLHttpRequest" header are
        rejected with 403 before handle() runs. Exceptions from handle() are
        reported inside the JSON body as error_class / error_message. If the
        response cannot be serialized, a 500 status is set instead.
        """
        if self.request.headers.get("X-Requested-With") != "XMLHttpRequest":
            logging.error("Got JSON request with no X-Requested-With header")
            self.response.set_status(
                403, message="Got JSON request with no X-Requested-With header")
            return

        self.json_response.clear()
        try:
            self.handle()
        except errors.MissingYamlError:
            logging.debug("Could not find 'mapreduce.yaml' file.")
            self.json_response.clear()
            self.json_response["error_class"] = "Notice"
            self.json_response["error_message"] = "Could not find 'mapreduce.yaml'"
        except Exception as e:
            logging.exception("Error in JsonHandler, returning exception.")
            # Discard any partial data handle() stored before failing.
            self.json_response.clear()
            self.json_response["error_class"] = e.__class__.__name__
            self.json_response["error_message"] = str(e)

        self.response.headers["Content-Type"] = "text/javascript"
        try:
            output = simplejson.dumps(self.json_response, cls=json_util.JsonEncoder)
        except Exception:
            # Serialization errors are ordinary exceptions; interpreter-level
            # interruptions (BaseException) are left to propagate.
            logging.exception("Could not serialize to JSON")
            self.response.set_status(500, message="Could not serialize to JSON")
            return
        self.response.out.write(output)

    def handle(self):
        """To be implemented by sub-classes."""
        raise NotImplementedError()
class PostJsonHandler(JsonHandler):
    """JSON handler that serves POST requests."""

    def post(self):
        """Serve a POST through the shared JSON request wrapper."""
        self._handle_wrapper()
class GetJsonHandler(JsonHandler):
    """JSON handler that serves GET requests."""

    def get(self):
        """Serve a GET through the shared JSON request wrapper."""
        self._handle_wrapper()
class HugeTaskHandler(TaskQueueHandler):
    """Base handler for processing HugeTasks."""

    class _RequestWrapper(object):
        """Request proxy whose get()/set() read and write the decoded payload.

        Parameters come from model.HugeTask.decode_payload(request). Any other
        attribute lookup is forwarded to the wrapped request object.
        """

        def __init__(self, request):
            self._request = request
            # NOTE(review): assumes decode_payload returns a dict-like mapping
            # (get() and item assignment are used below) - confirm in model.
            self._params = model.HugeTask.decode_payload(request)

        def get(self, name, default=""):
            """Return payload parameter `name`, or `default` if it is absent."""
            return self._params.get(name, default)

        def set(self, name, value):
            """Store `value` as payload parameter `name` on this wrapper."""
            self._params[name] = value

        def __getattr__(self, name):
            # __getattr__ only runs when normal lookup fails. Without this
            # guard, a missing _request (e.g. an instance created without
            # __init__, as copy/pickle do) would recurse forever.
            if name == "_request":
                raise AttributeError(name)
            return getattr(self._request, name)

    def _preprocess(self):
        """Replace self.request with a wrapper exposing the decoded payload."""
        self.request = self._RequestWrapper(self.request)
# Alias of pipeline_base.PipelineBase when the optional pipeline module was
# imported successfully; None when that import failed.
PipelineBase = pipeline_base.PipelineBase if pipeline_base else None