# Source: App Engine Python SDK version 1.8.8
# [gae.git] / python / google / appengine / ext / mapreduce / handlers.py
# blob 1e21012aa4d5b6fe982477da7baffe29a6945a10
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Defines executor tasks handlers for MapReduce implementation."""
38 import datetime
39 import gc
40 import logging
41 import math
42 import os
43 import random
44 import sys
45 import time
46 import traceback
47 import simplejson
49 from google.appengine.ext import ndb
51 from google.appengine import runtime
52 from google.appengine.api import datastore_errors
53 from google.appengine.api import logservice
54 from google.appengine.api import modules
55 from google.appengine.api import taskqueue
56 from google.appengine.ext import db
57 from google.appengine.ext.mapreduce import base_handler
58 from google.appengine.ext.mapreduce import context
59 from google.appengine.ext.mapreduce import errors
60 from google.appengine.ext.mapreduce import input_readers
61 from google.appengine.ext.mapreduce import model
62 from google.appengine.ext.mapreduce import operation
63 from google.appengine.ext.mapreduce import parameters
64 from google.appengine.ext.mapreduce import util
65 from google.appengine.runtime import apiproxy_errors
# cloudstorage is optional: it is set to None when the package cannot be
# imported, or when the importable module is only a stub (marked by _STUB).
try:
  from google.appengine.ext import cloudstorage
  if hasattr(cloudstorage, "_STUB"):
    cloudstorage = None
except ImportError:
  cloudstorage = None


# Names of faults injected by tests. Handlers in this module consult it
# (e.g. "worker_active_state_collision") to simulate failure conditions.
_TEST_INJECTED_FAULTS = set()
def _run_task_hook(hooks, method, task, queue_name):
  """Invokes hooks.method(task, queue_name).

  Args:
    hooks: A hooks.Hooks instance or None.
    method: The name of the method to invoke on the hooks class e.g.
        "enqueue_kickoff_task".
    task: The taskqueue.Task to pass to the hook method.
    queue_name: The name of the queue to pass to the hook method.

  Returns:
    True if the hooks.Hooks instance handled the method, False otherwise
    (no hooks given, or the hook raised NotImplementedError).
  """
  if hooks is None:
    return False
  try:
    getattr(hooks, method)(task, queue_name)
  except NotImplementedError:
    # The hook declined; the caller should use its default implementation.
    return False
  return True
class MapperWorkerCallbackHandler(base_handler.HugeTaskHandler):
  """Callback handler for mapreduce worker task."""

  # Outcomes used to steer a worker task through handle():
  #   RETRY_TASK:   re-run this slice (handle() calls retry_task()).
  #   DROP_TASK:    silently discard this task.
  #   PROCEED_TASK: lease acquired / slice done; continue normally.
  #   RETRY_SHARD:  restart the whole shard.
  #   FAIL_TASK:    shard failed permanently; no follow-up task is created.
  _TASK_STATE = util._enum(
      RETRY_TASK="retry_task",
      DROP_TASK="drop_task",
      PROCEED_TASK="proceed_task",
      RETRY_SHARD="retry_shard",
      FAIL_TASK="fail_task")
123 def __init__(self, *args):
124 """Constructor."""
125 super(MapperWorkerCallbackHandler, self).__init__(*args)
126 self._time = time.time
128 def _drop_gracefully(self):
129 """Drop worker task gracefully.
131 Set current shard_state to failed. Controller logic will take care of
132 other shards and the entire MR.
134 shard_id = self.request.headers[util._MR_SHARD_ID_TASK_HEADER]
135 mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
136 shard_state, mr_state = db.get([
137 model.ShardState.get_key_by_shard_id(shard_id),
138 model.MapreduceState.get_key_by_job_id(mr_id)])
140 if shard_state and shard_state.active:
141 shard_state.set_for_failure()
142 config = util.create_datastore_write_config(mr_state.mapreduce_spec)
143 shard_state.put(config=config)
145 def _try_acquire_lease(self, shard_state, tstate):
146 """Validate datastore and the task payload are consistent.
148 If so, attempt to get a lease on this slice's execution.
149 See model.ShardState doc on slice_start_time.
151 Args:
152 shard_state: model.ShardState from datastore.
153 tstate: model.TransientShardState from taskqueue paylod.
155 Returns:
156 A _TASK_STATE enum. PROCEED_TASK if lock is acquired.
157 RETRY_TASK if task should be retried, DROP_TASK if task should
158 be dropped. Only old tasks (comparing to datastore state)
159 will be dropped. Future tasks are retried until they naturally
160 become old so that we don't ever stuck MR.
163 if not shard_state:
164 logging.warning("State not found for shard %s; Possible spurious task "
165 "execution. Dropping this task.",
166 tstate.shard_id)
167 return self._TASK_STATE.DROP_TASK
169 if not shard_state.active:
170 logging.warning("Shard %s is not active. Possible spurious task "
171 "execution. Dropping this task.", tstate.shard_id)
172 logging.warning(str(shard_state))
173 return self._TASK_STATE.DROP_TASK
176 if shard_state.retries > tstate.retries:
177 logging.warning(
178 "Got shard %s from previous shard retry %s. Possible spurious "
179 "task execution. Dropping this task.",
180 tstate.shard_id,
181 tstate.retries)
182 logging.warning(str(shard_state))
183 return self._TASK_STATE.DROP_TASK
184 elif shard_state.retries < tstate.retries:
188 logging.warning(
189 "ShardState for %s is behind slice. Waiting for it to catch up",
190 shard_state.shard_id)
191 return self._TASK_STATE.RETRY_TASK
195 if shard_state.slice_id > tstate.slice_id:
196 logging.warning(
197 "Task %s-%s is behind ShardState %s. Dropping task.""",
198 tstate.shard_id, tstate.slice_id, shard_state.slice_id)
199 return self._TASK_STATE.DROP_TASK
203 elif shard_state.slice_id < tstate.slice_id:
204 logging.warning(
205 "Task %s-%s is ahead of ShardState %s. Waiting for it to catch up.",
206 tstate.shard_id, tstate.slice_id, shard_state.slice_id)
207 return self._TASK_STATE.RETRY_TASK
211 if shard_state.slice_start_time:
212 countdown = self._wait_time(shard_state,
213 parameters.config._LEASE_GRACE_PERIOD +
214 parameters.config._SLICE_DURATION_SEC)
215 if countdown > 0:
216 logging.warning(
217 "Last retry of slice %s-%s may be still running."
218 "Will try again in %s seconds", tstate.shard_id, tstate.slice_id,
219 countdown)
223 time.sleep(countdown)
224 return self._TASK_STATE.RETRY_TASK
226 else:
227 if self._wait_time(shard_state,
228 parameters.config._REQUEST_EVENTUAL_TIMEOUT):
229 if not self._old_request_ended(shard_state):
230 logging.warning(
231 "Last retry of slice %s-%s is still in flight with request_id "
232 "%s. Will try again later.", tstate.shard_id, tstate.slice_id,
233 shard_state.slice_request_id)
234 return self._TASK_STATE.RETRY_TASK
235 else:
236 logging.warning(
237 "Last retry of slice %s-%s has no log entry and has"
238 "timed out after %s seconds",
239 tstate.shard_id, tstate.slice_id,
240 parameters.config._REQUEST_EVENTUAL_TIMEOUT)
243 config = util.create_datastore_write_config(tstate.mapreduce_spec)
244 @db.transactional(retries=5)
245 def _tx():
246 """Use datastore to set slice_start_time to now.
248 If failed for any reason, raise error to retry the task (hence all
249 the previous validation code). The task would die naturally eventually.
251 Returns:
252 A _TASK_STATE enum.
254 fresh_state = model.ShardState.get_by_shard_id(tstate.shard_id)
255 if not fresh_state:
256 logging.error("ShardState missing.")
257 raise db.Rollback()
258 if (fresh_state.active and
259 fresh_state.slice_id == shard_state.slice_id and
260 fresh_state.slice_start_time == shard_state.slice_start_time):
261 shard_state.slice_start_time = datetime.datetime.now()
262 shard_state.slice_request_id = os.environ.get("REQUEST_LOG_ID")
263 shard_state.acquired_once = True
264 shard_state.put(config=config)
265 return self._TASK_STATE.PROCEED_TASK
266 else:
267 logging.warning(
268 "Contention on slice %s-%s execution. Will retry again.",
269 tstate.shard_id, tstate.slice_id)
271 time.sleep(random.randrange(1, 5))
272 return self._TASK_STATE.RETRY_TASK
274 return _tx()
276 def _old_request_ended(self, shard_state):
277 """Whether previous slice retry has ended according to Logs API.
279 Args:
280 shard_state: shard state.
282 Returns:
283 True if the request of previous slice retry has ended. False if it has
284 not or unknown.
286 assert shard_state.slice_start_time is not None
287 assert shard_state.slice_request_id is not None
288 request_ids = [shard_state.slice_request_id]
289 logs = list(logservice.fetch(
290 request_ids=request_ids,
292 module_versions=[(os.environ["CURRENT_MODULE_ID"],
293 modules.get_current_version_name())]))
295 if not logs or not logs[0].finished:
296 return False
297 return True
299 def _wait_time(self, shard_state, secs, now=datetime.datetime.now):
300 """Time to wait until slice_start_time is secs ago from now.
302 Args:
303 shard_state: shard state.
304 secs: duration in seconds.
305 now: a func that gets now.
307 Returns:
308 0 if no wait. A positive int in seconds otherwise. Always around up.
310 assert shard_state.slice_start_time is not None
311 delta = now() - shard_state.slice_start_time
312 duration = datetime.timedelta(seconds=secs)
313 if delta < duration:
314 return util.total_seconds(duration - delta)
315 else:
316 return 0
318 def _try_free_lease(self, shard_state, slice_retry=False):
319 """Try to free lease.
321 A lightweight transaction to update shard_state and unset
322 slice_start_time to allow the next retry to happen without blocking.
323 We don't care if this fails or not because the lease will expire
324 anyway.
326 Under normal execution, _save_state_and_schedule_next is the exit point.
327 It updates/saves shard state and schedules the next slice or returns.
328 Other exit points are:
329 1. _are_states_consistent: at the beginning of handle, checks
330 if datastore states and the task are in sync.
331 If not, raise or return.
332 2. _attempt_slice_retry: may raise exception to taskqueue.
333 3. _save_state_and_schedule_next: may raise exception when taskqueue/db
334 unreachable.
336 This handler should try to free the lease on every exceptional exit point.
338 Args:
339 shard_state: model.ShardState.
340 slice_retry: whether to count this as a failed slice execution.
342 @db.transactional
343 def _tx():
344 fresh_state = model.ShardState.get_by_shard_id(shard_state.shard_id)
345 if fresh_state and fresh_state.active:
347 fresh_state.slice_start_time = None
348 fresh_state.slice_request_id = None
349 if slice_retry:
350 fresh_state.slice_retries += 1
351 fresh_state.put()
352 try:
353 _tx()
355 except Exception, e:
356 logging.warning(e)
357 logging.warning(
358 "Release lock for shard %s failed. Wait for lease to expire.",
359 shard_state.shard_id)
361 def handle(self):
362 """Handle request.
364 This method has to be careful to pass the same ShardState instance to
365 its subroutines calls if the calls mutate or read from ShardState.
366 Note especially that Context instance caches and updates the ShardState
367 instance.
369 Returns:
370 Set HTTP status code and always returns None.
372 self._start_time = self._time()
374 shard_id = self.request.headers[util._MR_SHARD_ID_TASK_HEADER]
375 mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
376 spec = model.MapreduceSpec._get_mapreduce_spec(mr_id)
377 shard_state, control = db.get([
378 model.ShardState.get_key_by_shard_id(shard_id),
379 model.MapreduceControl.get_key_by_job_id(mr_id),
383 ctx = context.Context(spec, shard_state,
384 task_retry_count=self.task_retry_count())
385 context.Context._set(ctx)
388 tstate = model.TransientShardState.from_request(self.request)
389 task_state = self._try_acquire_lease(shard_state, tstate)
390 if task_state == self._TASK_STATE.RETRY_TASK:
391 return self.retry_task()
392 if task_state == self._TASK_STATE.DROP_TASK:
393 return
394 assert task_state == self._TASK_STATE.PROCEED_TASK
396 if control and control.command == model.MapreduceControl.ABORT:
397 logging.info("Abort command received by shard %d of job '%s'",
398 shard_state.shard_number, shard_state.mapreduce_id)
401 shard_state.set_for_abort()
402 shard_state.put(config=util.create_datastore_write_config(spec))
403 return
407 util._set_ndb_cache_policy()
408 if cloudstorage:
409 cloudstorage.set_default_retry_params(
410 cloudstorage.RetryParams(
411 urlfetch_timeout=parameters._GCS_URLFETCH_TIMEOUT_SEC))
413 try:
414 finished_shard = self.process_inputs(
415 tstate.input_reader, shard_state, tstate, ctx)
417 if finished_shard:
420 if tstate.output_writer:
425 tstate.output_writer.finalize(ctx, shard_state)
426 shard_state.set_for_success()
428 except Exception, e:
429 task_state = self._retry_logic(
430 e, shard_state, tstate, spec.mapreduce_id)
432 if task_state == self._TASK_STATE.RETRY_TASK:
434 return self.retry_task()
435 self._save_state_and_schedule_next(shard_state, tstate, task_state)
437 def process_inputs(self,
438 input_reader,
439 shard_state,
440 tstate,
441 ctx):
442 """Read inputs, process them, and write out outputs.
444 This is the core logic of MapReduce. It reads inputs from input reader,
445 invokes user specified mapper function, and writes output with
446 output writer. It also updates shard_state accordingly.
447 e.g. if shard processing is done, set shard_state.active to False.
449 If errors.FailJobError is caught, it will fail this MR job.
450 All other exceptions will be logged and raised to taskqueue for retry
451 until the number of retries exceeds a limit.
453 Args:
454 input_reader: input reader.
455 shard_state: shard state.
456 tstate: transient shard state.
457 ctx: mapreduce context.
459 Returns:
460 Whether this shard has finished processing all its input split.
462 processing_limit = self._processing_limit(tstate.mapreduce_spec)
463 if processing_limit == 0:
464 return
466 finished_shard = True
468 for entity in input_reader:
469 if isinstance(entity, db.Model):
470 shard_state.last_work_item = repr(entity.key())
471 elif isinstance(entity, ndb.Model):
472 shard_state.last_work_item = repr(entity.key)
473 else:
474 shard_state.last_work_item = repr(entity)[:100]
476 processing_limit -= 1
478 if not self.process_data(
479 entity, input_reader, ctx, tstate):
480 finished_shard = False
481 break
482 elif processing_limit == 0:
483 finished_shard = False
484 break
487 operation.counters.Increment(
488 context.COUNTER_MAPPER_WALLTIME_MS,
489 int((self._time() - self._start_time)*1000))(ctx)
490 ctx.flush()
492 return finished_shard
494 def process_data(self, data, input_reader, ctx, transient_shard_state):
495 """Process a single data piece.
497 Call mapper handler on the data.
499 Args:
500 data: a datum to process.
501 input_reader: input reader.
502 ctx: mapreduce context
503 transient_shard_state: transient shard state.
505 Returns:
506 True if scan should be continued, False if scan should be stopped.
508 if data is not input_readers.ALLOW_CHECKPOINT:
509 ctx.counters.increment(context.COUNTER_MAPPER_CALLS)
511 handler = transient_shard_state.handler
513 if input_reader.expand_parameters:
514 result = handler(*data)
515 else:
516 result = handler(data)
518 if util.is_generator(result):
519 for output in result:
520 if isinstance(output, operation.Operation):
521 output(ctx)
522 else:
523 output_writer = transient_shard_state.output_writer
524 if not output_writer:
525 logging.error(
526 "Handler yielded %s, but no output writer is set.", output)
527 else:
528 output_writer.write(output)
530 if self._time() - self._start_time >= parameters.config._SLICE_DURATION_SEC:
531 return False
532 return True
534 def _save_state_and_schedule_next(self, shard_state, tstate, task_state):
535 """Save state to datastore and schedule next task for this shard.
537 Update and save shard state. Schedule next slice if needed.
538 This method handles interactions with datastore and taskqueue.
540 Args:
541 shard_state: model.ShardState for current shard.
542 tstate: model.TransientShardState for current shard.
543 task_state: enum _TASK_STATE.
546 spec = tstate.mapreduce_spec
547 config = util.create_datastore_write_config(spec)
550 if task_state == self._TASK_STATE.RETRY_SHARD:
553 task = self._state_to_task(tstate, shard_state)
554 elif task_state == self._TASK_STATE.PROCEED_TASK:
555 shard_state.advance_for_next_slice()
556 tstate.advance_for_next_slice()
557 countdown = self._get_countdown_for_next_slice(spec)
558 task = self._state_to_task(tstate, shard_state, countdown=countdown)
559 else:
560 assert task_state == self._TASK_STATE.FAIL_TASK
561 task = None
563 queue_name = os.environ.get("HTTP_X_APPENGINE_QUEUENAME", "default")
565 @db.transactional(retries=5)
566 def _tx():
567 fresh_shard_state = model.ShardState.get_by_shard_id(tstate.shard_id)
568 if not fresh_shard_state:
569 raise db.Rollback()
570 if (not fresh_shard_state.active or
571 "worker_active_state_collision" in _TEST_INJECTED_FAULTS):
572 logging.error("Shard %s is not active. Possible spurious task "
573 "execution. Dropping this task.", tstate.shard_id)
574 logging.error("Datastore's %s", str(fresh_shard_state))
575 logging.error("Slice's %s", str(shard_state))
576 return
577 fresh_shard_state.copy_from(shard_state)
578 fresh_shard_state.put(config=config)
583 if fresh_shard_state.active:
586 self._add_task(task, spec, queue_name)
588 try:
589 _tx()
590 except (datastore_errors.Error,
591 taskqueue.Error,
592 runtime.DeadlineExceededError,
593 apiproxy_errors.Error), e:
594 logging.error(
595 "Can't transactionally continue shard. "
596 "Will retry slice %s %s for the %s time.",
597 tstate.shard_id,
598 tstate.slice_id,
599 self.task_retry_count() + 1)
600 self._try_free_lease(shard_state)
601 raise e
602 finally:
603 gc.collect()
605 def _retry_logic(self, e, shard_state, tstate, mr_id):
606 """Handle retry for this slice.
608 This method may modify shard_state and tstate to prepare for retry or fail.
610 Args:
611 e: the exception caught.
612 shard_state: model.ShardState for current shard.
613 tstate: model.TransientShardState for current shard.
614 mr_id: mapreduce id.
616 Returns:
617 A _TASK_STATE enum. RETRY_SHARD if shard should be retried.
618 RETRY_TASK if slice should be retried. FAIL_TASK otherwise.
620 logging.error("Shard %s got error.", shard_state.shard_id)
623 logging.error(traceback.format_exc())
626 if type(e) is errors.FailJobError:
627 logging.error("Got FailJobError. Shard %s failed permanently.",
628 shard_state.shard_id)
629 shard_state.set_for_failure()
630 return self._TASK_STATE.FAIL_TASK
632 task_state = self._attempt_slice_retry(shard_state, tstate)
633 if task_state == self._TASK_STATE.RETRY_SHARD:
634 task_state = self._attempt_shard_retry(shard_state, tstate, mr_id)
635 if task_state == self._TASK_STATE.FAIL_TASK:
636 shard_state.set_for_failure()
637 logging.error("Shard %s failed permanently.", shard_state.shard_id)
638 return task_state
640 def _attempt_shard_retry(self, shard_state, tstate, mr_id):
641 """Whether to retry shard.
643 This method may modify shard_state and tstate to prepare for retry or fail.
645 Args:
646 shard_state: model.ShardState for current shard.
647 tstate: model.TransientShardState for current shard.
648 mr_id: mapreduce id.
650 Returns:
651 A _TASK_STATE enum. RETRY_SHARD if shard should be retried.
652 FAIL_TASK otherwise.
654 shard_attempts = shard_state.retries + 1
656 if shard_attempts >= parameters.config.SHARD_MAX_ATTEMPTS:
657 logging.error(
658 "Shard attempt %s exceeded %s max attempts.",
659 shard_attempts, parameters.config.SHARD_MAX_ATTEMPTS)
660 return self._TASK_STATE.FAIL_TASK
661 if tstate.output_writer and (
662 not tstate.output_writer._can_be_retried(tstate)):
663 logging.error("Output writer %s does not support shard retry.",
664 tstate.output_writer.__class__.__name__)
665 return self._TASK_STATE.FAIL_TASK
667 shard_state.reset_for_retry()
668 logging.error("Shard %s attempt %s failed with up to %s attempts.",
669 shard_state.shard_id,
670 shard_state.retries,
671 parameters.config.SHARD_MAX_ATTEMPTS)
672 output_writer = None
673 if tstate.output_writer:
674 mr_state = model.MapreduceState.get_by_job_id(mr_id)
675 output_writer = tstate.output_writer.create(
676 mr_state, shard_state)
677 tstate.reset_for_retry(output_writer)
678 return self._TASK_STATE.RETRY_SHARD
680 def _attempt_slice_retry(self, shard_state, tstate):
681 """Attempt to retry this slice.
683 This method may modify shard_state and tstate to prepare for retry or fail.
685 Args:
686 shard_state: model.ShardState for current shard.
687 tstate: model.TransientShardState for current shard.
689 Returns:
690 A _TASK_STATE enum. RETRY_TASK if slice should be retried.
691 RETRY_SHARD if shard retry should be attempted.
693 if (shard_state.slice_retries + 1 <
694 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS):
695 logging.error(
696 "Slice %s %s failed for the %s of up to %s attempts "
697 "(%s of %s taskqueue execution attempts). "
698 "Will retry now.",
699 tstate.shard_id,
700 tstate.slice_id,
701 shard_state.slice_retries + 1,
702 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS,
703 self.task_retry_count() + 1,
704 parameters.config.TASK_MAX_ATTEMPTS)
708 sys.exc_clear()
709 self._try_free_lease(shard_state, slice_retry=True)
710 return self._TASK_STATE.RETRY_TASK
712 if parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS > 0:
713 logging.error("Slice attempt %s exceeded %s max attempts.",
714 self.task_retry_count() + 1,
715 parameters.config.TASK_MAX_DATA_PROCESSING_ATTEMPTS)
716 return self._TASK_STATE.RETRY_SHARD
718 @staticmethod
719 def get_task_name(shard_id, slice_id, retry=0):
720 """Compute single worker task name.
722 Args:
723 shard_id: shard id.
724 slice_id: slice id.
725 retry: current shard retry count.
727 Returns:
728 task name which should be used to process specified shard/slice.
732 return "appengine-mrshard-%s-%s-retry-%s" % (
733 shard_id, slice_id, retry)
735 def _get_countdown_for_next_slice(self, spec):
736 """Get countdown for next slice's task.
738 When user sets processing rate, we set countdown to delay task execution.
740 Args:
741 spec: model.MapreduceSpec
743 Returns:
744 countdown in int.
746 countdown = 0
747 if self._processing_limit(spec) != -1:
748 countdown = max(
749 int(parameters.config._SLICE_DURATION_SEC -
750 (self._time() - self._start_time)), 0)
751 return countdown
753 @classmethod
754 def _state_to_task(cls,
755 tstate,
756 shard_state,
757 eta=None,
758 countdown=None):
759 """Generate task for slice according to current states.
761 Args:
762 tstate: An instance of TransientShardState.
763 shard_state: An instance of ShardState.
764 eta: Absolute time when the MR should execute. May not be specified
765 if 'countdown' is also supplied. This may be timezone-aware or
766 timezone-naive.
767 countdown: Time in seconds into the future that this MR should execute.
768 Defaults to zero.
770 Returns:
771 A model.HugeTask instance for the slice specified by current states.
773 base_path = tstate.base_path
775 task_name = MapperWorkerCallbackHandler.get_task_name(
776 tstate.shard_id,
777 tstate.slice_id,
778 tstate.retries)
780 headers = util._get_task_headers(tstate.mapreduce_spec)
781 headers[util._MR_SHARD_ID_TASK_HEADER] = tstate.shard_id
783 worker_task = model.HugeTask(
784 url=base_path + "/worker_callback",
785 params=tstate.to_dict(),
786 name=task_name,
787 eta=eta,
788 countdown=countdown,
789 parent=shard_state,
790 headers=headers)
791 return worker_task
793 @classmethod
794 def _add_task(cls,
795 worker_task,
796 mapreduce_spec,
797 queue_name):
798 """Schedule slice scanning by adding it to the task queue.
800 Args:
801 worker_task: a model.HugeTask task for slice. This is NOT a taskqueue
802 task.
803 mapreduce_spec: an instance of model.MapreduceSpec.
804 queue_name: Optional queue to run on; uses the current queue of
805 execution or the default queue if unspecified.
807 if not _run_task_hook(mapreduce_spec.get_hooks(),
808 "enqueue_worker_task",
809 worker_task,
810 queue_name):
811 try:
814 worker_task.add(queue_name)
815 except (taskqueue.TombstonedTaskError,
816 taskqueue.TaskAlreadyExistsError), e:
817 logging.warning("Task %r already exists. %s: %s",
818 worker_task.name,
819 e.__class__,
822 def _processing_limit(self, spec):
823 """Get the limit on the number of map calls allowed by this slice.
825 Args:
826 spec: a Mapreduce spec.
828 Returns:
829 The limit as a positive int if specified by user. -1 otherwise.
831 processing_rate = float(spec.mapper.params.get("processing_rate", 0))
832 slice_processing_limit = -1
833 if processing_rate > 0:
834 slice_processing_limit = int(math.ceil(
835 parameters.config._SLICE_DURATION_SEC*processing_rate/
836 int(spec.mapper.shard_count)))
837 return slice_processing_limit
841 @classmethod
842 def _schedule_slice(cls,
843 shard_state,
844 tstate,
845 queue_name=None,
846 eta=None,
847 countdown=None):
848 """Schedule slice scanning by adding it to the task queue.
850 Args:
851 shard_state: An instance of ShardState.
852 tstate: An instance of TransientShardState.
853 queue_name: Optional queue to run on; uses the current queue of
854 execution or the default queue if unspecified.
855 eta: Absolute time when the MR should execute. May not be specified
856 if 'countdown' is also supplied. This may be timezone-aware or
857 timezone-naive.
858 countdown: Time in seconds into the future that this MR should execute.
859 Defaults to zero.
861 queue_name = queue_name or os.environ.get("HTTP_X_APPENGINE_QUEUENAME",
862 "default")
863 task = cls._state_to_task(tstate, shard_state, eta, countdown)
864 cls._add_task(task, tstate.mapreduce_spec, queue_name)
class ControllerCallbackHandler(base_handler.HugeTaskHandler):
  """Supervises mapreduce execution.

  Is also responsible for gathering execution status from shards together.

  This task is "continuously" running by adding itself again to taskqueue if
  and only if mapreduce is still active. A mapreduce is active if it has
  actively running shards.
  """
877 def __init__(self, *args):
878 """Constructor."""
879 super(ControllerCallbackHandler, self).__init__(*args)
880 self._time = time.time
882 def _drop_gracefully(self):
883 """Gracefully drop controller task.
885 This method is called when decoding controller task payload failed.
886 Upon this we mark ShardState and MapreduceState as failed so all
887 tasks can stop.
889 Writing to datastore is forced (ignore read-only mode) because we
890 want the tasks to stop badly, and if force_writes was False,
891 the job would have never been started.
893 mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
894 state = model.MapreduceState.get_by_job_id(mr_id)
895 if not state or not state.active:
896 return
898 state.active = False
899 state.result_status = model.MapreduceState.RESULT_FAILED
900 config = util.create_datastore_write_config(state.mapreduce_spec)
901 puts = []
902 for ss in model.ShardState.find_all_by_mapreduce_state(state):
903 if ss.active:
904 ss.set_for_failure()
905 puts.append(ss)
907 if len(puts) > model.ShardState._MAX_STATES_IN_MEMORY:
908 db.put(puts, config=config)
909 puts = []
910 db.put(puts, config=config)
912 db.put(state, config=config)
914 def handle(self):
915 """Handle request."""
916 spec = model.MapreduceSpec.from_json_str(
917 self.request.get("mapreduce_spec"))
918 state, control = db.get([
919 model.MapreduceState.get_key_by_job_id(spec.mapreduce_id),
920 model.MapreduceControl.get_key_by_job_id(spec.mapreduce_id),
923 if not state:
924 logging.warning("State not found for MR '%s'; dropping controller task.",
925 spec.mapreduce_id)
926 return
927 if not state.active:
928 logging.warning(
929 "MR %r is not active. Looks like spurious controller task execution.",
930 spec.mapreduce_id)
931 self._clean_up_mr(spec, self.base_path())
932 return
934 shard_states = model.ShardState.find_all_by_mapreduce_state(state)
935 self._update_state_from_shard_states(state, shard_states, control)
937 if state.active:
938 ControllerCallbackHandler.reschedule(
939 state, self.base_path(), spec, self.serial_id() + 1)
941 def _update_state_from_shard_states(self, state, shard_states, control):
942 """Update mr state by examing shard states.
944 Args:
945 state: current mapreduce state as MapreduceState.
946 shard_states: an iterator over shard states.
947 control: model.MapreduceControl entity.
950 state.active_shards, state.aborted_shards, state.failed_shards = 0, 0, 0
951 total_shards = 0
952 processed_counts = []
953 state.counters_map.clear()
956 for s in shard_states:
957 total_shards += 1
958 if s.active:
959 state.active_shards += 1
960 if s.result_status == model.ShardState.RESULT_ABORTED:
961 state.aborted_shards += 1
962 elif s.result_status == model.ShardState.RESULT_FAILED:
963 state.failed_shards += 1
966 state.counters_map.add_map(s.counters_map)
967 processed_counts.append(s.counters_map.get(context.COUNTER_MAPPER_CALLS))
969 state.set_processed_counts(processed_counts)
970 state.last_poll_time = datetime.datetime.utcfromtimestamp(self._time())
972 spec = state.mapreduce_spec
974 if total_shards != spec.mapper.shard_count:
975 logging.error("Found %d shard states. Expect %d. "
976 "Issuing abort command to job '%s'",
977 total_shards, spec.mapper.shard_count,
978 spec.mapreduce_id)
980 model.MapreduceControl.abort(spec.mapreduce_id)
984 state.active = bool(state.active_shards)
985 if not control and (state.failed_shards or state.aborted_shards):
987 model.MapreduceControl.abort(spec.mapreduce_id)
989 if not state.active:
991 if state.failed_shards or not total_shards:
992 state.result_status = model.MapreduceState.RESULT_FAILED
995 elif state.aborted_shards:
996 state.result_status = model.MapreduceState.RESULT_ABORTED
997 else:
998 state.result_status = model.MapreduceState.RESULT_SUCCESS
999 self._finalize_outputs(spec, state)
1000 self._finalize_job(spec, state, self.base_path())
1001 else:
1002 @db.transactional(retries=5)
1003 def _put_state():
1004 fresh_state = model.MapreduceState.get_by_job_id(spec.mapreduce_id)
1007 if not fresh_state.active:
1008 logging.warning(
1009 "Job %s is not active. Looks like spurious task execution. "
1010 "Dropping controller task.", spec.mapreduce_id)
1011 return
1012 config = util.create_datastore_write_config(spec)
1013 state.put(config=config)
1015 _put_state()
1017 def serial_id(self):
1018 """Get serial unique identifier of this task from request.
1020 Returns:
1021 serial identifier as int.
1023 return int(self.request.get("serial_id"))
1025 @classmethod
1026 def _finalize_outputs(cls, mapreduce_spec, mapreduce_state):
1027 """Finalize outputs.
1029 Args:
1030 mapreduce_spec: an instance of MapreduceSpec.
1031 mapreduce_state: an instance of MapreduceState.
1034 if (mapreduce_spec.mapper.output_writer_class() and
1035 mapreduce_state.result_status == model.MapreduceState.RESULT_SUCCESS):
1036 mapreduce_spec.mapper.output_writer_class().finalize_job(mapreduce_state)
@classmethod
def _finalize_job(cls, mapreduce_spec, mapreduce_state, base_path):
  """Finalize job execution.

  Invokes done callback and saves mapreduce state in a transaction,
  and schedules necessary clean ups. This method is idempotent.

  Args:
    mapreduce_spec: an instance of MapreduceSpec
    mapreduce_state: an instance of MapreduceState
    base_path: handler_base path.
  """
  config = util.create_datastore_write_config(mapreduce_spec)
  queue_name = util.get_queue_name(mapreduce_spec.params.get(
      model.MapreduceSpec.PARAM_DONE_CALLBACK_QUEUE))
  done_callback = mapreduce_spec.params.get(
      model.MapreduceSpec.PARAM_DONE_CALLBACK)
  done_task = None
  if done_callback:
    done_task = taskqueue.Task(
        url=done_callback,
        headers=util._get_task_headers(mapreduce_spec,
                                       util.CALLBACK_MR_ID_TASK_HEADER),
        method=mapreduce_spec.params.get("done_callback_method", "POST"))

  @db.transactional(retries=5)
  def _put_state():
    """Save the final state and enqueue the done task in one transaction."""
    fresh_state = model.MapreduceState.get_by_job_id(
        mapreduce_spec.mapreduce_id)
    # get_by_job_id can return None (KickOffJobHandler._check_mr_state and
    # FinalizeJobHandler.handle both guard against it). Without this check
    # a missing entity raises AttributeError on .active and the task fails
    # instead of being dropped.
    if fresh_state is None:
      logging.warning(
          "Mapreduce State for job %s is missing. Dropping task.",
          mapreduce_spec.mapreduce_id)
      return
    if not fresh_state.active:
      logging.warning(
          "Job %s is not active. Looks like spurious task execution. "
          "Dropping task.", mapreduce_spec.mapreduce_id)
      return
    mapreduce_state.put(config=config)

    # The hook may take over enqueueing; otherwise add the done task as
    # part of this transaction.
    if done_task and not _run_task_hook(
        mapreduce_spec.get_hooks(),
        "enqueue_done_task",
        done_task,
        queue_name):
      done_task.add(queue_name, transactional=True)

  _put_state()
  logging.info("Final result for job '%s' is '%s'",
               mapreduce_spec.mapreduce_id, mapreduce_state.result_status)
  cls._clean_up_mr(mapreduce_spec, base_path)
@classmethod
def _clean_up_mr(cls, mapreduce_spec, base_path):
  """Schedule the finalize task which cleans up after the job.

  Args:
    mapreduce_spec: an instance of MapreduceSpec.
    base_path: handler base path, forwarded to FinalizeJobHandler.schedule.
  """
  schedule_finalize = FinalizeJobHandler.schedule
  schedule_finalize(base_path, mapreduce_spec)
1090 @staticmethod
1091 def get_task_name(mapreduce_spec, serial_id):
1092 """Compute single controller task name.
1094 Args:
1095 transient_shard_state: an instance of TransientShardState.
1097 Returns:
1098 task name which should be used to process specified shard/slice.
1102 return "appengine-mrcontrol-%s-%s" % (
1103 mapreduce_spec.mapreduce_id, serial_id)
1105 @staticmethod
1106 def controller_parameters(mapreduce_spec, serial_id):
1107 """Fill in controller task parameters.
1109 Returned parameters map is to be used as task payload, and it contains
1110 all the data, required by controller to perform its function.
1112 Args:
1113 mapreduce_spec: specification of the mapreduce.
1114 serial_id: id of the invocation as int.
1116 Returns:
1117 string->string map of parameters to be used as task payload.
1119 return {"mapreduce_spec": mapreduce_spec.to_json_str(),
1120 "serial_id": str(serial_id)}
@classmethod
def reschedule(cls,
               mapreduce_state,
               base_path,
               mapreduce_spec,
               serial_id,
               queue_name=None):
  """Schedule new update status callback task.

  Args:
    mapreduce_state: mapreduce state as model.MapreduceState
    base_path: mapreduce handlers url base path as string.
    mapreduce_spec: mapreduce specification as MapreduceSpec.
    serial_id: id of the invocation as int.
    queue_name: The queue to schedule this task on. Will use the current
      queue of execution if not supplied.
  """
  task_name = ControllerCallbackHandler.get_task_name(
      mapreduce_spec, serial_id)
  task_params = ControllerCallbackHandler.controller_parameters(
      mapreduce_spec, serial_id)
  # Fall back to the queue named in the request environment, or "default".
  queue_name = queue_name or os.environ.get(
      "HTTP_X_APPENGINE_QUEUENAME", "default")

  controller_callback_task = model.HugeTask(
      url=base_path + "/controller_callback",
      name=task_name, params=task_params,
      countdown=parameters.config._CONTROLLER_PERIOD_SEC,
      parent=mapreduce_state,
      headers=util._get_task_headers(mapreduce_spec))

  hook_handled = _run_task_hook(mapreduce_spec.get_hooks(),
                                "enqueue_controller_task",
                                controller_callback_task,
                                queue_name)
  if hook_handled:
    return

  try:
    controller_callback_task.add(queue_name)
  except (taskqueue.TombstonedTaskError,
          taskqueue.TaskAlreadyExistsError) as e:
    # The task is named, so a duplicate add is only logged, not raised.
    logging.warning("Task %r with params %r already exists. %s: %s",
                    task_name, task_params, e.__class__, e)
class KickOffJobHandler(base_handler.TaskQueueHandler):
  """Taskqueue handler which kicks off a mapreduce processing.

  This handler is idempotent.

  Precondition:
    The Model.MapreduceState entity for this mr is already created and
    saved to datastore by StartJobHandler._start_map.

  Request Parameters:
    mapreduce_id: in string.
  """

  # Key name template of the payload entity holding serialized readers.
  _SERIALIZED_INPUT_READERS_KEY = "input_readers_for_mr_%s"

  def handle(self):
    """Handles kick off request."""
    job_id = self.request.get("mapreduce_id")

    logging.info("Processing kickoff for job %s", job_id)
    state = model.MapreduceState.get_by_job_id(job_id)
    if not self._check_mr_state(state, job_id):
      return

    readers, readers_entity = self._get_input_readers(state)
    if readers is None:
      # Nothing to map over: mark the job as successfully done right away.
      logging.warning("Found no mapper input data to process.")
      state.active = False
      state.result_status = model.MapreduceState.RESULT_SUCCESS
      ControllerCallbackHandler._finalize_job(
          state.mapreduce_spec, state, self.base_path())
      return False

    self._setup_output_writer(state)

    save_outcome = self._save_states(state, readers_entity)
    if save_outcome is None:
      # An earlier attempt already saved the states; load the readers again.
      readers, _ = self._get_input_readers(state)
    elif not save_outcome:
      return

    queue_name = self.request.headers.get("X-AppEngine-QueueName")
    KickOffJobHandler._schedule_shards(state.mapreduce_spec, readers,
                                       queue_name, self.base_path(), state)

    ControllerCallbackHandler.reschedule(
        state, self.base_path(), state.mapreduce_spec, serial_id=0,
        queue_name=queue_name)

  def _get_input_readers(self, state):
    """Get input readers.

    Readers are restored from the stored payload entity when one exists;
    otherwise the input is split and a new, unsaved payload entity is built.
    As a side effect the shard count of the mapper spec and
    state.active_shards are set to the number of readers.

    Args:
      state: a MapreduceState model.

    Returns:
      A tuple: (a list of input readers, a model._HugeTaskPayload entity).
      The payload entity contains the json serialized input readers.
      (None, None) when input reader splitting returned no data to process.
    """
    payload_key_name = (self._SERIALIZED_INPUT_READERS_KEY %
                        state.key().id_or_name())
    payload_entity = model._HugeTaskPayload.get_by_key_name(
        payload_key_name, parent=state)
    payload_is_new = payload_entity is None

    reader_class = state.mapreduce_spec.mapper.input_reader_class()
    if payload_is_new:
      readers = reader_class.split_input(state.mapreduce_spec.mapper)
    else:
      readers = [reader_class.from_json_str(reader_json) for reader_json in
                 simplejson.loads(payload_entity.payload)]

    if not readers:
      return None, None

    shard_total = len(readers)
    state.mapreduce_spec.mapper.shard_count = shard_total
    state.active_shards = shard_total

    if payload_is_new:
      payload_entity = model._HugeTaskPayload(
          key_name=payload_key_name, parent=state)
      serialized = [reader.to_json_str() for reader in readers]
      payload_entity.payload = simplejson.dumps(serialized)
    return readers, payload_entity

  def _setup_output_writer(self, state):
    """Initialize the job's output writer unless writer state already exists.

    Args:
      state: a MapreduceState model.
    """
    if state.writer_state:
      return
    writer_class = state.mapreduce_spec.mapper.output_writer_class()
    if writer_class:
      writer_class.init_job(state)

  @db.transactional
  def _save_states(self, state, serialized_readers_entity):
    """Run transaction to save state.

    Args:
      state: a model.MapreduceState entity.
      serialized_readers_entity: a model._HugeTaskPayload entity containing
        json serialized input readers.

    Returns:
      False if a fatal error is encountered and this task should be dropped
      immediately. True if transaction is successful. None if a previous
      attempt of this same transaction has already succeeded.
    """
    job_id = state.key().id_or_name()
    stored_state = model.MapreduceState.get_by_job_id(job_id)
    if not self._check_mr_state(stored_state, job_id):
      return False
    if stored_state.active_shards != 0:
      logging.warning(
          "Mapreduce %s already has active shards. Looks like spurious task "
          "execution.", job_id)
      return None
    write_config = util.create_datastore_write_config(state.mapreduce_spec)
    db.put([state, serialized_readers_entity], config=write_config)
    return True

  @classmethod
  def _schedule_shards(cls,
                       spec,
                       readers,
                       queue_name,
                       base_path,
                       mr_state):
    """Prepares shard states and schedules their execution.

    Even though this method does not schedule shard task and save shard state
    transactionally, it's safe for taskqueue to retry this logic because
    the initial shard_state for each shard is the same from any retry.
    This is an important yet reasonable assumption on model.ShardState.

    Args:
      spec: mapreduce specification as MapreduceSpec.
      readers: list of InputReaders describing shard splits.
      queue_name: The queue to run this job on.
      base_path: The base url path of mapreduce callbacks.
      mr_state: The MapReduceState of current job.
    """
    # One fresh shard state per reader.
    shard_states = []
    for shard_number, input_reader in enumerate(readers):
      new_state = model.ShardState.create_new(spec.mapreduce_id, shard_number)
      new_state.shard_description = str(input_reader)
      shard_states.append(new_state)

    # Find out which of those shard states are already stored.
    stored_states = db.get(shard.key() for shard in shard_states)
    stored_keys = set(shard.key() for shard in stored_states
                      if shard is not None)

    # Only put the shard states that are not stored yet.
    db.put((shard for shard in shard_states
            if shard.key() not in stored_keys),
           config=util.create_datastore_write_config(spec))

    # One output writer per shard, or None placeholders without a writer.
    writer_class = spec.mapper.output_writer_class()
    if writer_class:
      writers = [writer_class.create(mr_state, shard_state)
                 for shard_state in shard_states]
    else:
      writers = [None] * len(readers)

    # Enqueue the first worker task of every shard.
    for shard_number, (input_reader, output_writer, shard_state) in enumerate(
        zip(readers, writers, shard_states)):
      shard_id = model.ShardState.shard_id_from_number(
          spec.mapreduce_id, shard_number)
      transient_state = model.TransientShardState(
          base_path, spec, shard_id, 0, input_reader, input_reader,
          output_writer=output_writer,
          handler=spec.mapper.handler)
      task = MapperWorkerCallbackHandler._state_to_task(
          transient_state, shard_state)
      MapperWorkerCallbackHandler._add_task(task, spec, queue_name)

  @classmethod
  def _check_mr_state(cls, state, mr_id):
    """Check MapreduceState.

    Args:
      state: an MapreduceState instance.
      mr_id: mapreduce id.

    Returns:
      True if state is valid. False if not and this task should be dropped.
    """
    if state is None:
      logging.warning(
          "Mapreduce State for job %s is missing. Dropping Task.",
          mr_id)
      return False
    if state.active:
      return True
    logging.warning(
        "Mapreduce %s is not active. Looks like spurious task "
        "execution. Dropping Task.", mr_id)
    return False
class StartJobHandler(base_handler.PostJsonHandler):
  """Command handler starts a mapreduce job.

  This handler allows user to start a mr via a web form. Its _start_map
  method can also be used independently to start a mapreduce.
  """

  def handle(self):
    """Handles start request."""
    # Read the request parameters; the required ones raise when absent.
    mapreduce_name = self._get_required_param("name")
    input_reader_spec = self._get_required_param("mapper_input_reader")
    handler_spec = self._get_required_param("mapper_handler")
    output_writer_spec = self.request.get("mapper_output_writer")
    mapper_params = self._get_params(
        "mapper_params_validator", "mapper_params.")
    params = self._get_params(
        "params_validator", "params.")

    # Fill in defaults for the processing rate and the queue name.
    requested_rate = mapper_params.get("processing_rate")
    mapper_params["processing_rate"] = int(
        requested_rate or parameters.config.PROCESSING_RATE_PER_SEC)
    queue_name = util.get_queue_name(mapper_params.get("queue_name", None))
    mapper_params["queue_name"] = queue_name

    shard_count = int(
        mapper_params.get("shard_count", parameters.config.SHARD_COUNT))
    mapper_spec = model.MapperSpec(
        handler_spec,
        input_reader_spec,
        mapper_params,
        shard_count,
        output_writer_spec=output_writer_spec)

    mapreduce_id = type(self)._start_map(
        mapreduce_name,
        mapper_spec,
        params,
        base_path=self.base_path(),
        queue_name=queue_name,
        _app=mapper_params.get("_app"))
    self.json_response["mapreduce_id"] = mapreduce_id

  def _get_params(self, validator_parameter, name_prefix):
    """Retrieves additional user-supplied params for the job and validates them.

    Args:
      validator_parameter: name of the request parameter which supplies
        validator for this parameter set.
      name_prefix: common prefix for all parameter names in the request.

    Returns:
      dict of the request parameters whose names start with name_prefix,
      keyed by the name with the prefix removed. A parameter given once maps
      to its single value, otherwise to the list of all its values.

    Raises:
      Any exception raised by the 'params_validator' request parameter if
      the params fail to validate.
    """
    validator_name = self.request.get(validator_parameter)

    prefix_length = len(name_prefix)
    user_params = {}
    for key in self.request.arguments():
      if not key.startswith(name_prefix):
        continue
      values = self.request.get_all(key)
      user_params[key[prefix_length:]] = (
          values[0] if len(values) == 1 else values)

    if validator_name:
      # NOTE(review): the validator is a name taken from the request and
      # resolved through util.for_name before being called - confirm only
      # trusted callers can reach this handler.
      validate = util.for_name(validator_name)
      validate(user_params)

    return user_params

  def _get_required_param(self, param_name):
    """Get a required request parameter.

    Args:
      param_name: name of request parameter to fetch.

    Returns:
      parameter value

    Raises:
      errors.NotEnoughArgumentsError: if parameter is not specified.
    """
    value = self.request.get(param_name)
    if value:
      return value
    raise errors.NotEnoughArgumentsError(param_name + " not specified")

  @classmethod
  def _start_map(cls,
                 name,
                 mapper_spec,
                 mapreduce_params,
                 base_path,
                 queue_name,
                 eta=None,
                 countdown=None,
                 hooks_class_name=None,
                 _app=None,
                 in_xg_transaction=False):
    """See control.start_map.

    Requirements for this method:
    1. The request that invokes this method can either be regular or
       from taskqueue. So taskqueue specific headers can not be used.
    2. Each invocation transactionally starts an isolated mapreduce job with
       a unique id. MapreduceState should be immediately available after
       returning. See control.start_map's doc on transactional.
    3. Method should be lightweight.
    """
    # Validate the input reader and, when configured, the output writer.
    input_reader_class = mapper_spec.input_reader_class()
    input_reader_class.validate(mapper_spec)

    output_writer_class = mapper_spec.output_writer_class()
    if output_writer_class:
      output_writer_class.validate(mapper_spec)

    mapreduce_id = model.MapreduceState.new_mapreduce_id()
    mapreduce_spec = model.MapreduceSpec(
        name,
        mapreduce_id,
        mapper_spec.to_json(),
        mapreduce_params,
        hooks_class_name)

    # Touch the handler while a context for this job is installed; the
    # context is always reset afterwards.
    # NOTE(review): presumably this verifies the handler spec can be
    # resolved - confirm against MapperSpec.handler.
    context.Context._set(context.Context(mapreduce_spec, None))
    try:
      mapper_spec.handler
    finally:
      context.Context._set(None)

    # Join the caller's transaction when asked to, else run an independent
    # one.
    propagation = db.MANDATORY if in_xg_transaction else db.INDEPENDENT

    @db.transactional(propagation=propagation)
    def _txn():
      """Save the new state and enqueue the kickoff task together."""
      cls._create_and_save_state(mapreduce_spec, _app)
      cls._add_kickoff_task(base_path, mapreduce_spec, eta,
                            countdown, queue_name)
    _txn()

    return mapreduce_id

  @classmethod
  def _create_and_save_state(cls, mapreduce_spec, _app):
    """Save mapreduce state to datastore.

    Save state to datastore so that UI can see it immediately.

    Args:
      mapreduce_spec: model.MapreduceSpec,
      _app: app id if specified. None otherwise.

    Returns:
      The saved Mapreduce state.
    """
    new_state = model.MapreduceState.create_new(mapreduce_spec.mapreduce_id)
    new_state.mapreduce_spec = mapreduce_spec
    new_state.active = True
    new_state.active_shards = 0
    if _app:
      new_state.app_id = _app
    new_state.put(config=util.create_datastore_write_config(mapreduce_spec))
    return new_state

  @classmethod
  def _add_kickoff_task(cls,
                        base_path,
                        mapreduce_spec,
                        eta,
                        countdown,
                        queue_name):
    """Enqueue the task which kicks off the job.

    The hooks object of the spec, if any, gets the first chance to enqueue
    the task. When there are no hooks, or the hook raises
    NotImplementedError, the task is added transactionally to queue_name.

    Args:
      base_path: mapreduce handlers url base path as string.
      mapreduce_spec: model.MapreduceSpec of the job.
      eta: passed through to taskqueue.Task as eta.
      countdown: passed through to taskqueue.Task as countdown.
      queue_name: name of the queue to add the task to.
    """
    params = {"mapreduce_id": mapreduce_spec.mapreduce_id}

    kickoff_task = taskqueue.Task(
        url=base_path + "/kickoffjob_callback",
        headers=util._get_task_headers(mapreduce_spec),
        params=params,
        eta=eta,
        countdown=countdown)
    hooks = mapreduce_spec.get_hooks()
    if hooks is not None:
      try:
        hooks.enqueue_kickoff_task(kickoff_task, queue_name)
      except NotImplementedError:
        pass
      else:
        return
    kickoff_task.add(queue_name, transactional=True)
class FinalizeJobHandler(base_handler.TaskQueueHandler):
  """Finalize map job by deleting all temporary entities."""

  def handle(self):
    """Delete the control entity and the payload entities of a job.

    Does nothing when no MapreduceState is found for the requested
    mapreduce_id.
    """
    mapreduce_id = self.request.get("mapreduce_id")
    mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
    if not mapreduce_state:
      return

    def _payload_keys(ancestor):
      """Return the keys of all payload entities under the ancestor."""
      query = model._HugeTaskPayload.all().ancestor(ancestor)
      return list(query.run(keys_only=True))

    config = util.create_datastore_write_config(mapreduce_state.mapreduce_spec)
    keys = [model.MapreduceControl.get_key_by_job_id(mapreduce_id)]
    # Payloads stored under each shard state first, then those stored
    # under the mapreduce state itself.
    for ss in model.ShardState.find_all_by_mapreduce_state(mapreduce_state):
      keys.extend(_payload_keys(ss))
    keys.extend(_payload_keys(mapreduce_state))
    db.delete(keys, config=config)

  @classmethod
  def schedule(cls, base_path, mapreduce_spec):
    """Schedule finalize task.

    Args:
      base_path: mapreduce handlers url base path as string.
      mapreduce_spec: mapreduce specification as MapreduceSpec.
    """
    task_name = mapreduce_spec.mapreduce_id + "-finalize"
    finalize_task = taskqueue.Task(
        name=task_name,
        url=base_path + "/finalizejob_callback",
        params={"mapreduce_id": mapreduce_spec.mapreduce_id},
        headers=util._get_task_headers(mapreduce_spec))
    queue_name = util.get_queue_name(None)
    hook_handled = _run_task_hook(mapreduce_spec.get_hooks(),
                                  "enqueue_controller_task",
                                  finalize_task,
                                  queue_name)
    if hook_handled:
      return
    try:
      finalize_task.add(queue_name)
    except (taskqueue.TombstonedTaskError,
            taskqueue.TaskAlreadyExistsError) as e:
      # The task is named, so a duplicate add is only logged, not raised.
      logging.warning("Task %r already exists. %s: %s",
                      task_name, e.__class__, e)
class CleanUpJobHandler(base_handler.PostJsonHandler):
  """Command to kick off tasks to clean up a job's data."""

  def handle(self):
    """Delete the shard states and the state of the requested job.

    The status message is written to the json response whether or not a
    state entity was found for the job.
    """
    job_id = self.request.get("mapreduce_id")

    job_state = model.MapreduceState.get_by_job_id(job_id)
    if job_state:
      # Shard states go first, then the job state they were derived from.
      db.delete(model.ShardState.calculate_keys_by_mapreduce_state(job_state))
      db.delete(job_state)
    status_message = "Job %s successfully cleaned up." % job_id
    self.json_response["status"] = status_message
class AbortJobHandler(base_handler.PostJsonHandler):
  """Command to abort a running job."""

  def handle(self):
    """Send the abort signal for the job named in the request."""
    job_id = self.request.get("mapreduce_id")
    model.MapreduceControl.abort(job_id)
    self.json_response["status"] = "Abort signal sent."