3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
31 """Base handler class for all mapreduce handlers."""
46 from google
.appengine
.ext
.mapreduce
import pipeline_base
51 from google
.appengine
.ext
import cloudstorage
52 if hasattr(cloudstorage
, "_STUB"):
57 from google
.appengine
.ext
import webapp
58 from google
.appengine
.ext
.mapreduce
import errors
59 from google
.appengine
.ext
.mapreduce
import json_util
60 from google
.appengine
.ext
.mapreduce
import model
61 from google
.appengine
.ext
.mapreduce
import parameters
class Error(Exception):
    """Root of the exception hierarchy defined by this module."""
class BadRequestPathError(Error):
    """Raised when a handler is reached through an invalid request path."""
class TaskQueueHandler(webapp.RequestHandler):
    """Base class for handlers intended to be run only from the task queue.

    Sub-classes should implement
    1. the 'handle' method for all POST requests.
    2. the '_preprocess' method for decoding or validation before handle.
    3. the '_drop_gracefully' method, called when _preprocess fails or the
       task ran out of attempts and the task has to be dropped.

    In Python27 runtime, webapp2 will automatically replace webapp.
    """

    _DEFAULT_USER_AGENT = "App Engine Python MR"

    def __init__(self, *args, **kwargs):
        """Set up handler state and default cloudstorage retry parameters.

        Args:
          *args: positional arguments forwarded to the parent constructor.
          **kwargs: keyword arguments forwarded to the parent constructor.
        """
        # initialize() may flip this flag, so it is set before the parent
        # constructor runs.
        # NOTE(review): presumably the framework can call initialize() from
        # inside the parent __init__ (webapp2) -- confirm.
        self._preprocess_success = False
        super(TaskQueueHandler, self).__init__(*args, **kwargs)

        # The import section special-cases a stub cloudstorage module, so the
        # name may not refer to a usable package; only configure it if it is.
        if cloudstorage:
            retry_params = cloudstorage.RetryParams(
                urlfetch_timeout=parameters._GCS_URLFETCH_TIMEOUT_SEC,
                save_access_token=True,
                _user_agent=self._DEFAULT_USER_AGENT)
            cloudstorage.set_default_retry_params(retry_params)

    def initialize(self, request, response):
        """Initialize the handler and vet the incoming task.

        1. call webapp init.
        2. check request is indeed from taskqueue.
        3. check the task has not been retried too many times.
        4. run handler specific processing logic.
        5. run error handling logic if processing failed.

        Args:
          request: a webapp.Request instance.
          response: a webapp.Response instance.
        """
        super(TaskQueueHandler, self).initialize(request, response)
        headers = self.request.headers

        # Reject anything that did not come through the task queue.
        if "X-AppEngine-QueueName" not in headers:
            logging.error(headers)
            logging.error("Task queue handler received non-task queue request")
            self.response.set_status(
                403,
                message="Task queue handler received non-task queue request")
            return

        # Looked up with .get() so that a missing header cannot raise KeyError
        # inside the drop paths below and skip _drop_gracefully().
        task_name = headers.get("X-AppEngine-TaskName")

        # Give up on tasks that have already used all their attempts.
        attempts = self.task_retry_count() + 1
        if attempts > parameters.config.TASK_MAX_ATTEMPTS:
            logging.error(
                "Task %s has been attempted %s times. Dropping it permanently.",
                task_name,
                attempts)
            self._drop_gracefully()
            return

        # NOTE(review): the exception type caught here is not visible in the
        # reviewed chunk; confirm that Exception matches the intended scope.
        try:
            self._preprocess()
            self._preprocess_success = True
        except Exception:
            self._preprocess_success = False
            logging.error(
                "Preprocess task %s failed. Dropping it permanently.",
                task_name)
            self._drop_gracefully()

    def post(self):
        """Run 'handle' for a POST request, but only if preprocessing succeeded."""
        if self._preprocess_success:
            self.handle()

    def handle(self):
        """To be implemented by subclasses."""
        raise NotImplementedError()

    def _preprocess(self):
        """Preprocess.

        This method is called after webapp initialization code has been run
        successfully. It can thus access self.request, self.response and so on.
        The base implementation does nothing.
        """

    def _drop_gracefully(self):
        """Drop task gracefully.

        When preprocess failed, or the task exceeded its maximum number of
        attempts, this method is called before the task is dropped. The base
        implementation does nothing.
        """

    def task_retry_count(self):
        """Number of times this task has been retried.

        Returns:
          The X-AppEngine-TaskExecutionCount request header as an int, or 0
          when the header is absent.
        """
        return int(self.request.headers.get("X-AppEngine-TaskExecutionCount", 0))

    def retry_task(self):
        """Ask taskqueue to retry this task.

        Even though raising an exception can cause a task retry, it
        will flood logs with highly visible ERROR logs. Handlers should use
        this method to perform controlled task retries. Only raise exceptions
        for those that deserve ERROR log entries.
        """
        self.response.set_status(httplib.SERVICE_UNAVAILABLE, "Retry task")
        self.response.clear()
class JsonHandler(webapp.RequestHandler):
    """Base class for JSON handlers for user interface.

    Sub-classes should implement the 'handle' method. They should put their
    response data in the 'self.json_response' dictionary. Any exceptions raised
    by the sub-class implementation will be sent in a JSON response with the
    name of the error_class and the error_message.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the handler with an empty JSON response.

        Args:
          *args: positional arguments forwarded to the parent constructor.
          **kwargs: keyword arguments forwarded to the parent constructor;
            accepted for parity with TaskQueueHandler.__init__.
        """
        super(JsonHandler, self).__init__(*args, **kwargs)
        self.json_response = {}

    def base_path(self):
        """Base path for all mapreduce-related urls.

        JSON handlers are mapped to /base_path/command/command_name thus they
        require special treatment.

        Raises:
          BadRequestPathError: if the path does not end with "/command".

        Returns:
          The request path with its last two segments
          ("/command/command_name") removed.
        """
        path = self.request.path
        # Strip the trailing "/command_name" segment first ...
        base_path = path[:path.rfind("/")]
        if not base_path.endswith("/command"):
            raise BadRequestPathError(
                "Json handlers should have /command path prefix")
        # ... then the "/command" segment itself.
        return base_path[:base_path.rfind("/")]

    def _handle_wrapper(self):
        """The helper method for handling JSON Post and Get requests.

        Rejects requests lacking the X-Requested-With: XMLHttpRequest header
        with a 403, runs 'handle', converts any exception it raises into an
        error_class/error_message payload, and writes the JSON-encoded
        'self.json_response' to the response.
        """
        if self.request.headers.get("X-Requested-With") != "XMLHttpRequest":
            logging.error("Got JSON request with no X-Requested-With header")
            self.response.set_status(
                403, message="Got JSON request with no X-Requested-With header")
            return

        self.json_response.clear()
        try:
            self.handle()
        except errors.MissingYamlError:
            logging.debug("Could not find 'mapreduce.yaml' file.")
            self.json_response.clear()
            self.json_response["error_class"] = "Notice"
            self.json_response["error_message"] = "Could not find 'mapreduce.yaml'"
        except Exception as e:
            logging.exception("Error in JsonHandler, returning exception.")
            # Discard any partial data the handler stored before failing.
            self.json_response.clear()
            self.json_response["error_class"] = e.__class__.__name__
            self.json_response["error_message"] = str(e)

        self.response.headers["Content-Type"] = "text/javascript"
        # NOTE(review): the exception type caught below is not visible in the
        # reviewed chunk; confirm that Exception matches the intended scope.
        try:
            output = simplejson.dumps(
                self.json_response, cls=json_util.JsonEncoder)
        except Exception:
            logging.exception("Could not serialize to JSON")
            self.response.set_status(500, message="Could not serialize to JSON")
        else:
            self.response.out.write(output)

    def handle(self):
        """To be implemented by sub-classes."""
        raise NotImplementedError()
class PostJsonHandler(JsonHandler):
    """JsonHandler variant that serves POST requests."""

    def post(self):
        """Delegate the POST request to the shared JSON wrapper."""
        self._handle_wrapper()
class GetJsonHandler(JsonHandler):
    """JsonHandler variant that serves GET requests."""

    def get(self):
        """Delegate the GET request to the shared JSON wrapper."""
        self._handle_wrapper()
class HugeTaskHandler(TaskQueueHandler):
    """Base handler for processing HugeTasks."""

    class _RequestWrapper(object):
        """Container of a request and associated parameters.

        Parameters come from model.HugeTask.decode_payload(request) and are
        read and written through get/set; every other attribute lookup is
        forwarded to the wrapped request.
        """

        def __init__(self, request):
            """Wrap 'request' and decode its HugeTask payload.

            Args:
              request: the original request object being wrapped.
            """
            self._request = request
            # Used below via .get() and item assignment, i.e. like a mapping.
            self._params = model.HugeTask.decode_payload(request)

        def get(self, name, default=""):
            """Return the decoded parameter 'name', or 'default' if absent."""
            return self._params.get(name, default)

        def set(self, name, value):
            """Store 'value' under 'name' in the decoded parameters."""
            self._params[name] = value

        def __getattr__(self, name):
            """Forward unknown attribute lookups to the wrapped request.

            Raises:
              AttributeError: if the wrapped request lacks the attribute, or
                if the wrapper itself has no '_request' yet.
            """
            # __getattr__ only runs when normal lookup fails. If '_request'
            # itself is missing (instance built without __init__, e.g. by
            # copy or pickle), delegating would recurse without bound.
            if name == "_request":
                raise AttributeError(name)
            return getattr(self._request, name)

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to TaskQueueHandler.__init__."""
        super(HugeTaskHandler, self).__init__(*args, **kwargs)

    def _preprocess(self):
        """Replace self.request with a wrapper exposing the decoded payload."""
        self.request = self._RequestWrapper(self.request)
# Module-level alias, so PipelineBase can be referenced through this module
# as well as through pipeline_base.
PipelineBase = pipeline_base.PipelineBase